-rw-r--r--  MAINTAINERS                      |   7
-rw-r--r--  drivers/block/rbd.c              | 193
-rw-r--r--  fs/ceph/acl.c                    |  14
-rw-r--r--  fs/ceph/addr.c                   |  19
-rw-r--r--  fs/ceph/caps.c                   | 127
-rw-r--r--  fs/ceph/dir.c                    |  33
-rw-r--r--  fs/ceph/file.c                   |  37
-rw-r--r--  fs/ceph/inode.c                  |  41
-rw-r--r--  fs/ceph/mds_client.c             | 127
-rw-r--r--  fs/ceph/mds_client.h             |   2
-rw-r--r--  fs/ceph/snap.c                   |  54
-rw-r--r--  fs/ceph/super.c                  |   4
-rw-r--r--  fs/ceph/super.h                  |   5
-rw-r--r--  include/linux/ceph/ceph_fs.h     |  37
-rw-r--r--  include/linux/ceph/libceph.h     |   3
-rw-r--r--  include/linux/ceph/messenger.h   |   4
-rw-r--r--  include/linux/ceph/mon_client.h  |   9
-rw-r--r--  net/ceph/ceph_common.c           |  16
-rw-r--r--  net/ceph/ceph_strings.c          |  14
-rw-r--r--  net/ceph/debugfs.c               |   2
-rw-r--r--  net/ceph/messenger.c             |  14
-rw-r--r--  net/ceph/mon_client.c            | 139
-rw-r--r--  net/ceph/osd_client.c            |  31
23 files changed, 444 insertions(+), 488 deletions(-)
diff --git a/MAINTAINERS b/MAINTAINERS
index 1921ed58d1a0..7cfcee4e2bea 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2433,7 +2433,8 @@ F:	arch/powerpc/oprofile/*cell*
 F:	arch/powerpc/platforms/cell/
 
 CEPH DISTRIBUTED FILE SYSTEM CLIENT
-M:	Sage Weil <sage@inktank.com>
+M:	Yan, Zheng <zyan@redhat.com>
+M:	Sage Weil <sage@redhat.com>
 L:	ceph-devel@vger.kernel.org
 W:	http://ceph.com/
 T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
@@ -7998,8 +7999,8 @@ S:	Supported
 F:	drivers/net/wireless/ath/wcn36xx/
 
 RADOS BLOCK DEVICE (RBD)
-M:	Yehuda Sadeh <yehuda@inktank.com>
-M:	Sage Weil <sage@inktank.com>
+M:	Ilya Dryomov <idryomov@gmail.com>
+M:	Sage Weil <sage@redhat.com>
 M:	Alex Elder <elder@kernel.org>
 M:	ceph-devel@vger.kernel.org
 W:	http://ceph.com/
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8a86b62466f7..b40af3203089 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
+#include <linux/blk-mq.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
 #include <linux/slab.h>
@@ -340,9 +341,7 @@ struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
-	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -360,6 +359,9 @@ struct rbd_device {
 	atomic_t		parent_ref;
 	struct rbd_device	*parent;
 
+	/* Block layer tags. */
+	struct blk_mq_tag_set	tag_set;
+
 	/* protects updating the header */
 	struct rw_semaphore	header_rwsem;
 
@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to blk_end_request(), which takes an unsigned int.
+	 * passed to the block layer, which just supports a 32-bit
+	 * length field.
 	 */
 	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		more = obj_request->which < img_request->obj_request_count - 1;
 	} else {
 		rbd_assert(img_request->rq != NULL);
-		more = blk_end_request(img_request->rq, result, xferred);
+
+		more = blk_update_request(img_request->rq, result, xferred);
+		if (!more)
+			__blk_mq_end_request(img_request->rq, result);
 	}
 
 	return more;
@@ -3304,8 +3310,10 @@ out:
 	return ret;
 }
 
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static void rbd_queue_workfn(struct work_struct *work)
 {
+	struct request *rq = blk_mq_rq_from_pdu(work);
+	struct rbd_device *rbd_dev = rq->q->queuedata;
 	struct rbd_img_request *img_request;
 	struct ceph_snap_context *snapc = NULL;
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	u64 mapping_size;
 	int result;
 
+	if (rq->cmd_type != REQ_TYPE_FS) {
+		dout("%s: non-fs request type %d\n", __func__,
+			(int) rq->cmd_type);
+		result = -EIO;
+		goto err;
+	}
+
 	if (rq->cmd_flags & REQ_DISCARD)
 		op_type = OBJ_OP_DISCARD;
 	else if (rq->cmd_flags & REQ_WRITE)
@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 		goto err_rq;	/* Shouldn't happen */
 	}
 
+	blk_mq_start_request(rq);
+
 	down_read(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
 	if (op_type != OBJ_OP_READ) {
@@ -3404,53 +3421,18 @@ err_rq:
 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
 			 obj_op_name(op_type), length, offset, result);
 	ceph_put_snap_context(snapc);
-	blk_end_request_all(rq, result);
+err:
+	blk_mq_end_request(rq, result);
 }
 
-static void rbd_request_workfn(struct work_struct *work)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+		const struct blk_mq_queue_data *bd)
 {
-	struct rbd_device *rbd_dev =
-	    container_of(work, struct rbd_device, rq_work);
-	struct request *rq, *next;
-	LIST_HEAD(requests);
-
-	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
-	list_splice_init(&rbd_dev->rq_queue, &requests);
-	spin_unlock_irq(&rbd_dev->lock);
+	struct request *rq = bd->rq;
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 
-	list_for_each_entry_safe(rq, next, &requests, queuelist) {
-		list_del_init(&rq->queuelist);
-		rbd_handle_request(rbd_dev, rq);
-	}
-}
-
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule().  Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int queued = 0;
-
-	rbd_assert(rbd_dev);
-
-	while ((rq = blk_fetch_request(q))) {
-		/* Ignore any non-FS requests that filter through. */
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
-		queued++;
-	}
-
-	if (queued)
-		queue_work(rbd_wq, &rbd_dev->rq_work);
+	queue_work(rbd_wq, work);
+	return BLK_MQ_RQ_QUEUE_OK;
 }
 
 /*
@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 		del_gendisk(disk);
 		if (disk->queue)
 			blk_cleanup_queue(disk->queue);
+		blk_mq_free_tag_set(&rbd_dev->tag_set);
 	}
 	put_disk(disk);
 }
@@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 
 	ret = rbd_dev_header_info(rbd_dev);
 	if (ret)
-		return ret;
+		goto out;
 
 	/*
 	 * If there is a parent, see if it has disappeared due to the
@@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	if (rbd_dev->parent) {
 		ret = rbd_dev_v2_parent_info(rbd_dev);
 		if (ret)
-			return ret;
+			goto out;
 	}
 
 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
-		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-			rbd_dev->mapping.size = rbd_dev->header.image_size;
+		rbd_dev->mapping.size = rbd_dev->header.image_size;
 	} else {
 		/* validate mapped snapshot's EXISTS flag */
 		rbd_exists_validate(rbd_dev);
 	}
 
+out:
 	up_write(&rbd_dev->header_rwsem);
-
-	if (mapping_size != rbd_dev->mapping.size)
+	if (!ret && mapping_size != rbd_dev->mapping.size)
 		rbd_dev_update_size(rbd_dev);
 
+	return ret;
+}
+
+static int rbd_init_request(void *data, struct request *rq,
+		unsigned int hctx_idx, unsigned int request_idx,
+		unsigned int numa_node)
+{
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
+
+	INIT_WORK(work, rbd_queue_workfn);
 	return 0;
 }
 
+static struct blk_mq_ops rbd_mq_ops = {
+	.queue_rq	= rbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+	.init_request	= rbd_init_request,
+};
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
 	struct request_queue *q;
 	u64 segment_size;
+	int err;
 
 	/* create gendisk info */
 	disk = alloc_disk(single_major ?
@@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
-	if (!q)
+	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+	rbd_dev->tag_set.ops = &rbd_mq_ops;
+	rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
+	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+	rbd_dev->tag_set.flags =
+		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = 1;
+	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+
+	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+	if (err)
 		goto out_disk;
 
+	q = blk_mq_init_queue(&rbd_dev->tag_set);
+	if (IS_ERR(q)) {
+		err = PTR_ERR(q);
+		goto out_tag_set;
+	}
+
 	/* We use the default size, but let's be explicit about it. */
 	blk_queue_physical_block_size(q, SECTOR_SIZE);
 
@@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	rbd_dev->disk = disk;
 
 	return 0;
+out_tag_set:
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
 	put_disk(disk);
-
-	return -ENOMEM;
+	return err;
 }
 
 /*
@@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
-	INIT_LIST_HEAD(&rbd_dev->rq_queue);
-	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	}
 
 	/*
-	 * We always update the parent overlap.  If it's zero we
-	 * treat it specially.
+	 * We always update the parent overlap.  If it's zero we issue
+	 * a warning, as we will proceed as if there was no parent.
 	 */
-	rbd_dev->parent_overlap = overlap;
 	if (!overlap) {
-
-		/* A null parent_spec indicates it's the initial probe */
-
 		if (parent_spec) {
-			/*
-			 * The overlap has become zero, so the clone
-			 * must have been resized down to 0 at some
-			 * point.  Treat this the same as a flatten.
-			 */
-			rbd_dev_parent_put(rbd_dev);
-			pr_info("%s: clone image now standalone\n",
-				rbd_dev->disk->disk_name);
+			/* refresh, careful to warn just once */
+			if (rbd_dev->parent_overlap)
+				rbd_warn(rbd_dev,
+					 "clone now standalone (overlap became 0)");
 		} else {
-			/*
-			 * For the initial probe, if we find the
-			 * overlap is zero we just pretend there was
-			 * no parent image.
-			 */
-			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
+			/* initial probe */
+			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
 		}
 	}
+	rbd_dev->parent_overlap = overlap;
+
out:
 	ret = 0;
out_err:
@@ -4771,36 +4774,6 @@ static inline size_t next_token(const char **buf)
 }
 
 /*
- * Finds the next token in *buf, and if the provided token buffer is
- * big enough, copies the found token into it.  The result, if
- * copied, is guaranteed to be terminated with '\0'.  Note that *buf
- * must be terminated with '\0' on entry.
- *
- * Returns the length of the token found (not including the '\0').
- * Return value will be 0 if no token is found, and it will be >=
- * token_size if the token would not fit.
- *
- * The *buf pointer will be updated to point beyond the end of the
- * found token.  Note that this occurs even if the token buffer is
- * too small to hold it.
- */
-static inline size_t copy_token(const char **buf,
-				char *token,
-				size_t token_size)
-{
-	size_t len;
-
-	len = next_token(buf);
-	if (len < token_size) {
-		memcpy(token, *buf, len);
-		*(token + len) = '\0';
-	}
-	*buf += len;
-
-	return len;
-}
-
-/*
  * Finds the next token in *buf, dynamically allocates a buffer big
  * enough to hold a copy of it, and copies the token into the new
  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
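
The rbd.c hunks above convert the driver from the old request_fn model (a private rq_queue drained by a single work item) to blk-mq. The key move: tag_set.cmd_size reserves a struct work_struct in every request's driver-private payload (PDU), .init_request initializes it once when the tag set is allocated, and .queue_rq, which runs in atomic context, merely punts the request to a workqueue where rbd_queue_workfn() is free to sleep. Below is a minimal sketch of the same pattern against the 3.19-era blk-mq API the patch targets; the ex_* names are placeholders, not part of the patch.

#include <linux/blk-mq.h>
#include <linux/workqueue.h>

static struct workqueue_struct *ex_wq;	/* assumed: created at module init */

static void ex_workfn(struct work_struct *work)
{
	/* recover the request that owns this PDU */
	struct request *rq = blk_mq_rq_from_pdu(work);

	blk_mq_start_request(rq);
	/* ... submit rq to the backend; sleeping is fine here ... */
	blk_mq_end_request(rq, 0);
}

static int ex_queue_rq(struct blk_mq_hw_ctx *hctx,
		       const struct blk_mq_queue_data *bd)
{
	/* atomic context: defer all real work to the workqueue */
	queue_work(ex_wq, blk_mq_rq_to_pdu(bd->rq));
	return BLK_MQ_RQ_QUEUE_OK;
}

static int ex_init_request(void *data, struct request *rq,
			   unsigned int hctx_idx, unsigned int request_idx,
			   unsigned int numa_node)
{
	/* called once per request at tag-set allocation, not per I/O */
	INIT_WORK((struct work_struct *)blk_mq_rq_to_pdu(rq), ex_workfn);
	return 0;
}

static struct blk_mq_ops ex_mq_ops = {
	.queue_rq	= ex_queue_rq,
	.map_queue	= blk_mq_map_queue,
	.init_request	= ex_init_request,
};

The tag set is then filled in exactly as rbd_init_disk() does above (queue_depth, numa_node, nr_hw_queues = 1, cmd_size = sizeof(struct work_struct)) before blk_mq_alloc_tag_set() and blk_mq_init_queue().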
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 5bd853ba44ff..64fa248343f6 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode,
 	spin_unlock(&ci->i_ceph_lock);
 }
 
-static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
-							int type)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct posix_acl *acl = ACL_NOT_CACHED;
-
-	spin_lock(&ci->i_ceph_lock);
-	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
-		acl = get_cached_acl(inode, type);
-	spin_unlock(&ci->i_ceph_lock);
-
-	return acl;
-}
-
 struct posix_acl *ceph_get_acl(struct inode *inode, int type)
 {
 	int size;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 24be059fd1f8..fd5599d32362 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page)
 	u64 len = PAGE_CACHE_SIZE;
 
 	if (off >= i_size_read(inode)) {
-		zero_user_segment(page, err, PAGE_CACHE_SIZE);
+		zero_user_segment(page, 0, PAGE_CACHE_SIZE);
 		SetPageUptodate(page);
 		return 0;
 	}
 
-	/*
-	 * Uptodate inline data should have been added into page cache
-	 * while getting Fcr caps.
-	 */
-	if (ci->i_inline_version != CEPH_INLINE_NONE)
-		return -EINVAL;
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		/*
+		 * Uptodate inline data should have been added
+		 * into page cache while getting Fcr caps.
+		 */
+		if (off == 0)
+			return -EINVAL;
+		zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+		SetPageUptodate(page);
+		return 0;
+	}
 
 	err = ceph_readpage_from_fscache(inode, page);
 	if (err == 0)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b93c631c6c87..8172775428a0 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
 		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
 							       realmino);
 		if (realm) {
-			ceph_get_snap_realm(mdsc, realm);
 			spin_lock(&realm->inodes_with_caps_lock);
 			ci->i_snap_realm = realm;
 			list_add(&ci->i_snap_realm_item,
@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode,
 	spin_lock(&mdsc->cap_dirty_lock);
 	list_del_init(&ci->i_dirty_item);
 
-	ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
 	if (list_empty(&ci->i_flushing_item)) {
+		ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
 		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
 		mdsc->num_cap_flushing++;
 		dout(" inode %p now flushing seq %lld\n", inode,
@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, int *got, struct page **pinned_page,
-			    int *check_max, int *err)
+			    loff_t endoff, int *got, int *check_max, int *err)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int ret = 0;
-	int have, implemented, _got = 0;
+	int have, implemented;
 	int file_wanted;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
 	     ceph_cap_string(need), ceph_cap_string(want));
-again:
+
 	spin_lock(&ci->i_ceph_lock);
 
 	/* make sure file is actually open */
@@ -2138,50 +2136,34 @@ again:
 		     inode, ceph_cap_string(have), ceph_cap_string(not),
 		     ceph_cap_string(revoking));
 		if ((revoking & not) == 0) {
-			_got = need | (have & want);
-			__take_cap_refs(ci, _got);
+			*got = need | (have & want);
+			__take_cap_refs(ci, *got);
 			ret = 1;
 		}
 	} else {
+		int session_readonly = false;
+		if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+			struct ceph_mds_session *s = ci->i_auth_cap->session;
+			spin_lock(&s->s_cap_lock);
+			session_readonly = s->s_readonly;
+			spin_unlock(&s->s_cap_lock);
+		}
+		if (session_readonly) {
+			dout("get_cap_refs %p needed %s but mds%d readonly\n",
+			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
+			*err = -EROFS;
+			ret = 1;
+			goto out_unlock;
+		}
+
 		dout("get_cap_refs %p have %s needed %s\n", inode,
 		     ceph_cap_string(have), ceph_cap_string(need));
 	}
out_unlock:
 	spin_unlock(&ci->i_ceph_lock);
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE &&
-	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-	    i_size_read(inode) > 0) {
-		int ret1;
-		struct page *page = find_get_page(inode->i_mapping, 0);
-		if (page) {
-			if (PageUptodate(page)) {
-				*pinned_page = page;
-				goto out;
-			}
-			page_cache_release(page);
-		}
-		/*
-		 * drop cap refs first because getattr while holding
-		 * caps refs can cause deadlock.
-		 */
-		ceph_put_cap_refs(ci, _got);
-		_got = 0;
-
-		/* getattr request will bring inline data into page cache */
-		ret1 = __ceph_do_getattr(inode, NULL,
-					 CEPH_STAT_CAP_INLINE_DATA, true);
-		if (ret1 >= 0) {
-			ret = 0;
-			goto again;
-		}
-		*err = ret1;
-		ret = 1;
-	}
-out:
 	dout("get_cap_refs %p ret %d got %s\n", inode,
-	     ret, ceph_cap_string(_got));
-	*got = _got;
+	     ret, ceph_cap_string(*got));
 	return ret;
 }
 
@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int check_max, ret, err;
+	int _got, check_max, ret, err = 0;
 
retry:
 	if (endoff > 0)
 		check_max_size(&ci->vfs_inode, endoff);
+	_got = 0;
 	check_max = 0;
-	err = 0;
 	ret = wait_event_interruptible(ci->i_cap_wq,
 				       try_get_cap_refs(ci, need, want, endoff,
-							got, pinned_page,
-							&check_max, &err));
+							&_got, &check_max, &err));
 	if (err)
 		ret = err;
+	if (ret < 0)
+		return ret;
+
 	if (check_max)
 		goto retry;
-	return ret;
+
+	if (ci->i_inline_version != CEPH_INLINE_NONE &&
+	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+	    i_size_read(&ci->vfs_inode) > 0) {
+		struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
+		if (page) {
+			if (PageUptodate(page)) {
+				*pinned_page = page;
+				goto out;
+			}
+			page_cache_release(page);
+		}
+		/*
+		 * drop cap refs first because getattr while holding
+		 * caps refs can cause deadlock.
+		 */
+		ceph_put_cap_refs(ci, _got);
+		_got = 0;
+
+		/* getattr request will bring inline data into page cache */
+		ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+					CEPH_STAT_CAP_INLINE_DATA, true);
+		if (ret < 0)
+			return ret;
+		goto retry;
+	}
+out:
+	*got = _got;
+	return 0;
 }
 
 /*
@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode)
  */
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			     struct inode *inode, struct ceph_mds_caps *grant,
-			     void *snaptrace, int snaptrace_len,
 			     u64 inline_version,
 			     void *inline_data, int inline_len,
 			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
 			     struct ceph_cap *cap, int issued)
 	__releases(ci->i_ceph_lock)
+	__releases(mdsc->snap_rwsem)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int mds = session->s_mds;
@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
-		down_write(&mdsc->snap_rwsem);
-		ceph_update_snap_trace(mdsc, snaptrace,
-				       snaptrace + snaptrace_len, false);
-		downgrade_write(&mdsc->snap_rwsem);
 		kick_flushing_inode_caps(mdsc, session, inode);
 		up_read(&mdsc->snap_rwsem);
 		if (newcaps & ~issued)
@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	struct ceph_cap *cap;
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
+	struct ceph_snap_realm *realm;
 	int mds = session->s_mds;
 	int op, issued;
 	u32 seq, mseq;
@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_IMPORT:
+		realm = NULL;
+		if (snaptrace_len) {
+			down_write(&mdsc->snap_rwsem);
+			ceph_update_snap_trace(mdsc, snaptrace,
+					       snaptrace + snaptrace_len,
+					       false, &realm);
+			downgrade_write(&mdsc->snap_rwsem);
+		} else {
+			down_read(&mdsc->snap_rwsem);
+		}
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &issued);
-		handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+		handle_cap_grant(mdsc, inode, h,
 				 inline_version, inline_data, inline_len,
 				 msg->middle, session, cap, issued);
+		if (realm)
+			ceph_put_snap_realm(mdsc, realm);
 		goto done_unlocked;
 	}
 
@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	case CEPH_CAP_OP_GRANT:
 		__ceph_caps_issued(ci, &issued);
 		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h, NULL, 0,
+		handle_cap_grant(mdsc, inode, h,
 				 inline_version, inline_data, inline_len,
 				 msg->middle, session, cap, issued);
 		goto done_unlocked;
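
Two threads run through the caps.c hunks. First, try_get_cap_refs() becomes a pure wait_event condition: it reports hard failures (e.g. -EROFS once the auth MDS has forced the session read-only; see CEPH_SESSION_FORCE_RO in mds_client.c below) and the need-to-grow-max_size case through out parameters. Second, the slow inline-data fetch moves out of the condition into ceph_get_caps() itself, where it runs without i_ceph_lock held. The resulting control flow, condensed from the hunks above rather than written fresh:

	int _got, check_max, ret, err = 0;
retry:
	if (endoff > 0)
		check_max_size(&ci->vfs_inode, endoff);
	_got = 0;
	check_max = 0;
	ret = wait_event_interruptible(ci->i_cap_wq,
				       try_get_cap_refs(ci, need, want, endoff,
							&_got, &check_max, &err));
	if (err)
		ret = err;	/* e.g. -EROFS from a read-only session */
	if (ret < 0)
		return ret;	/* hard error, or interrupted by a signal */
	if (check_max)
		goto retry;	/* ask the MDS for a larger max_size first */
	/* the inline-data fetch happens down here: it may drop the
	 * just-taken cap references, issue a getattr, and loop back */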
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index c241603764fd..0411dbb15815 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -26,8 +26,6 @@
  * point by name.
  */
 
-const struct inode_operations ceph_dir_iops;
-const struct file_operations ceph_dir_fops;
 const struct dentry_operations ceph_dentry_ops;
 
 /*
@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
 		/*
 		 * We created the item, then did a lookup, and found
 		 * it was already linked to another inode we already
-		 * had in our cache (and thus got spliced).  Link our
-		 * dentry to that inode, but don't hash it, just in
-		 * case the VFS wants to dereference it.
+		 * had in our cache (and thus got spliced). To not
+		 * confuse VFS (especially when inode is a directory),
+		 * we don't link our dentry to that inode, return an
+		 * error instead.
+		 *
+		 * This event should be rare and it happens only when
+		 * we talk to old MDS. Recent MDS does not send traceless
+		 * reply for request that creates new inode.
 		 */
-		BUG_ON(!result->d_inode);
-		d_instantiate(dentry, result->d_inode);
-		return 0;
+		d_drop(result);
+		return -ESTALE;
 	}
 	return PTR_ERR(result);
 }
@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = {
 	.fsync = ceph_dir_fsync,
 };
 
+const struct file_operations ceph_snapdir_fops = {
+	.iterate = ceph_readdir,
+	.llseek = ceph_dir_llseek,
+	.open = ceph_open,
+	.release = ceph_release,
+};
+
 const struct inode_operations ceph_dir_iops = {
 	.lookup = ceph_lookup,
 	.permission = ceph_permission,
@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = {
 	.atomic_open = ceph_atomic_open,
 };
 
+const struct inode_operations ceph_snapdir_iops = {
+	.lookup = ceph_lookup,
+	.permission = ceph_permission,
+	.getattr = ceph_getattr,
+	.mkdir = ceph_mkdir,
+	.rmdir = ceph_unlink,
+};
+
 const struct dentry_operations ceph_dentry_ops = {
 	.d_revalidate = ceph_d_revalidate,
 	.d_release = ceph_d_release,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 905986dd4c3c..a3d774b35149 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 				   req);
+	err = ceph_handle_snapdir(req, dentry, err);
 	if (err)
 		goto out_req;
 
-	err = ceph_handle_snapdir(req, dentry, err);
 	if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 
@@ -392,13 +392,14 @@ more:
 	if (ret >= 0) {
 		int didpages;
 		if (was_short && (pos + ret < inode->i_size)) {
-			u64 tmp = min(this_len - ret,
+			int zlen = min(this_len - ret,
 					inode->i_size - pos - ret);
+			int zoff = (o_direct ? buf_align : io_align) +
+					read + ret;
 			dout(" zero gap %llu to %llu\n",
-				pos + ret, pos + ret + tmp);
-			ceph_zero_page_vector_range(page_align + read + ret,
-							tmp, pages);
-			ret += tmp;
+				pos + ret, pos + ret + zlen);
+			ceph_zero_page_vector_range(zoff, zlen, pages);
+			ret += zlen;
 		}
 
 		didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
@@ -878,28 +879,34 @@ again:
 
 	i_size = i_size_read(inode);
 	if (retry_op == READ_INLINE) {
-		/* does not support inline data > PAGE_SIZE */
-		if (i_size > PAGE_CACHE_SIZE) {
-			ret = -EIO;
-		} else if (iocb->ki_pos < i_size) {
+		BUG_ON(ret > 0 || read > 0);
+		if (iocb->ki_pos < i_size &&
+		    iocb->ki_pos < PAGE_CACHE_SIZE) {
 			loff_t end = min_t(loff_t, i_size,
 					   iocb->ki_pos + len);
+			end = min_t(loff_t, end, PAGE_CACHE_SIZE);
 			if (statret < end)
 				zero_user_segment(page, statret, end);
 			ret = copy_page_to_iter(page,
 				iocb->ki_pos & ~PAGE_MASK,
 				end - iocb->ki_pos, to);
 			iocb->ki_pos += ret;
-		} else {
-			ret = 0;
+			read += ret;
+		}
+		if (iocb->ki_pos < i_size && read < len) {
+			size_t zlen = min_t(size_t, len - read,
+					    i_size - iocb->ki_pos);
+			ret = iov_iter_zero(zlen, to);
+			iocb->ki_pos += ret;
+			read += ret;
 		}
 		__free_pages(page, 0);
-		return ret;
+		return read;
 	}
 
 	/* hit EOF or hole? */
 	if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
 	    ret < len) {
 		dout("sync_read hit hole, ppos %lld < size %lld"
 		     ", reading more\n", iocb->ki_pos,
 		     inode->i_size);
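
The READ_INLINE path above now serves reads that extend past the inline blob: whatever falls inside the first page is copied out with copy_page_to_iter() (after zero-padding the tail of a short blob), and anything beyond that up to i_size is a hole satisfied by iov_iter_zero(). A hypothetical stand-alone helper with the same copy-then-zero arithmetic — the name and signature are illustrative only, not part of the patch:

#include <linux/fs.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
#include <linux/uio.h>

/* page holds the inline blob in bytes [0, statret); the file is i_size
 * bytes long overall.  Returns the number of bytes fed to the iterator. */
static ssize_t read_inline_blob(struct page *page, int statret,
				struct kiocb *iocb, struct iov_iter *to,
				size_t len, loff_t i_size)
{
	size_t read = 0;
	ssize_t ret;

	if (iocb->ki_pos < i_size && iocb->ki_pos < PAGE_CACHE_SIZE) {
		loff_t end = min_t(loff_t, i_size, iocb->ki_pos + len);

		end = min_t(loff_t, end, PAGE_CACHE_SIZE);
		if (statret < end)		/* short blob: zero its tail */
			zero_user_segment(page, statret, end);
		ret = copy_page_to_iter(page, iocb->ki_pos & ~PAGE_MASK,
					end - iocb->ki_pos, to);
		iocb->ki_pos += ret;
		read += ret;
	}
	if (iocb->ki_pos < i_size && read < len) {	/* hole past page 0 */
		size_t zlen = min_t(size_t, len - read,
				    i_size - iocb->ki_pos);

		ret = iov_iter_zero(zlen, to);
		iocb->ki_pos += ret;
		read += ret;
	}
	return read;
}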
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 6b5173605154..119c43c80638 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent)
 	inode->i_mode = parent->i_mode;
 	inode->i_uid = parent->i_uid;
 	inode->i_gid = parent->i_gid;
-	inode->i_op = &ceph_dir_iops;
-	inode->i_fop = &ceph_dir_fops;
+	inode->i_op = &ceph_snapdir_iops;
+	inode->i_fop = &ceph_snapdir_fops;
 	ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
 	ci->i_rbytes = 0;
 	return inode;
@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 			ceph_vinop(inode), inode->i_mode);
 	}
 
-	/* set dir completion flag? */
-	if (S_ISDIR(inode->i_mode) &&
-	    ci->i_files == 0 && ci->i_subdirs == 0 &&
-	    ceph_snap(inode) == CEPH_NOSNAP &&
-	    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
-	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
-	    !__ceph_dir_is_complete(ci)) {
-		dout(" marking %p complete (empty)\n", inode);
-		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
-					ci->i_ordered_count);
-	}
-
 	/* were we issued a capability? */
 	if (info->cap.caps) {
 		if (ceph_snap(inode) == CEPH_NOSNAP) {
+			unsigned caps = le32_to_cpu(info->cap.caps);
 			ceph_add_cap(inode, session,
 				     le64_to_cpu(info->cap.cap_id),
-				     cap_fmode,
-				     le32_to_cpu(info->cap.caps),
+				     cap_fmode, caps,
 				     le32_to_cpu(info->cap.wanted),
 				     le32_to_cpu(info->cap.seq),
 				     le32_to_cpu(info->cap.mseq),
 				     le64_to_cpu(info->cap.realm),
 				     info->cap.flags, &new_cap);
+
+			/* set dir completion flag? */
+			if (S_ISDIR(inode->i_mode) &&
+			    ci->i_files == 0 && ci->i_subdirs == 0 &&
+			    (caps & CEPH_CAP_FILE_SHARED) &&
+			    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+			    !__ceph_dir_is_complete(ci)) {
+				dout(" marking %p complete (empty)\n", inode);
+				__ceph_dir_set_complete(ci,
+					atomic_read(&ci->i_release_count),
+					ci->i_ordered_count);
+			}
+
 			wake = true;
 		} else {
 			dout(" %p got snap_caps %s\n", inode,
@@ -1446,12 +1447,14 @@ retry_lookup:
 		}
 
 		if (!dn->d_inode) {
-			dn = splice_dentry(dn, in, NULL);
-			if (IS_ERR(dn)) {
-				err = PTR_ERR(dn);
+			struct dentry *realdn = splice_dentry(dn, in, NULL);
+			if (IS_ERR(realdn)) {
+				err = PTR_ERR(realdn);
+				d_drop(dn);
 				dn = NULL;
 				goto next_item;
 			}
+			dn = realdn;
 		}
 
 		di = dn->d_fsdata;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5f62fb7a5d0a..71c073f38e54 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 		mdsc->max_sessions = newmax;
 	}
 	mdsc->sessions[mds] = s;
+	atomic_inc(&mdsc->num_sessions);
 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 
 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
 	mdsc->sessions[s->s_mds] = NULL;
 	ceph_con_close(&s->s_con);
 	ceph_put_mds_session(s);
+	atomic_dec(&mdsc->num_sessions);
 }
 
 /*
@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 	struct ceph_options *opt = mdsc->fsc->client->options;
 	void *p;
 
-	const char* metadata[3][2] = {
+	const char* metadata[][2] = {
 		{"hostname", utsname()->nodename},
+		{"kernel_version", utsname()->release},
 		{"entity_id", opt->name ? opt->name : ""},
 		{NULL, NULL}
 	};
@@ -1464,19 +1467,33 @@ out_unlocked:
 	return err;
 }
 
+static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int ret;
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_flushing_caps)
+		ret = ci->i_cap_flush_seq >= want_flush_seq;
+	else
+		ret = 1;
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
+
 /*
  * flush all dirty inode data to disk.
  *
  * returns true if we've flushed through want_flush_seq
  */
-static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 {
-	int mds, ret = 1;
+	int mds;
 
 	dout("check_cap_flush want %lld\n", want_flush_seq);
 	mutex_lock(&mdsc->mutex);
-	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
+	for (mds = 0; mds < mdsc->max_sessions; mds++) {
 		struct ceph_mds_session *session = mdsc->sessions[mds];
+		struct inode *inode = NULL;
 
 		if (!session)
 			continue;
@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 				list_entry(session->s_cap_flushing.next,
 					   struct ceph_inode_info,
 					   i_flushing_item);
-			struct inode *inode = &ci->vfs_inode;
 
-			spin_lock(&ci->i_ceph_lock);
-			if (ci->i_cap_flush_seq <= want_flush_seq) {
+			if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
 				dout("check_cap_flush still flushing %p "
-				     "seq %lld <= %lld to mds%d\n", inode,
-				     ci->i_cap_flush_seq, want_flush_seq,
-				     session->s_mds);
-				ret = 0;
+				     "seq %lld <= %lld to mds%d\n",
+				     &ci->vfs_inode, ci->i_cap_flush_seq,
+				     want_flush_seq, session->s_mds);
+				inode = igrab(&ci->vfs_inode);
 			}
-			spin_unlock(&ci->i_ceph_lock);
 		}
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
 
-		if (!ret)
-			return ret;
+		if (inode) {
+			wait_event(mdsc->cap_flushing_wq,
+				   check_cap_flush(inode, want_flush_seq));
+			iput(inode);
+		}
+
 		mutex_lock(&mdsc->mutex);
 	}
 
 	mutex_unlock(&mdsc->mutex);
 	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
-	return ret;
 }
 
 /*
@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 	head->num_releases = cpu_to_le16(releases);
 
 	/* time stamp */
-	ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+	{
+		struct ceph_timespec ts;
+		ceph_encode_timespec(&ts, &req->r_stamp);
+		ceph_encode_copy(&p, &ts, sizeof(ts));
+	}
 
 	BUG_ON(p > end);
 	msg->front.iov_len = p - msg->front.iov_base;
@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
 		/* time stamp */
 		p = msg->front.iov_base + req->r_request_release_offset;
-		ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+		{
+			struct ceph_timespec ts;
+			ceph_encode_timespec(&ts, &req->r_stamp);
+			ceph_encode_copy(&p, &ts, sizeof(ts));
+		}
 
 		msg->front.iov_len = p - msg->front.iov_base;
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
 		p = rb_next(p);
 		if (req->r_got_unsafe)
 			continue;
+		if (req->r_attempts > 0)
+			continue; /* only new requests */
 		if (req->r_session &&
 		    req->r_session->s_mds == mds) {
 			dout(" kicking tid %llu\n", req->r_tid);
@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	struct ceph_mds_request *req;
 	struct ceph_mds_reply_head *head = msg->front.iov_base;
 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
+	struct ceph_snap_realm *realm;
 	u64 tid;
 	int err, result;
 	int mds = session->s_mds;
@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	}
 
 	/* snap trace */
+	realm = NULL;
 	if (rinfo->snapblob_len) {
 		down_write(&mdsc->snap_rwsem);
 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
-			       rinfo->snapblob + rinfo->snapblob_len,
-			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+				rinfo->snapblob + rinfo->snapblob_len,
+				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
+				&realm);
 		downgrade_write(&mdsc->snap_rwsem);
 	} else {
 		down_read(&mdsc->snap_rwsem);
@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	mutex_unlock(&req->r_fill_mutex);
 
 	up_read(&mdsc->snap_rwsem);
+	if (realm)
+		ceph_put_snap_realm(mdsc, realm);
out_err:
 	mutex_lock(&mdsc->mutex);
 	if (!req->r_aborted) {
@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
 	dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
 	BUG_ON(req->r_err);
 	BUG_ON(req->r_got_result);
+	req->r_attempts = 0;
 	req->r_num_fwd = fwd_seq;
 	req->r_resend_mds = next_mds;
 	put_request_session(req);
@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session,
 		send_flushmsg_ack(mdsc, session, seq);
 		break;
 
+	case CEPH_SESSION_FORCE_RO:
+		dout("force_session_readonly %p\n", session);
+		spin_lock(&session->s_cap_lock);
+		session->s_readonly = true;
+		spin_unlock(&session->s_cap_lock);
+		wake_up_session_caps(session, 0);
+		break;
+
 	default:
 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
 		WARN_ON(1);
@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 				   struct ceph_mds_session *session)
 {
 	struct ceph_mds_request *req, *nreq;
+	struct rb_node *p;
 	int err;
 
 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 			ceph_con_send(&session->s_con, req->r_request);
 		}
 	}
+
+	/*
+	 * also re-send old requests when MDS enters reconnect stage. So that MDS
+	 * can process completed request in clientreplay stage.
+	 */
+	p = rb_first(&mdsc->request_tree);
+	while (p) {
+		req = rb_entry(p, struct ceph_mds_request, r_node);
+		p = rb_next(p);
+		if (req->r_got_unsafe)
+			continue;
+		if (req->r_attempts == 0)
+			continue; /* only old requests */
+		if (req->r_session &&
+		    req->r_session->s_mds == session->s_mds) {
+			err = __prepare_send_request(mdsc, req, session->s_mds);
+			if (!err) {
+				ceph_msg_get(req->r_request);
+				ceph_con_send(&session->s_con, req->r_request);
+			}
+		}
+	}
 	mutex_unlock(&mdsc->mutex);
 }
 
@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	spin_unlock(&session->s_gen_ttl_lock);
 
 	spin_lock(&session->s_cap_lock);
+	/* don't know if session is readonly */
+	session->s_readonly = 0;
 	/*
 	 * notify __ceph_remove_cap() that we are composing cap reconnect.
 	 * If a cap get released before being added to the cap reconnect,
@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 				mutex_unlock(&s->s_mutex);
 				s->s_state = CEPH_MDS_SESSION_RESTARTING;
 			}
-
-			/* kick any requests waiting on the recovering mds */
-			kick_requests(mdsc, i);
 		} else if (oldstate == newstate) {
 			continue;  /* nothing new with this mds */
 		}
@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	init_waitqueue_head(&mdsc->session_close_wq);
 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
 	mdsc->sessions = NULL;
+	atomic_set(&mdsc->num_sessions, 0);
 	mdsc->max_sessions = 0;
 	mdsc->stopping = 0;
 	init_rwsem(&mdsc->snap_rwsem);
@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 	dout("sync\n");
 	mutex_lock(&mdsc->mutex);
 	want_tid = mdsc->last_tid;
-	want_flush = mdsc->cap_flush_seq;
 	mutex_unlock(&mdsc->mutex);
-	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
 
 	ceph_flush_dirty_caps(mdsc);
+	spin_lock(&mdsc->cap_dirty_lock);
+	want_flush = mdsc->cap_flush_seq;
+	spin_unlock(&mdsc->cap_dirty_lock);
+
+	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
 
 	wait_unsafe_requests(mdsc, want_tid);
-	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
+	wait_caps_flush(mdsc, want_flush);
 }
 
 /*
@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
  */
 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
 {
-	int i, n = 0;
-
 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
 		return true;
-
-	mutex_lock(&mdsc->mutex);
-	for (i = 0; i < mdsc->max_sessions; i++)
-		if (mdsc->sessions[i])
-			n++;
-	mutex_unlock(&mdsc->mutex);
-	return n == 0;
+	return atomic_read(&mdsc->num_sessions) == 0;
 }
 
 /*
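
An easy-to-miss fix in the mds_client.c hunks is the request time stamp encoding. req->r_stamp is a native struct timespec whose members are longs, so their width follows the architecture; copying it into the message verbatim baked the host's layout into the wire format. Both call sites now convert through struct ceph_timespec, a fixed pair of __le32 fields, before copying. The equivalent helper form — the function name is hypothetical, but the two encode calls are the real API from include/linux/ceph/decode.h:

#include <linux/ceph/decode.h>

static void encode_request_stamp(void **p, const struct timespec *stamp)
{
	struct ceph_timespec ts;

	ceph_encode_timespec(&ts, stamp);	/* __le32 tv_sec, __le32 tv_nsec */
	ceph_encode_copy(p, &ts, sizeof(ts));	/* always 8 bytes on the wire */
}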
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e2817d00f7d9..1875b5d985c6 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -137,6 +137,7 @@ struct ceph_mds_session {
 	int               s_nr_caps, s_trim_caps;
 	int               s_num_cap_releases;
 	int               s_cap_reconnect;
+	int               s_readonly;
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
 	struct list_head  s_cap_releases_done; /* ready to send */
 	struct ceph_cap  *s_cap_iterator;
@@ -272,6 +273,7 @@ struct ceph_mds_client {
 	struct list_head        waiting_for_map;
 
 	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
+	atomic_t                num_sessions;
 	int                     max_sessions;  /* len of s_mds_sessions */
 	int                     stopping;      /* true if shutting down */
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index ce35fbd4ba5d..a97e39f09ba6 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
70 * safe. we do need to protect against concurrent empty list 70 * safe. we do need to protect against concurrent empty list
71 * additions, however. 71 * additions, however.
72 */ 72 */
73 if (atomic_read(&realm->nref) == 0) { 73 if (atomic_inc_return(&realm->nref) == 1) {
74 spin_lock(&mdsc->snap_empty_lock); 74 spin_lock(&mdsc->snap_empty_lock);
75 list_del_init(&realm->empty_item); 75 list_del_init(&realm->empty_item);
76 spin_unlock(&mdsc->snap_empty_lock); 76 spin_unlock(&mdsc->snap_empty_lock);
77 } 77 }
78
79 atomic_inc(&realm->nref);
80} 78}
81 79
82static void __insert_snap_realm(struct rb_root *root, 80static void __insert_snap_realm(struct rb_root *root,
@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
116 if (!realm) 114 if (!realm)
117 return ERR_PTR(-ENOMEM); 115 return ERR_PTR(-ENOMEM);
118 116
119 atomic_set(&realm->nref, 0); /* tree does not take a ref */ 117 atomic_set(&realm->nref, 1); /* for caller */
120 realm->ino = ino; 118 realm->ino = ino;
121 INIT_LIST_HEAD(&realm->children); 119 INIT_LIST_HEAD(&realm->children);
122 INIT_LIST_HEAD(&realm->child_item); 120 INIT_LIST_HEAD(&realm->child_item);
@@ -134,8 +132,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
134 * 132 *
135 * caller must hold snap_rwsem for write. 133 * caller must hold snap_rwsem for write.
136 */ 134 */
137struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 135static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
138 u64 ino) 136 u64 ino)
139{ 137{
140 struct rb_node *n = mdsc->snap_realms.rb_node; 138 struct rb_node *n = mdsc->snap_realms.rb_node;
141 struct ceph_snap_realm *r; 139 struct ceph_snap_realm *r;
@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
154 return NULL; 152 return NULL;
155} 153}
156 154
155struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
156 u64 ino)
157{
158 struct ceph_snap_realm *r;
159 r = __lookup_snap_realm(mdsc, ino);
160 if (r)
161 ceph_get_snap_realm(mdsc, r);
162 return r;
163}
164
157static void __put_snap_realm(struct ceph_mds_client *mdsc, 165static void __put_snap_realm(struct ceph_mds_client *mdsc,
158 struct ceph_snap_realm *realm); 166 struct ceph_snap_realm *realm);
159 167
@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
273 } 281 }
274 realm->parent_ino = parentino; 282 realm->parent_ino = parentino;
275 realm->parent = parent; 283 realm->parent = parent;
276 ceph_get_snap_realm(mdsc, parent);
277 list_add(&realm->child_item, &parent->children); 284 list_add(&realm->child_item, &parent->children);
278 return 1; 285 return 1;
279} 286}
@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
631 * Caller must hold snap_rwsem for write. 638 * Caller must hold snap_rwsem for write.
632 */ 639 */
633int ceph_update_snap_trace(struct ceph_mds_client *mdsc, 640int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
634 void *p, void *e, bool deletion) 641 void *p, void *e, bool deletion,
642 struct ceph_snap_realm **realm_ret)
635{ 643{
636 struct ceph_mds_snap_realm *ri; /* encoded */ 644 struct ceph_mds_snap_realm *ri; /* encoded */
637 __le64 *snaps; /* encoded */ 645 __le64 *snaps; /* encoded */
638 __le64 *prior_parent_snaps; /* encoded */ 646 __le64 *prior_parent_snaps; /* encoded */
639 struct ceph_snap_realm *realm; 647 struct ceph_snap_realm *realm = NULL;
648 struct ceph_snap_realm *first_realm = NULL;
640 int invalidate = 0; 649 int invalidate = 0;
641 int err = -ENOMEM; 650 int err = -ENOMEM;
642 LIST_HEAD(dirty_realms); 651 LIST_HEAD(dirty_realms);
@@ -704,13 +713,18 @@ more:
704 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, 713 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
705 realm, invalidate, p, e); 714 realm, invalidate, p, e);
706 715
707 if (p < e)
708 goto more;
709
710 /* invalidate when we reach the _end_ (root) of the trace */ 716 /* invalidate when we reach the _end_ (root) of the trace */
711 if (invalidate) 717 if (invalidate && p >= e)
712 rebuild_snap_realms(realm); 718 rebuild_snap_realms(realm);
713 719
720 if (!first_realm)
721 first_realm = realm;
722 else
723 ceph_put_snap_realm(mdsc, realm);
724
725 if (p < e)
726 goto more;
727
714 /* 728 /*
715 * queue cap snaps _after_ we've built the new snap contexts, 729 * queue cap snaps _after_ we've built the new snap contexts,
716 * so that i_head_snapc can be set appropriately. 730 * so that i_head_snapc can be set appropriately.
@@ -721,12 +735,21 @@ more:
721 queue_realm_cap_snaps(realm); 735 queue_realm_cap_snaps(realm);
722 } 736 }
723 737
738 if (realm_ret)
739 *realm_ret = first_realm;
740 else
741 ceph_put_snap_realm(mdsc, first_realm);
742
724 __cleanup_empty_realms(mdsc); 743 __cleanup_empty_realms(mdsc);
725 return 0; 744 return 0;
726 745
727bad: 746bad:
728 err = -EINVAL; 747 err = -EINVAL;
729fail: 748fail:
749 if (realm && !IS_ERR(realm))
750 ceph_put_snap_realm(mdsc, realm);
751 if (first_realm)
752 ceph_put_snap_realm(mdsc, first_realm);
730 pr_err("update_snap_trace error %d\n", err); 753 pr_err("update_snap_trace error %d\n", err);
731 return err; 754 return err;
732} 755}
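
Taken together, these hunks change the ownership contract: ceph_update_snap_trace() now hands back the first realm of the trace with a reference held via *realm_ret (or drops it internally when the caller passes NULL), and the failure path puts whatever references were already taken. A hypothetical caller-side fragment showing the contract (kernel-style, error handling condensed):

    struct ceph_snap_realm *realm = NULL;
    int err;

    err = ceph_update_snap_trace(mdsc, p, e, false, &realm);
    if (err == 0) {
            /* realm arrives referenced; drop it when done */
            ceph_put_snap_realm(mdsc, realm);
    }
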
@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
844 if (IS_ERR(realm)) 867 if (IS_ERR(realm))
845 goto out; 868 goto out;
846 } 869 }
847 ceph_get_snap_realm(mdsc, realm);
848 870
849 dout("splitting snap_realm %llx %p\n", realm->ino, realm); 871 dout("splitting snap_realm %llx %p\n", realm->ino, realm);
850 for (i = 0; i < num_split_inos; i++) { 872 for (i = 0; i < num_split_inos; i++) {
@@ -905,7 +927,7 @@ skip_inode:
905 /* we may have taken some of the old realm's children. */ 927 /* we may have taken some of the old realm's children. */
906 for (i = 0; i < num_split_realms; i++) { 928 for (i = 0; i < num_split_realms; i++) {
907 struct ceph_snap_realm *child = 929 struct ceph_snap_realm *child =
908 ceph_lookup_snap_realm(mdsc, 930 __lookup_snap_realm(mdsc,
909 le64_to_cpu(split_realms[i])); 931 le64_to_cpu(split_realms[i]));
910 if (!child) 932 if (!child)
911 continue; 933 continue;
@@ -918,7 +940,7 @@ skip_inode:
918 * snap, we can avoid queueing cap_snaps. 940 * snap, we can avoid queueing cap_snaps.
919 */ 941 */
920 ceph_update_snap_trace(mdsc, p, e, 942 ceph_update_snap_trace(mdsc, p, e,
921 op == CEPH_SNAP_OP_DESTROY); 943 op == CEPH_SNAP_OP_DESTROY, NULL);
922 944
923 if (op == CEPH_SNAP_OP_SPLIT) 945 if (op == CEPH_SNAP_OP_SPLIT)
924 /* we took a reference when we created the realm, above */ 946 /* we took a reference when we created the realm, above */
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 5ae62587a71d..a63997b8bcff 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -414,6 +414,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
414 seq_puts(m, ",noshare"); 414 seq_puts(m, ",noshare");
415 if (opt->flags & CEPH_OPT_NOCRC) 415 if (opt->flags & CEPH_OPT_NOCRC)
416 seq_puts(m, ",nocrc"); 416 seq_puts(m, ",nocrc");
417 if (opt->flags & CEPH_OPT_NOMSGAUTH)
418 seq_puts(m, ",nocephx_require_signatures");
419 if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
420 seq_puts(m, ",notcp_nodelay");
417 421
418 if (opt->name) 422 if (opt->name)
419 seq_printf(m, ",name=%s", opt->name); 423 seq_printf(m, ",name=%s", opt->name);
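
Note the polarity of the two added show_options lines: only deviations from the defaults get printed. NOMSGAUTH is clear by default, so it prints when set; TCP_NODELAY is set by default (see the libceph.h hunk below), so it prints when clear. A small userspace sketch of that logic (constants mirror libceph.h):

    #include <stdio.h>

    #define CEPH_OPT_NOMSGAUTH    (1 << 4)
    #define CEPH_OPT_TCP_NODELAY  (1 << 5)

    static void show_options(unsigned int flags)
    {
            if (flags & CEPH_OPT_NOMSGAUTH)
                    printf(",nocephx_require_signatures");
            if (!(flags & CEPH_OPT_TCP_NODELAY))
                    printf(",notcp_nodelay");
            printf("\n");
    }

    int main(void)
    {
            show_options(CEPH_OPT_TCP_NODELAY); /* defaults: nothing */
            show_options(CEPH_OPT_NOMSGAUTH);   /* both deviations print */
            return 0;
    }
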
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e1aa32d0759d..04c8124ed30e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, 693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
694 struct ceph_snap_realm *realm); 694 struct ceph_snap_realm *realm);
695extern int ceph_update_snap_trace(struct ceph_mds_client *m, 695extern int ceph_update_snap_trace(struct ceph_mds_client *m,
696 void *p, void *e, bool deletion); 696 void *p, void *e, bool deletion,
697 struct ceph_snap_realm **realm_ret);
697extern void ceph_handle_snap(struct ceph_mds_client *mdsc, 698extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
698 struct ceph_mds_session *session, 699 struct ceph_mds_session *session,
699 struct ceph_msg *msg); 700 struct ceph_msg *msg);
@@ -892,7 +893,9 @@ extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
892int ceph_uninline_data(struct file *filp, struct page *locked_page); 893int ceph_uninline_data(struct file *filp, struct page *locked_page);
893/* dir.c */ 894/* dir.c */
894extern const struct file_operations ceph_dir_fops; 895extern const struct file_operations ceph_dir_fops;
896extern const struct file_operations ceph_snapdir_fops;
895extern const struct inode_operations ceph_dir_iops; 897extern const struct inode_operations ceph_dir_iops;
898extern const struct inode_operations ceph_snapdir_iops;
896extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, 899extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
897 ceph_snapdir_dentry_ops; 900 ceph_snapdir_dentry_ops;
898 901
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index c0dadaac26e3..31eb03d0c766 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -158,17 +158,6 @@ enum {
158}; 158};
159 159
160 160
161/* pool operations */
162enum {
163 POOL_OP_CREATE = 0x01,
164 POOL_OP_DELETE = 0x02,
165 POOL_OP_AUID_CHANGE = 0x03,
166 POOL_OP_CREATE_SNAP = 0x11,
167 POOL_OP_DELETE_SNAP = 0x12,
168 POOL_OP_CREATE_UNMANAGED_SNAP = 0x21,
169 POOL_OP_DELETE_UNMANAGED_SNAP = 0x22,
170};
171
172struct ceph_mon_request_header { 161struct ceph_mon_request_header {
173 __le64 have_version; 162 __le64 have_version;
174 __le16 session_mon; 163 __le16 session_mon;
@@ -191,31 +180,6 @@ struct ceph_mon_statfs_reply {
191 struct ceph_statfs st; 180 struct ceph_statfs st;
192} __attribute__ ((packed)); 181} __attribute__ ((packed));
193 182
194const char *ceph_pool_op_name(int op);
195
196struct ceph_mon_poolop {
197 struct ceph_mon_request_header monhdr;
198 struct ceph_fsid fsid;
199 __le32 pool;
200 __le32 op;
201 __le64 auid;
202 __le64 snapid;
203 __le32 name_len;
204} __attribute__ ((packed));
205
206struct ceph_mon_poolop_reply {
207 struct ceph_mon_request_header monhdr;
208 struct ceph_fsid fsid;
209 __le32 reply_code;
210 __le32 epoch;
211 char has_data;
212 char data[0];
213} __attribute__ ((packed));
214
215struct ceph_mon_unmanaged_snap {
216 __le64 snapid;
217} __attribute__ ((packed));
218
219struct ceph_osd_getmap { 183struct ceph_osd_getmap {
220 struct ceph_mon_request_header monhdr; 184 struct ceph_mon_request_header monhdr;
221 struct ceph_fsid fsid; 185 struct ceph_fsid fsid;
@@ -307,6 +271,7 @@ enum {
307 CEPH_SESSION_RECALL_STATE, 271 CEPH_SESSION_RECALL_STATE,
308 CEPH_SESSION_FLUSHMSG, 272 CEPH_SESSION_FLUSHMSG,
309 CEPH_SESSION_FLUSHMSG_ACK, 273 CEPH_SESSION_FLUSHMSG_ACK,
274 CEPH_SESSION_FORCE_RO,
310}; 275};
311 276
312extern const char *ceph_session_op_name(int op); 277extern const char *ceph_session_op_name(int op);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 8b11a79ca1cb..16fff9608848 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -30,8 +30,9 @@
30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */ 30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ 31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
32#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */ 32#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */
33#define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */
33 34
34#define CEPH_OPT_DEFAULT (0) 35#define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY)
35 36
36#define ceph_set_opt(client, opt) \ 37#define ceph_set_opt(client, opt) \
37 (client)->options->flags |= CEPH_OPT_##opt; 38 (client)->options->flags |= CEPH_OPT_##opt;
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index d9d396c16503..e15499422fdc 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -57,6 +57,7 @@ struct ceph_messenger {
57 57
58 atomic_t stopping; 58 atomic_t stopping;
59 bool nocrc; 59 bool nocrc;
60 bool tcp_nodelay;
60 61
61 /* 62 /*
62 * the global_seq counts connections i (attempt to) initiate 63 * the global_seq counts connections i (attempt to) initiate
@@ -264,7 +265,8 @@ extern void ceph_messenger_init(struct ceph_messenger *msgr,
264 struct ceph_entity_addr *myaddr, 265 struct ceph_entity_addr *myaddr,
265 u64 supported_features, 266 u64 supported_features,
266 u64 required_features, 267 u64 required_features,
267 bool nocrc); 268 bool nocrc,
269 bool tcp_nodelay);
268 270
269extern void ceph_con_init(struct ceph_connection *con, void *private, 271extern void ceph_con_init(struct ceph_connection *con, void *private,
270 const struct ceph_connection_operations *ops, 272 const struct ceph_connection_operations *ops,
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index deb47e45ac7c..81810dc21f06 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -40,7 +40,7 @@ struct ceph_mon_request {
40}; 40};
41 41
42/* 42/*
43 * ceph_mon_generic_request is being used for the statfs, poolop and 43 * ceph_mon_generic_request is being used for the statfs and
44 * mon_get_version requests which are being done a bit differently 44 * mon_get_version requests which are being done a bit differently
45 * because we need to get data back to the caller 45 * because we need to get data back to the caller
46 */ 46 */
@@ -50,7 +50,6 @@ struct ceph_mon_generic_request {
50 struct rb_node node; 50 struct rb_node node;
51 int result; 51 int result;
52 void *buf; 52 void *buf;
53 int buf_len;
54 struct completion completion; 53 struct completion completion;
55 struct ceph_msg *request; /* original request */ 54 struct ceph_msg *request; /* original request */
56 struct ceph_msg *reply; /* and reply */ 55 struct ceph_msg *reply; /* and reply */
@@ -117,10 +116,4 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
117 116
118extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); 117extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
119 118
120extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
121 u32 pool, u64 *snapid);
122
123extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
124 u32 pool, u64 snapid);
125
126#endif 119#endif
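
For context, the fields that remain feed a simple synchronous pattern: register the request by tid, send it, sleep on the completion, then read back result and buf as filled in by the reply handler. A condensed, illustrative rendering of do_generic_request(), not the verbatim kernel code:

    static int do_generic_request(struct ceph_mon_client *monc,
                                  struct ceph_mon_generic_request *req)
    {
            int err;

            /* register under the mutex so the reply handler can
             * find the request by tid */
            mutex_lock(&monc->mutex);
            req->tid = ++monc->last_tid;
            req->request->hdr.tid = cpu_to_le64(req->tid);
            __insert_generic_request(monc, req);
            ceph_con_send(&monc->con, ceph_msg_get(req->request));
            mutex_unlock(&monc->mutex);

            /* the dispatch path fills req->buf and completes us */
            err = wait_for_completion_interruptible(&req->completion);

            mutex_lock(&monc->mutex);
            rb_erase(&req->node, &monc->generic_request_tree);
            mutex_unlock(&monc->mutex);

            return err ?: req->result;
    }

With buf_len gone, each reply handler is expected to know the exact size of what it decodes into buf, which is also why the over-general "corrupt generic reply" message could become per-type in the mon_client.c hunks below.
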
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 5d5ab67f516d..ec565508e904 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -239,6 +239,8 @@ enum {
239 Opt_nocrc, 239 Opt_nocrc,
240 Opt_cephx_require_signatures, 240 Opt_cephx_require_signatures,
241 Opt_nocephx_require_signatures, 241 Opt_nocephx_require_signatures,
242 Opt_tcp_nodelay,
243 Opt_notcp_nodelay,
242}; 244};
243 245
244static match_table_t opt_tokens = { 246static match_table_t opt_tokens = {
@@ -259,6 +261,8 @@ static match_table_t opt_tokens = {
259 {Opt_nocrc, "nocrc"}, 261 {Opt_nocrc, "nocrc"},
260 {Opt_cephx_require_signatures, "cephx_require_signatures"}, 262 {Opt_cephx_require_signatures, "cephx_require_signatures"},
261 {Opt_nocephx_require_signatures, "nocephx_require_signatures"}, 263 {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
264 {Opt_tcp_nodelay, "tcp_nodelay"},
265 {Opt_notcp_nodelay, "notcp_nodelay"},
262 {-1, NULL} 266 {-1, NULL}
263}; 267};
264 268
@@ -457,6 +461,7 @@ ceph_parse_options(char *options, const char *dev_name,
457 case Opt_nocrc: 461 case Opt_nocrc:
458 opt->flags |= CEPH_OPT_NOCRC; 462 opt->flags |= CEPH_OPT_NOCRC;
459 break; 463 break;
464
460 case Opt_cephx_require_signatures: 465 case Opt_cephx_require_signatures:
461 opt->flags &= ~CEPH_OPT_NOMSGAUTH; 466 opt->flags &= ~CEPH_OPT_NOMSGAUTH;
462 break; 467 break;
@@ -464,6 +469,13 @@ ceph_parse_options(char *options, const char *dev_name,
464 opt->flags |= CEPH_OPT_NOMSGAUTH; 469 opt->flags |= CEPH_OPT_NOMSGAUTH;
465 break; 470 break;
466 471
472 case Opt_tcp_nodelay:
473 opt->flags |= CEPH_OPT_TCP_NODELAY;
474 break;
475 case Opt_notcp_nodelay:
476 opt->flags &= ~CEPH_OPT_TCP_NODELAY;
477 break;
478
467 default: 479 default:
468 BUG_ON(token); 480 BUG_ON(token);
469 } 481 }
@@ -518,10 +530,12 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
518 /* msgr */ 530 /* msgr */
519 if (ceph_test_opt(client, MYIP)) 531 if (ceph_test_opt(client, MYIP))
520 myaddr = &client->options->my_addr; 532 myaddr = &client->options->my_addr;
533
521 ceph_messenger_init(&client->msgr, myaddr, 534 ceph_messenger_init(&client->msgr, myaddr,
522 client->supported_features, 535 client->supported_features,
523 client->required_features, 536 client->required_features,
524 ceph_test_opt(client, NOCRC)); 537 ceph_test_opt(client, NOCRC),
538 ceph_test_opt(client, TCP_NODELAY));
525 539
526 /* subsystems */ 540 /* subsystems */
527 err = ceph_monc_init(&client->monc, client); 541 err = ceph_monc_init(&client->monc, client);
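
Parsing keeps the usual last-one-wins semantics: tcp_nodelay sets the bit, notcp_nodelay clears it, on top of a default where the bit starts set. A userspace sketch of the effect (the kernel walks a match_table_t with match_token(); plain strcmp() stands in here):

    #include <stdio.h>
    #include <string.h>

    #define CEPH_OPT_TCP_NODELAY (1 << 5)

    static void parse_opt(const char *tok, unsigned int *flags)
    {
            if (strcmp(tok, "tcp_nodelay") == 0)
                    *flags |= CEPH_OPT_TCP_NODELAY;
            else if (strcmp(tok, "notcp_nodelay") == 0)
                    *flags &= ~CEPH_OPT_TCP_NODELAY;
    }

    int main(void)
    {
            unsigned int flags = CEPH_OPT_TCP_NODELAY; /* default: on */
            parse_opt("notcp_nodelay", &flags);
            printf("flags=%#x\n", flags);              /* flags=0 */
            return 0;
    }
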
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 30560202f57b..139a9cb19b0c 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -42,17 +42,3 @@ const char *ceph_osd_state_name(int s)
42 return "???"; 42 return "???";
43 } 43 }
44} 44}
45
46const char *ceph_pool_op_name(int op)
47{
48 switch (op) {
49 case POOL_OP_CREATE: return "create";
50 case POOL_OP_DELETE: return "delete";
51 case POOL_OP_AUID_CHANGE: return "auid change";
52 case POOL_OP_CREATE_SNAP: return "create snap";
53 case POOL_OP_DELETE_SNAP: return "delete snap";
54 case POOL_OP_CREATE_UNMANAGED_SNAP: return "create unmanaged snap";
55 case POOL_OP_DELETE_UNMANAGED_SNAP: return "delete unmanaged snap";
56 }
57 return "???";
58}
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index d2d525529f87..14d9995097cc 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -127,8 +127,6 @@ static int monc_show(struct seq_file *s, void *p)
127 op = le16_to_cpu(req->request->hdr.type); 127 op = le16_to_cpu(req->request->hdr.type);
128 if (op == CEPH_MSG_STATFS) 128 if (op == CEPH_MSG_STATFS)
129 seq_printf(s, "%llu statfs\n", req->tid); 129 seq_printf(s, "%llu statfs\n", req->tid);
130 else if (op == CEPH_MSG_POOLOP)
131 seq_printf(s, "%llu poolop\n", req->tid);
132 else if (op == CEPH_MSG_MON_GET_VERSION) 130 else if (op == CEPH_MSG_MON_GET_VERSION)
133 seq_printf(s, "%llu mon_get_version", req->tid); 131 seq_printf(s, "%llu mon_get_version", req->tid);
134 else 132 else
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 33a2f201e460..6b3f54ed65ba 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -510,6 +510,16 @@ static int ceph_tcp_connect(struct ceph_connection *con)
510 return ret; 510 return ret;
511 } 511 }
512 512
513 if (con->msgr->tcp_nodelay) {
514 int optval = 1;
515
516 ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
517 (char *)&optval, sizeof(optval));
518 if (ret)
 519 pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d\n",
520 ret);
521 }
522
513 sk_set_memalloc(sock->sk); 523 sk_set_memalloc(sock->sk);
514 524
515 con->sock = sock; 525 con->sock = sock;
@@ -2922,7 +2932,8 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
2922 struct ceph_entity_addr *myaddr, 2932 struct ceph_entity_addr *myaddr,
2923 u64 supported_features, 2933 u64 supported_features,
2924 u64 required_features, 2934 u64 required_features,
2925 bool nocrc) 2935 bool nocrc,
2936 bool tcp_nodelay)
2926{ 2937{
2927 msgr->supported_features = supported_features; 2938 msgr->supported_features = supported_features;
2928 msgr->required_features = required_features; 2939 msgr->required_features = required_features;
@@ -2937,6 +2948,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
2937 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2948 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2938 encode_my_addr(msgr); 2949 encode_my_addr(msgr);
2939 msgr->nocrc = nocrc; 2950 msgr->nocrc = nocrc;
2951 msgr->tcp_nodelay = tcp_nodelay;
2940 2952
2941 atomic_set(&msgr->stopping, 0); 2953 atomic_set(&msgr->stopping, 0);
2942 2954
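
The knob itself just disables Nagle's algorithm on each monitor/OSD connection so small messages go out immediately instead of being batched. The userspace equivalent of the added ceph_tcp_connect() code, as a self-contained helper (error handling mirrors the patch's log-and-continue choice):

    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <stdio.h>
    #include <sys/socket.h>

    /* Disable Nagle on a connected TCP socket; returns 0 on success. */
    static int set_tcp_nodelay(int fd)
    {
            int optval = 1;
            int ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
                                 &optval, sizeof(optval));

            if (ret)
                    perror("setsockopt(TCP_NODELAY)");
            return ret;     /* non-fatal: the connection still works */
    }
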
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index f2148e22b148..2b3cf05e87b0 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -410,7 +410,7 @@ out_unlocked:
410} 410}
411 411
412/* 412/*
413 * generic requests (e.g., statfs, poolop) 413 * generic requests (currently statfs, mon_get_version)
414 */ 414 */
415static struct ceph_mon_generic_request *__lookup_generic_req( 415static struct ceph_mon_generic_request *__lookup_generic_req(
416 struct ceph_mon_client *monc, u64 tid) 416 struct ceph_mon_client *monc, u64 tid)
@@ -569,7 +569,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
569 return; 569 return;
570 570
571bad: 571bad:
572 pr_err("corrupt generic reply, tid %llu\n", tid); 572 pr_err("corrupt statfs reply, tid %llu\n", tid);
573 ceph_msg_dump(msg); 573 ceph_msg_dump(msg);
574} 574}
575 575
@@ -588,7 +588,6 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
588 588
589 kref_init(&req->kref); 589 kref_init(&req->kref);
590 req->buf = buf; 590 req->buf = buf;
591 req->buf_len = sizeof(*buf);
592 init_completion(&req->completion); 591 init_completion(&req->completion);
593 592
594 err = -ENOMEM; 593 err = -ENOMEM;
@@ -611,7 +610,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
611 err = do_generic_request(monc, req); 610 err = do_generic_request(monc, req);
612 611
613out: 612out:
614 kref_put(&req->kref, release_generic_request); 613 put_generic_request(req);
615 return err; 614 return err;
616} 615}
617EXPORT_SYMBOL(ceph_monc_do_statfs); 616EXPORT_SYMBOL(ceph_monc_do_statfs);
@@ -647,7 +646,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
647 646
648 return; 647 return;
649bad: 648bad:
650 pr_err("corrupt mon_get_version reply\n"); 649 pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
651 ceph_msg_dump(msg); 650 ceph_msg_dump(msg);
652} 651}
653 652
@@ -670,7 +669,6 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
670 669
671 kref_init(&req->kref); 670 kref_init(&req->kref);
672 req->buf = newest; 671 req->buf = newest;
673 req->buf_len = sizeof(*newest);
674 init_completion(&req->completion); 672 init_completion(&req->completion);
675 673
676 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 674 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
@@ -701,134 +699,12 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
701 699
702 mutex_unlock(&monc->mutex); 700 mutex_unlock(&monc->mutex);
703out: 701out:
704 kref_put(&req->kref, release_generic_request); 702 put_generic_request(req);
705 return err; 703 return err;
706} 704}
707EXPORT_SYMBOL(ceph_monc_do_get_version); 705EXPORT_SYMBOL(ceph_monc_do_get_version);
708 706
709/* 707/*
710 * pool ops
711 */
712static int get_poolop_reply_buf(const char *src, size_t src_len,
713 char *dst, size_t dst_len)
714{
715 u32 buf_len;
716
717 if (src_len != sizeof(u32) + dst_len)
718 return -EINVAL;
719
720 buf_len = le32_to_cpu(*(__le32 *)src);
721 if (buf_len != dst_len)
722 return -EINVAL;
723
724 memcpy(dst, src + sizeof(u32), dst_len);
725 return 0;
726}
727
728static void handle_poolop_reply(struct ceph_mon_client *monc,
729 struct ceph_msg *msg)
730{
731 struct ceph_mon_generic_request *req;
732 struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
733 u64 tid = le64_to_cpu(msg->hdr.tid);
734
735 if (msg->front.iov_len < sizeof(*reply))
736 goto bad;
737 dout("handle_poolop_reply %p tid %llu\n", msg, tid);
738
739 mutex_lock(&monc->mutex);
740 req = __lookup_generic_req(monc, tid);
741 if (req) {
742 if (req->buf_len &&
743 get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
744 msg->front.iov_len - sizeof(*reply),
745 req->buf, req->buf_len) < 0) {
746 mutex_unlock(&monc->mutex);
747 goto bad;
748 }
749 req->result = le32_to_cpu(reply->reply_code);
750 get_generic_request(req);
751 }
752 mutex_unlock(&monc->mutex);
753 if (req) {
754 complete(&req->completion);
755 put_generic_request(req);
756 }
757 return;
758
759bad:
760 pr_err("corrupt generic reply, tid %llu\n", tid);
761 ceph_msg_dump(msg);
762}
763
764/*
765 * Do a synchronous pool op.
766 */
767static int do_poolop(struct ceph_mon_client *monc, u32 op,
768 u32 pool, u64 snapid,
769 char *buf, int len)
770{
771 struct ceph_mon_generic_request *req;
772 struct ceph_mon_poolop *h;
773 int err;
774
775 req = kzalloc(sizeof(*req), GFP_NOFS);
776 if (!req)
777 return -ENOMEM;
778
779 kref_init(&req->kref);
780 req->buf = buf;
781 req->buf_len = len;
782 init_completion(&req->completion);
783
784 err = -ENOMEM;
785 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
786 true);
787 if (!req->request)
788 goto out;
789 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
790 true);
791 if (!req->reply)
792 goto out;
793
794 /* fill out request */
795 req->request->hdr.version = cpu_to_le16(2);
796 h = req->request->front.iov_base;
797 h->monhdr.have_version = 0;
798 h->monhdr.session_mon = cpu_to_le16(-1);
799 h->monhdr.session_mon_tid = 0;
800 h->fsid = monc->monmap->fsid;
801 h->pool = cpu_to_le32(pool);
802 h->op = cpu_to_le32(op);
803 h->auid = 0;
804 h->snapid = cpu_to_le64(snapid);
805 h->name_len = 0;
806
807 err = do_generic_request(monc, req);
808
809out:
810 kref_put(&req->kref, release_generic_request);
811 return err;
812}
813
814int ceph_monc_create_snapid(struct ceph_mon_client *monc,
815 u32 pool, u64 *snapid)
816{
817 return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
818 pool, 0, (char *)snapid, sizeof(*snapid));
819
820}
821EXPORT_SYMBOL(ceph_monc_create_snapid);
822
823int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
824 u32 pool, u64 snapid)
825{
826 return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
827 pool, snapid, NULL, 0);
828
829}
830
831/*
832 * Resend pending generic requests. 708 * Resend pending generic requests.
833 */ 709 */
834static void __resend_generic_request(struct ceph_mon_client *monc) 710static void __resend_generic_request(struct ceph_mon_client *monc)
@@ -1112,10 +988,6 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1112 handle_get_version_reply(monc, msg); 988 handle_get_version_reply(monc, msg);
1113 break; 989 break;
1114 990
1115 case CEPH_MSG_POOLOP_REPLY:
1116 handle_poolop_reply(monc, msg);
1117 break;
1118
1119 case CEPH_MSG_MON_MAP: 991 case CEPH_MSG_MON_MAP:
1120 ceph_monc_handle_map(monc, msg); 992 ceph_monc_handle_map(monc, msg);
1121 break; 993 break;
@@ -1154,7 +1026,6 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1154 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1026 case CEPH_MSG_MON_SUBSCRIBE_ACK:
1155 m = ceph_msg_get(monc->m_subscribe_ack); 1027 m = ceph_msg_get(monc->m_subscribe_ack);
1156 break; 1028 break;
1157 case CEPH_MSG_POOLOP_REPLY:
1158 case CEPH_MSG_STATFS_REPLY: 1029 case CEPH_MSG_STATFS_REPLY:
1159 return get_generic_reply(con, hdr, skip); 1030 return get_generic_reply(con, hdr, skip);
1160 case CEPH_MSG_AUTH_REPLY: 1031 case CEPH_MSG_AUTH_REPLY:
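
Besides deleting the now-unused pool-op machinery, the hunks above switch the statfs and get_version exit paths from an open-coded kref_put() to the put_generic_request() helper, so every call site releases its reference the same way. The helper is a thin wrapper along these lines (it already exists earlier in mon_client.c):

    static void put_generic_request(struct ceph_mon_generic_request *req)
    {
            kref_put(&req->kref, release_generic_request);
    }
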
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 53299c7b0ca4..41a4abc7e98e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1035,10 +1035,11 @@ static void put_osd(struct ceph_osd *osd)
1035{ 1035{
1036 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 1036 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
1037 atomic_read(&osd->o_ref) - 1); 1037 atomic_read(&osd->o_ref) - 1);
1038 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 1038 if (atomic_dec_and_test(&osd->o_ref)) {
1039 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 1039 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
1040 1040
1041 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 1041 if (osd->o_auth.authorizer)
1042 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
1042 kfree(osd); 1043 kfree(osd);
1043 } 1044 }
1044} 1045}
@@ -1048,14 +1049,24 @@ static void put_osd(struct ceph_osd *osd)
1048 */ 1049 */
1049static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1050static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1050{ 1051{
1051 dout("__remove_osd %p\n", osd); 1052 dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
1052 WARN_ON(!list_empty(&osd->o_requests)); 1053 WARN_ON(!list_empty(&osd->o_requests));
1053 WARN_ON(!list_empty(&osd->o_linger_requests)); 1054 WARN_ON(!list_empty(&osd->o_linger_requests));
1054 1055
1055 rb_erase(&osd->o_node, &osdc->osds);
1056 list_del_init(&osd->o_osd_lru); 1056 list_del_init(&osd->o_osd_lru);
1057 ceph_con_close(&osd->o_con); 1057 rb_erase(&osd->o_node, &osdc->osds);
1058 put_osd(osd); 1058 RB_CLEAR_NODE(&osd->o_node);
1059}
1060
1061static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1062{
1063 dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
1064
1065 if (!RB_EMPTY_NODE(&osd->o_node)) {
1066 ceph_con_close(&osd->o_con);
1067 __remove_osd(osdc, osd);
1068 put_osd(osd);
1069 }
1059} 1070}
1060 1071
1061static void remove_all_osds(struct ceph_osd_client *osdc) 1072static void remove_all_osds(struct ceph_osd_client *osdc)
@@ -1065,7 +1076,7 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
1065 while (!RB_EMPTY_ROOT(&osdc->osds)) { 1076 while (!RB_EMPTY_ROOT(&osdc->osds)) {
1066 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 1077 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
1067 struct ceph_osd, o_node); 1078 struct ceph_osd, o_node);
1068 __remove_osd(osdc, osd); 1079 remove_osd(osdc, osd);
1069 } 1080 }
1070 mutex_unlock(&osdc->request_mutex); 1081 mutex_unlock(&osdc->request_mutex);
1071} 1082}
@@ -1106,7 +1117,7 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
1106 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 1117 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
1107 if (time_before(jiffies, osd->lru_ttl)) 1118 if (time_before(jiffies, osd->lru_ttl))
1108 break; 1119 break;
1109 __remove_osd(osdc, osd); 1120 remove_osd(osdc, osd);
1110 } 1121 }
1111 mutex_unlock(&osdc->request_mutex); 1122 mutex_unlock(&osdc->request_mutex);
1112} 1123}
@@ -1121,8 +1132,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1121 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 1132 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
1122 if (list_empty(&osd->o_requests) && 1133 if (list_empty(&osd->o_requests) &&
1123 list_empty(&osd->o_linger_requests)) { 1134 list_empty(&osd->o_linger_requests)) {
1124 __remove_osd(osdc, osd); 1135 remove_osd(osdc, osd);
1125
1126 return -ENODEV; 1136 return -ENODEV;
1127 } 1137 }
1128 1138
@@ -1926,6 +1936,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1926{ 1936{
1927 struct rb_node *p, *n; 1937 struct rb_node *p, *n;
1928 1938
1939 dout("%s %p\n", __func__, osdc);
1929 for (p = rb_first(&osdc->osds); p; p = n) { 1940 for (p = rb_first(&osdc->osds); p; p = n) {
1930 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1941 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1931 1942
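
The osd_client changes make OSD removal idempotent: rb_erase() leaves stale pointers behind in the node, so __remove_osd() now stamps it with RB_CLEAR_NODE(), and the remove_osd() wrapper checks RB_EMPTY_NODE() first, so a second removal becomes a no-op instead of corrupting the tree. The idiom, condensed into one kernel-style sketch:

    static void remove_osd_once(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
    {
            if (RB_EMPTY_NODE(&osd->o_node))
                    return;                 /* already unlinked: no-op */

            ceph_con_close(&osd->o_con);
            rb_erase(&osd->o_node, &osdc->osds);
            RB_CLEAR_NODE(&osd->o_node);    /* mark "not on a tree" */
            put_osd(osd);                   /* may be the last ref */
    }

The put_osd() hunk above fixes a related leak: with the old combined test, an osd that had no authorizer was never freed when its last reference dropped.
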