author	Linus Torvalds <torvalds@linux-foundation.org>	2015-02-19 17:14:42 -0500
committer	Linus Torvalds <torvalds@linux-foundation.org>	2015-02-19 17:14:42 -0500
commit	4533f6e27a366ecc3da4876074ebfe0cc0ea4f0f (patch)
tree	8b6f1aeeda991e6a1ce98702d7cc35d2d2a444b1
parent	89d3fa45b4add00cd0056361a2498e978cb1e119 (diff)
parent	0f5417cea6cfeafd5cdec4223df63ca79918fdea (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil:
 "On the RBD side, there is a conversion to blk-mq from Christoph,
  several long-standing bug fixes from Ilya, and some cleanup from
  Rickard Strandqvist.

  On the CephFS side there is a long list of fixes from Zheng, including
  improved session handling, a few IO path fixes, some dcache management
  correctness fixes, and several blocking while !TASK_RUNNING fixes.

  The core code gets a few cleanups and Chaitanya has added support for
  TCP_NODELAY (which has been used on the server side for ages but we
  somehow missed on the kernel client).

  There is also an update to MAINTAINERS to fix up some email addresses
  and reflect that Ilya and Zheng are doing most of the maintenance for
  RBD and CephFS these days.  Do not be surprised to see a pull request
  come from one of them in the future if I am unavailable for some
  reason"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits)
  MAINTAINERS: update Ceph and RBD maintainers
  libceph: kfree() in put_osd() shouldn't depend on authorizer
  libceph: fix double __remove_osd() problem
  rbd: convert to blk-mq
  ceph: return error for traceless reply race
  ceph: fix dentry leaks
  ceph: re-send requests when MDS enters reconnecting stage
  ceph: show nocephx_require_signatures and notcp_nodelay options
  libceph: tcp_nodelay support
  rbd: do not treat standalone as flatten
  ceph: fix atomic_open snapdir
  ceph: properly mark empty directory as complete
  client: include kernel version in client metadata
  ceph: provide seperate {inode,file}_operations for snapdir
  ceph: fix request time stamp encoding
  ceph: fix reading inline data when i_size > PAGE_SIZE
  ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_close_sessions)
  ceph: avoid block operation when !TASK_RUNNING (ceph_get_caps)
  ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync)
  rbd: fix error paths in rbd_dev_refresh()
  ...
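The heart of this pull is the rbd blk-mq conversion, and the pattern is easier to see in isolation than in the hunks below: the driver fills in a struct blk_mq_tag_set, has the block layer embed a per-request work_struct via cmd_size, and makes its queue_rq callback punt each request to a workqueue, because queue_rq must not block while servicing an rbd request can. Below is a minimal sketch of that pattern against the 3.19-era blk-mq API; the my_* names and the standalone workqueue are illustrative only, not part of the kernel tree or of rbd itself.

#include <linux/blk-mq.h>
#include <linux/blkdev.h>
#include <linux/workqueue.h>

static struct blk_mq_tag_set my_tag_set;	/* illustrative */
static struct workqueue_struct *my_wq;		/* illustrative */

static void my_workfn(struct work_struct *work)
{
	/* Recover the request whose pdu embeds this work item. */
	struct request *rq = blk_mq_rq_from_pdu(work);

	blk_mq_start_request(rq);
	/* ... service the request here; blocking is fine ... */
	blk_mq_end_request(rq, 0);
}

/* Called by the block layer; must not block, so defer to a workqueue. */
static int my_queue_rq(struct blk_mq_hw_ctx *hctx,
		       const struct blk_mq_queue_data *bd)
{
	queue_work(my_wq, (struct work_struct *)blk_mq_rq_to_pdu(bd->rq));
	return BLK_MQ_RQ_QUEUE_OK;
}

/* Runs once per request when the tag set is allocated. */
static int my_init_request(void *data, struct request *rq,
			   unsigned int hctx_idx, unsigned int request_idx,
			   unsigned int numa_node)
{
	INIT_WORK((struct work_struct *)blk_mq_rq_to_pdu(rq), my_workfn);
	return 0;
}

static struct blk_mq_ops my_mq_ops = {
	.queue_rq	= my_queue_rq,
	.map_queue	= blk_mq_map_queue,	/* mandatory in 3.19 */
	.init_request	= my_init_request,
};

static struct request_queue *my_create_queue(void)
{
	my_wq = alloc_workqueue("my_sketch_wq", 0, 0);
	if (!my_wq)
		return NULL;

	my_tag_set.ops = &my_mq_ops;
	my_tag_set.nr_hw_queues = 1;
	my_tag_set.queue_depth = BLKDEV_MAX_RQ;
	my_tag_set.numa_node = NUMA_NO_NODE;
	/* cmd_size = per-request driver data allocated by the block layer */
	my_tag_set.cmd_size = sizeof(struct work_struct);
	my_tag_set.flags = BLK_MQ_F_SHOULD_MERGE;

	if (blk_mq_alloc_tag_set(&my_tag_set))
		return NULL;
	return blk_mq_init_queue(&my_tag_set);	/* ERR_PTR on failure */
}

Embedding the work_struct in the request pdu is what lets rbd drop its private rq_queue list, rq_work item, and the queue_lock juggling of the old rbd_request_fn() path, as the rbd.c hunks below show.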
-rw-r--r--  MAINTAINERS                      |   7
-rw-r--r--  drivers/block/rbd.c              | 193
-rw-r--r--  fs/ceph/acl.c                    |  14
-rw-r--r--  fs/ceph/addr.c                   |  19
-rw-r--r--  fs/ceph/caps.c                   | 127
-rw-r--r--  fs/ceph/dir.c                    |  33
-rw-r--r--  fs/ceph/file.c                   |  37
-rw-r--r--  fs/ceph/inode.c                  |  41
-rw-r--r--  fs/ceph/mds_client.c             | 127
-rw-r--r--  fs/ceph/mds_client.h             |   2
-rw-r--r--  fs/ceph/snap.c                   |  54
-rw-r--r--  fs/ceph/super.c                  |   4
-rw-r--r--  fs/ceph/super.h                  |   5
-rw-r--r--  include/linux/ceph/ceph_fs.h     |  37
-rw-r--r--  include/linux/ceph/libceph.h     |   3
-rw-r--r--  include/linux/ceph/messenger.h   |   4
-rw-r--r--  include/linux/ceph/mon_client.h  |   9
-rw-r--r--  net/ceph/ceph_common.c           |  16
-rw-r--r--  net/ceph/ceph_strings.c          |  14
-rw-r--r--  net/ceph/debugfs.c               |   2
-rw-r--r--  net/ceph/messenger.c             |  14
-rw-r--r--  net/ceph/mon_client.c            | 139
-rw-r--r--  net/ceph/osd_client.c            |  31
23 files changed, 444 insertions(+), 488 deletions(-)
diff --git a/MAINTAINERS b/MAINTAINERS
index 1921ed58d1a0..7cfcee4e2bea 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2433,7 +2433,8 @@ F: arch/powerpc/oprofile/*cell*
 F:	arch/powerpc/platforms/cell/
 
 CEPH DISTRIBUTED FILE SYSTEM CLIENT
-M:	Sage Weil <sage@inktank.com>
+M:	Yan, Zheng <zyan@redhat.com>
+M:	Sage Weil <sage@redhat.com>
 L:	ceph-devel@vger.kernel.org
 W:	http://ceph.com/
 T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
@@ -7998,8 +7999,8 @@ S: Supported
 F:	drivers/net/wireless/ath/wcn36xx/
 
 RADOS BLOCK DEVICE (RBD)
-M:	Yehuda Sadeh <yehuda@inktank.com>
-M:	Sage Weil <sage@inktank.com>
+M:	Ilya Dryomov <idryomov@gmail.com>
+M:	Sage Weil <sage@redhat.com>
 M:	Alex Elder <elder@kernel.org>
 M:	ceph-devel@vger.kernel.org
 W:	http://ceph.com/
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8a86b62466f7..b40af3203089 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -38,6 +38,7 @@
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
+#include <linux/blk-mq.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
 #include <linux/slab.h>
@@ -340,9 +341,7 @@ struct rbd_device {
 
 	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	struct list_head rq_queue;	/* incoming rq queue */
 	spinlock_t lock;		/* queue, flags, open_count */
-	struct work_struct rq_work;
 
 	struct rbd_image_header header;
 	unsigned long flags;		/* possibly lock protected */
@@ -360,6 +359,9 @@ struct rbd_device {
 	atomic_t parent_ref;
 	struct rbd_device *parent;
 
+	/* Block layer tags. */
+	struct blk_mq_tag_set tag_set;
+
 	/* protects updating the header */
 	struct rw_semaphore header_rwsem;
 
@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to blk_end_request(), which takes an unsigned int.
+	 * passed to the block layer, which just supports a 32-bit
+	 * length field.
 	 */
 	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 		more = obj_request->which < img_request->obj_request_count - 1;
 	} else {
 		rbd_assert(img_request->rq != NULL);
-		more = blk_end_request(img_request->rq, result, xferred);
+
+		more = blk_update_request(img_request->rq, result, xferred);
+		if (!more)
+			__blk_mq_end_request(img_request->rq, result);
 	}
 
 	return more;
@@ -3304,8 +3310,10 @@ out:
 	return ret;
 }
 
-static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+static void rbd_queue_workfn(struct work_struct *work)
 {
+	struct request *rq = blk_mq_rq_from_pdu(work);
+	struct rbd_device *rbd_dev = rq->q->queuedata;
 	struct rbd_img_request *img_request;
 	struct ceph_snap_context *snapc = NULL;
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 	u64 mapping_size;
 	int result;
 
+	if (rq->cmd_type != REQ_TYPE_FS) {
+		dout("%s: non-fs request type %d\n", __func__,
+			(int) rq->cmd_type);
+		result = -EIO;
+		goto err;
+	}
+
 	if (rq->cmd_flags & REQ_DISCARD)
 		op_type = OBJ_OP_DISCARD;
 	else if (rq->cmd_flags & REQ_WRITE)
@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 		goto err_rq;	/* Shouldn't happen */
 	}
 
+	blk_mq_start_request(rq);
+
 	down_read(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
 	if (op_type != OBJ_OP_READ) {
@@ -3404,53 +3421,18 @@ err_rq:
 	rbd_warn(rbd_dev, "%s %llx at %llx result %d",
 		 obj_op_name(op_type), length, offset, result);
 	ceph_put_snap_context(snapc);
-	blk_end_request_all(rq, result);
+err:
+	blk_mq_end_request(rq, result);
 }
 
-static void rbd_request_workfn(struct work_struct *work)
+static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+		const struct blk_mq_queue_data *bd)
 {
-	struct rbd_device *rbd_dev =
-	    container_of(work, struct rbd_device, rq_work);
-	struct request *rq, *next;
-	LIST_HEAD(requests);
-
-	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
-	list_splice_init(&rbd_dev->rq_queue, &requests);
-	spin_unlock_irq(&rbd_dev->lock);
+	struct request *rq = bd->rq;
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 
-	list_for_each_entry_safe(rq, next, &requests, queuelist) {
-		list_del_init(&rq->queuelist);
-		rbd_handle_request(rbd_dev, rq);
-	}
-}
-
-/*
- * Called with q->queue_lock held and interrupts disabled, possibly on
- * the way to schedule().  Do not sleep here!
- */
-static void rbd_request_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int queued = 0;
-
-	rbd_assert(rbd_dev);
-
-	while ((rq = blk_fetch_request(q))) {
-		/* Ignore any non-FS requests that filter through. */
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
-		queued++;
-	}
-
-	if (queued)
-		queue_work(rbd_wq, &rbd_dev->rq_work);
+	queue_work(rbd_wq, work);
+	return BLK_MQ_RQ_QUEUE_OK;
 }
 
 /*
@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 		del_gendisk(disk);
 		if (disk->queue)
 			blk_cleanup_queue(disk->queue);
+		blk_mq_free_tag_set(&rbd_dev->tag_set);
 	}
 	put_disk(disk);
 }
@@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 
 	ret = rbd_dev_header_info(rbd_dev);
 	if (ret)
-		return ret;
+		goto out;
 
 	/*
 	 * If there is a parent, see if it has disappeared due to the
@@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	if (rbd_dev->parent) {
 		ret = rbd_dev_v2_parent_info(rbd_dev);
 		if (ret)
-			return ret;
+			goto out;
 	}
 
 	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
-		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-			rbd_dev->mapping.size = rbd_dev->header.image_size;
+		rbd_dev->mapping.size = rbd_dev->header.image_size;
 	} else {
 		/* validate mapped snapshot's EXISTS flag */
 		rbd_exists_validate(rbd_dev);
 	}
 
+out:
 	up_write(&rbd_dev->header_rwsem);
-
-	if (mapping_size != rbd_dev->mapping.size)
+	if (!ret && mapping_size != rbd_dev->mapping.size)
 		rbd_dev_update_size(rbd_dev);
 
+	return ret;
+}
+
+static int rbd_init_request(void *data, struct request *rq,
+		unsigned int hctx_idx, unsigned int request_idx,
+		unsigned int numa_node)
+{
+	struct work_struct *work = blk_mq_rq_to_pdu(rq);
+
+	INIT_WORK(work, rbd_queue_workfn);
 	return 0;
 }
 
+static struct blk_mq_ops rbd_mq_ops = {
+	.queue_rq	= rbd_queue_rq,
+	.map_queue	= blk_mq_map_queue,
+	.init_request	= rbd_init_request,
+};
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
 	struct request_queue *q;
 	u64 segment_size;
+	int err;
 
 	/* create gendisk info */
 	disk = alloc_disk(single_major ?
@@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
-	if (!q)
+	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
+	rbd_dev->tag_set.ops = &rbd_mq_ops;
+	rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
+	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
+	rbd_dev->tag_set.flags =
+		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = 1;
+	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+
+	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
+	if (err)
 		goto out_disk;
 
+	q = blk_mq_init_queue(&rbd_dev->tag_set);
+	if (IS_ERR(q)) {
+		err = PTR_ERR(q);
+		goto out_tag_set;
+	}
+
 	/* We use the default size, but let's be explicit about it. */
 	blk_queue_physical_block_size(q, SECTOR_SIZE);
 
@@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	rbd_dev->disk = disk;
 
 	return 0;
+out_tag_set:
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
 out_disk:
 	put_disk(disk);
-
-	return -ENOMEM;
+	return err;
 }
 
 /*
@@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
-	INIT_LIST_HEAD(&rbd_dev->rq_queue);
-	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	}
 
 	/*
-	 * We always update the parent overlap.  If it's zero we
-	 * treat it specially.
+	 * We always update the parent overlap.  If it's zero we issue
+	 * a warning, as we will proceed as if there was no parent.
 	 */
-	rbd_dev->parent_overlap = overlap;
 	if (!overlap) {
-
-		/* A null parent_spec indicates it's the initial probe */
-
 		if (parent_spec) {
-			/*
-			 * The overlap has become zero, so the clone
-			 * must have been resized down to 0 at some
-			 * point.  Treat this the same as a flatten.
-			 */
-			rbd_dev_parent_put(rbd_dev);
-			pr_info("%s: clone image now standalone\n",
-				rbd_dev->disk->disk_name);
+			/* refresh, careful to warn just once */
+			if (rbd_dev->parent_overlap)
+				rbd_warn(rbd_dev,
+				    "clone now standalone (overlap became 0)");
 		} else {
-			/*
-			 * For the initial probe, if we find the
-			 * overlap is zero we just pretend there was
-			 * no parent image.
-			 */
-			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
+			/* initial probe */
+			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
 		}
 	}
+	rbd_dev->parent_overlap = overlap;
+
 out:
 	ret = 0;
 out_err:
@@ -4771,36 +4774,6 @@ static inline size_t next_token(const char **buf)
 }
 
 /*
- * Finds the next token in *buf, and if the provided token buffer is
- * big enough, copies the found token into it.  The result, if
- * copied, is guaranteed to be terminated with '\0'.  Note that *buf
- * must be terminated with '\0' on entry.
- *
- * Returns the length of the token found (not including the '\0').
- * Return value will be 0 if no token is found, and it will be >=
- * token_size if the token would not fit.
- *
- * The *buf pointer will be updated to point beyond the end of the
- * found token.  Note that this occurs even if the token buffer is
- * too small to hold it.
- */
-static inline size_t copy_token(const char **buf,
-				char *token,
-				size_t token_size)
-{
-	size_t len;
-
-	len = next_token(buf);
-	if (len < token_size) {
-		memcpy(token, *buf, len);
-		*(token + len) = '\0';
-	}
-	*buf += len;
-
-	return len;
-}
-
-/*
  * Finds the next token in *buf, dynamically allocates a buffer big
  * enough to hold a copy of it, and copies the token into the new
  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 5bd853ba44ff..64fa248343f6 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode,
 	spin_unlock(&ci->i_ceph_lock);
 }
 
-static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
-							int type)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct posix_acl *acl = ACL_NOT_CACHED;
-
-	spin_lock(&ci->i_ceph_lock);
-	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
-		acl = get_cached_acl(inode, type);
-	spin_unlock(&ci->i_ceph_lock);
-
-	return acl;
-}
-
 struct posix_acl *ceph_get_acl(struct inode *inode, int type)
 {
 	int size;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 24be059fd1f8..fd5599d32362 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page)
 	u64 len = PAGE_CACHE_SIZE;
 
 	if (off >= i_size_read(inode)) {
-		zero_user_segment(page, err, PAGE_CACHE_SIZE);
+		zero_user_segment(page, 0, PAGE_CACHE_SIZE);
 		SetPageUptodate(page);
 		return 0;
 	}
 
-	/*
-	 * Uptodate inline data should have been added into page cache
-	 * while getting Fcr caps.
-	 */
-	if (ci->i_inline_version != CEPH_INLINE_NONE)
-		return -EINVAL;
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		/*
+		 * Uptodate inline data should have been added
+		 * into page cache while getting Fcr caps.
+		 */
+		if (off == 0)
+			return -EINVAL;
+		zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+		SetPageUptodate(page);
+		return 0;
+	}
 
 	err = ceph_readpage_from_fscache(inode, page);
 	if (err == 0)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b93c631c6c87..8172775428a0 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
 		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
 							       realmino);
 		if (realm) {
-			ceph_get_snap_realm(mdsc, realm);
 			spin_lock(&realm->inodes_with_caps_lock);
 			ci->i_snap_realm = realm;
 			list_add(&ci->i_snap_realm_item,
@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode,
 	spin_lock(&mdsc->cap_dirty_lock);
 	list_del_init(&ci->i_dirty_item);
 
-	ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
 	if (list_empty(&ci->i_flushing_item)) {
+		ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
 		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
 		mdsc->num_cap_flushing++;
 		dout(" inode %p now flushing seq %lld\n", inode,
@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, int *got, struct page **pinned_page,
-			    int *check_max, int *err)
+			    loff_t endoff, int *got, int *check_max, int *err)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int ret = 0;
-	int have, implemented, _got = 0;
+	int have, implemented;
 	int file_wanted;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
 	     ceph_cap_string(need), ceph_cap_string(want));
-again:
+
 	spin_lock(&ci->i_ceph_lock);
 
 	/* make sure file is actually open */
@@ -2138,50 +2136,34 @@ again:
 		    inode, ceph_cap_string(have), ceph_cap_string(not),
 		    ceph_cap_string(revoking));
 		if ((revoking & not) == 0) {
-			_got = need | (have & want);
-			__take_cap_refs(ci, _got);
+			*got = need | (have & want);
+			__take_cap_refs(ci, *got);
 			ret = 1;
 		}
 	} else {
+		int session_readonly = false;
+		if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+			struct ceph_mds_session *s = ci->i_auth_cap->session;
+			spin_lock(&s->s_cap_lock);
+			session_readonly = s->s_readonly;
+			spin_unlock(&s->s_cap_lock);
+		}
+		if (session_readonly) {
+			dout("get_cap_refs %p needed %s but mds%d readonly\n",
+			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
+			*err = -EROFS;
+			ret = 1;
+			goto out_unlock;
+		}
+
 		dout("get_cap_refs %p have %s needed %s\n", inode,
 		     ceph_cap_string(have), ceph_cap_string(need));
 	}
 out_unlock:
 	spin_unlock(&ci->i_ceph_lock);
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE &&
-	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-	    i_size_read(inode) > 0) {
-		int ret1;
-		struct page *page = find_get_page(inode->i_mapping, 0);
-		if (page) {
-			if (PageUptodate(page)) {
-				*pinned_page = page;
-				goto out;
-			}
-			page_cache_release(page);
-		}
-		/*
-		 * drop cap refs first because getattr while holding
-		 * caps refs can cause deadlock.
-		 */
-		ceph_put_cap_refs(ci, _got);
-		_got = 0;
-
-		/* getattr request will bring inline data into page cache */
-		ret1 = __ceph_do_getattr(inode, NULL,
-					 CEPH_STAT_CAP_INLINE_DATA, true);
-		if (ret1 >= 0) {
-			ret = 0;
-			goto again;
-		}
-		*err = ret1;
-		ret = 1;
-	}
-out:
 	dout("get_cap_refs %p ret %d got %s\n", inode,
-	     ret, ceph_cap_string(_got));
-	*got = _got;
+	     ret, ceph_cap_string(*got));
 	return ret;
 }
 
@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int check_max, ret, err;
+	int _got, check_max, ret, err = 0;
 
 retry:
 	if (endoff > 0)
 		check_max_size(&ci->vfs_inode, endoff);
+	_got = 0;
 	check_max = 0;
-	err = 0;
 	ret = wait_event_interruptible(ci->i_cap_wq,
 			       try_get_cap_refs(ci, need, want, endoff,
-					got, pinned_page,
-					&check_max, &err));
+					&_got, &check_max, &err));
 	if (err)
 		ret = err;
+	if (ret < 0)
+		return ret;
+
 	if (check_max)
 		goto retry;
-	return ret;
+
+	if (ci->i_inline_version != CEPH_INLINE_NONE &&
+	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+	    i_size_read(&ci->vfs_inode) > 0) {
+		struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
+		if (page) {
+			if (PageUptodate(page)) {
+				*pinned_page = page;
+				goto out;
+			}
+			page_cache_release(page);
+		}
+		/*
+		 * drop cap refs first because getattr while holding
+		 * caps refs can cause deadlock.
+		 */
+		ceph_put_cap_refs(ci, _got);
+		_got = 0;
+
+		/* getattr request will bring inline data into page cache */
+		ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+					CEPH_STAT_CAP_INLINE_DATA, true);
+		if (ret < 0)
+			return ret;
+		goto retry;
+	}
+out:
+	*got = _got;
+	return 0;
 }
 
 /*
@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode)
  */
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			     struct inode *inode, struct ceph_mds_caps *grant,
-			     void *snaptrace, int snaptrace_len,
 			     u64 inline_version,
 			     void *inline_data, int inline_len,
 			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
 			     struct ceph_cap *cap, int issued)
 	__releases(ci->i_ceph_lock)
+	__releases(mdsc->snap_rwsem)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int mds = session->s_mds;
@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
-		down_write(&mdsc->snap_rwsem);
-		ceph_update_snap_trace(mdsc, snaptrace,
-				       snaptrace + snaptrace_len, false);
-		downgrade_write(&mdsc->snap_rwsem);
 		kick_flushing_inode_caps(mdsc, session, inode);
 		up_read(&mdsc->snap_rwsem);
 		if (newcaps & ~issued)
@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	struct ceph_cap *cap;
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
+	struct ceph_snap_realm *realm;
 	int mds = session->s_mds;
 	int op, issued;
 	u32 seq, mseq;
@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_IMPORT:
+		realm = NULL;
+		if (snaptrace_len) {
+			down_write(&mdsc->snap_rwsem);
+			ceph_update_snap_trace(mdsc, snaptrace,
+					       snaptrace + snaptrace_len,
+					       false, &realm);
+			downgrade_write(&mdsc->snap_rwsem);
+		} else {
+			down_read(&mdsc->snap_rwsem);
+		}
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &issued);
-		handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+		handle_cap_grant(mdsc, inode, h,
 				 inline_version, inline_data, inline_len,
 				 msg->middle, session, cap, issued);
+		if (realm)
+			ceph_put_snap_realm(mdsc, realm);
 		goto done_unlocked;
 	}
 
@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	case CEPH_CAP_OP_GRANT:
 		__ceph_caps_issued(ci, &issued);
 		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h, NULL, 0,
+		handle_cap_grant(mdsc, inode, h,
 				 inline_version, inline_data, inline_len,
 				 msg->middle, session, cap, issued);
 		goto done_unlocked;
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index c241603764fd..0411dbb15815 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -26,8 +26,6 @@
  * point by name.
  */
 
-const struct inode_operations ceph_dir_iops;
-const struct file_operations ceph_dir_fops;
 const struct dentry_operations ceph_dentry_ops;
 
 /*
@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
 		/*
 		 * We created the item, then did a lookup, and found
 		 * it was already linked to another inode we already
-		 * had in our cache (and thus got spliced).  Link our
-		 * dentry to that inode, but don't hash it, just in
-		 * case the VFS wants to dereference it.
+		 * had in our cache (and thus got spliced). To not
+		 * confuse VFS (especially when inode is a directory),
+		 * we don't link our dentry to that inode, return an
+		 * error instead.
+		 *
+		 * This event should be rare and it happens only when
+		 * we talk to old MDS. Recent MDS does not send traceless
+		 * reply for request that creates new inode.
 		 */
-		BUG_ON(!result->d_inode);
-		d_instantiate(dentry, result->d_inode);
-		return 0;
+		d_drop(result);
+		return -ESTALE;
 	}
 	return PTR_ERR(result);
 }
@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = {
 	.fsync = ceph_dir_fsync,
 };
 
+const struct file_operations ceph_snapdir_fops = {
+	.iterate = ceph_readdir,
+	.llseek = ceph_dir_llseek,
+	.open = ceph_open,
+	.release = ceph_release,
+};
+
 const struct inode_operations ceph_dir_iops = {
 	.lookup = ceph_lookup,
 	.permission = ceph_permission,
@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = {
 	.atomic_open = ceph_atomic_open,
 };
 
+const struct inode_operations ceph_snapdir_iops = {
+	.lookup = ceph_lookup,
+	.permission = ceph_permission,
+	.getattr = ceph_getattr,
+	.mkdir = ceph_mkdir,
+	.rmdir = ceph_unlink,
+};
+
 const struct dentry_operations ceph_dentry_ops = {
 	.d_revalidate = ceph_d_revalidate,
 	.d_release = ceph_d_release,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 905986dd4c3c..a3d774b35149 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 				   req);
+	err = ceph_handle_snapdir(req, dentry, err);
 	if (err)
 		goto out_req;
 
-	err = ceph_handle_snapdir(req, dentry, err);
 	if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 
@@ -392,13 +392,14 @@ more:
 	if (ret >= 0) {
 		int didpages;
 		if (was_short && (pos + ret < inode->i_size)) {
-			u64 tmp = min(this_len - ret,
+			int zlen = min(this_len - ret,
 				      inode->i_size - pos - ret);
+			int zoff = (o_direct ? buf_align : io_align) +
+				   read + ret;
 			dout(" zero gap %llu to %llu\n",
-			     pos + ret, pos + ret + tmp);
-			ceph_zero_page_vector_range(page_align + read + ret,
-						    tmp, pages);
-			ret += tmp;
+			     pos + ret, pos + ret + zlen);
+			ceph_zero_page_vector_range(zoff, zlen, pages);
+			ret += zlen;
 		}
 
 		didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
@@ -878,28 +879,34 @@ again:
 
 	i_size = i_size_read(inode);
 	if (retry_op == READ_INLINE) {
-		/* does not support inline data > PAGE_SIZE */
-		if (i_size > PAGE_CACHE_SIZE) {
-			ret = -EIO;
-		} else if (iocb->ki_pos < i_size) {
+		BUG_ON(ret > 0 || read > 0);
+		if (iocb->ki_pos < i_size &&
+		    iocb->ki_pos < PAGE_CACHE_SIZE) {
 			loff_t end = min_t(loff_t, i_size,
 					   iocb->ki_pos + len);
+			end = min_t(loff_t, end, PAGE_CACHE_SIZE);
 			if (statret < end)
 				zero_user_segment(page, statret, end);
 			ret = copy_page_to_iter(page,
 					iocb->ki_pos & ~PAGE_MASK,
 					end - iocb->ki_pos, to);
 			iocb->ki_pos += ret;
-		} else {
-			ret = 0;
+			read += ret;
+		}
+		if (iocb->ki_pos < i_size && read < len) {
+			size_t zlen = min_t(size_t, len - read,
+					    i_size - iocb->ki_pos);
+			ret = iov_iter_zero(zlen, to);
+			iocb->ki_pos += ret;
+			read += ret;
 		}
 		__free_pages(page, 0);
-		return ret;
+		return read;
 	}
 
 	/* hit EOF or hole? */
 	if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
 	    ret < len) {
 		dout("sync_read hit hole, ppos %lld < size %lld"
 		     ", reading more\n", iocb->ki_pos,
 		     inode->i_size);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 6b5173605154..119c43c80638 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent)
 	inode->i_mode = parent->i_mode;
 	inode->i_uid = parent->i_uid;
 	inode->i_gid = parent->i_gid;
-	inode->i_op = &ceph_dir_iops;
-	inode->i_fop = &ceph_dir_fops;
+	inode->i_op = &ceph_snapdir_iops;
+	inode->i_fop = &ceph_snapdir_fops;
 	ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
 	ci->i_rbytes = 0;
 	return inode;
@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 		       ceph_vinop(inode), inode->i_mode);
 	}
 
-	/* set dir completion flag? */
-	if (S_ISDIR(inode->i_mode) &&
-	    ci->i_files == 0 && ci->i_subdirs == 0 &&
-	    ceph_snap(inode) == CEPH_NOSNAP &&
-	    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
-	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
-	    !__ceph_dir_is_complete(ci)) {
-		dout(" marking %p complete (empty)\n", inode);
-		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
-					ci->i_ordered_count);
-	}
-
 	/* were we issued a capability? */
 	if (info->cap.caps) {
 		if (ceph_snap(inode) == CEPH_NOSNAP) {
+			unsigned caps = le32_to_cpu(info->cap.caps);
 			ceph_add_cap(inode, session,
 				     le64_to_cpu(info->cap.cap_id),
-				     cap_fmode,
-				     le32_to_cpu(info->cap.caps),
+				     cap_fmode, caps,
 				     le32_to_cpu(info->cap.wanted),
 				     le32_to_cpu(info->cap.seq),
 				     le32_to_cpu(info->cap.mseq),
 				     le64_to_cpu(info->cap.realm),
 				     info->cap.flags, &new_cap);
+
+			/* set dir completion flag? */
+			if (S_ISDIR(inode->i_mode) &&
+			    ci->i_files == 0 && ci->i_subdirs == 0 &&
+			    (caps & CEPH_CAP_FILE_SHARED) &&
+			    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+			    !__ceph_dir_is_complete(ci)) {
+				dout(" marking %p complete (empty)\n", inode);
+				__ceph_dir_set_complete(ci,
+					atomic_read(&ci->i_release_count),
+					ci->i_ordered_count);
+			}
+
 			wake = true;
 		} else {
 			dout(" %p got snap_caps %s\n", inode,
@@ -1446,12 +1447,14 @@ retry_lookup:
 	}
 
 	if (!dn->d_inode) {
-		dn = splice_dentry(dn, in, NULL);
-		if (IS_ERR(dn)) {
-			err = PTR_ERR(dn);
+		struct dentry *realdn = splice_dentry(dn, in, NULL);
+		if (IS_ERR(realdn)) {
+			err = PTR_ERR(realdn);
+			d_drop(dn);
 			dn = NULL;
 			goto next_item;
 		}
+		dn = realdn;
 	}
 
 	di = dn->d_fsdata;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5f62fb7a5d0a..71c073f38e54 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 		mdsc->max_sessions = newmax;
 	}
 	mdsc->sessions[mds] = s;
+	atomic_inc(&mdsc->num_sessions);
 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 
 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
 	mdsc->sessions[s->s_mds] = NULL;
 	ceph_con_close(&s->s_con);
 	ceph_put_mds_session(s);
+	atomic_dec(&mdsc->num_sessions);
 }
 
 /*
@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
 	struct ceph_options *opt = mdsc->fsc->client->options;
 	void *p;
 
-	const char* metadata[3][2] = {
+	const char* metadata[][2] = {
 		{"hostname", utsname()->nodename},
+		{"kernel_version", utsname()->release},
 		{"entity_id", opt->name ? opt->name : ""},
 		{NULL, NULL}
 	};
@@ -1464,19 +1467,33 @@ out_unlocked:
 	return err;
 }
 
+static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int ret;
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_flushing_caps)
+		ret = ci->i_cap_flush_seq >= want_flush_seq;
+	else
+		ret = 1;
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
+
 /*
  * flush all dirty inode data to disk.
  *
  * returns true if we've flushed through want_flush_seq
  */
-static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 {
-	int mds, ret = 1;
+	int mds;
 
 	dout("check_cap_flush want %lld\n", want_flush_seq);
 	mutex_lock(&mdsc->mutex);
-	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
+	for (mds = 0; mds < mdsc->max_sessions; mds++) {
 		struct ceph_mds_session *session = mdsc->sessions[mds];
+		struct inode *inode = NULL;
 
 		if (!session)
 			continue;
@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 				list_entry(session->s_cap_flushing.next,
 					   struct ceph_inode_info,
 					   i_flushing_item);
-			struct inode *inode = &ci->vfs_inode;
 
-			spin_lock(&ci->i_ceph_lock);
-			if (ci->i_cap_flush_seq <= want_flush_seq) {
+			if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
 				dout("check_cap_flush still flushing %p "
-				     "seq %lld <= %lld to mds%d\n", inode,
-				     ci->i_cap_flush_seq, want_flush_seq,
-				     session->s_mds);
-				ret = 0;
+				     "seq %lld <= %lld to mds%d\n",
+				     &ci->vfs_inode, ci->i_cap_flush_seq,
+				     want_flush_seq, session->s_mds);
+				inode = igrab(&ci->vfs_inode);
 			}
-			spin_unlock(&ci->i_ceph_lock);
 		}
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
 
-		if (!ret)
-			return ret;
+		if (inode) {
+			wait_event(mdsc->cap_flushing_wq,
+				   check_cap_flush(inode, want_flush_seq));
+			iput(inode);
+		}
+
 		mutex_lock(&mdsc->mutex);
 	}
 
 	mutex_unlock(&mdsc->mutex);
 	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
-	return ret;
 }
 
 /*
@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 	head->num_releases = cpu_to_le16(releases);
 
 	/* time stamp */
-	ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+	{
+		struct ceph_timespec ts;
+		ceph_encode_timespec(&ts, &req->r_stamp);
+		ceph_encode_copy(&p, &ts, sizeof(ts));
+	}
 
 	BUG_ON(p > end);
 	msg->front.iov_len = p - msg->front.iov_base;
@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
 	/* time stamp */
 	p = msg->front.iov_base + req->r_request_release_offset;
-	ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+	{
+		struct ceph_timespec ts;
+		ceph_encode_timespec(&ts, &req->r_stamp);
+		ceph_encode_copy(&p, &ts, sizeof(ts));
+	}
 
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
 		p = rb_next(p);
 		if (req->r_got_unsafe)
 			continue;
+		if (req->r_attempts > 0)
+			continue; /* only new requests */
 		if (req->r_session &&
 		    req->r_session->s_mds == mds) {
 			dout(" kicking tid %llu\n", req->r_tid);
@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	struct ceph_mds_request *req;
 	struct ceph_mds_reply_head *head = msg->front.iov_base;
 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
+	struct ceph_snap_realm *realm;
 	u64 tid;
 	int err, result;
 	int mds = session->s_mds;
@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	}
 
 	/* snap trace */
+	realm = NULL;
 	if (rinfo->snapblob_len) {
 		down_write(&mdsc->snap_rwsem);
 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
 			       rinfo->snapblob + rinfo->snapblob_len,
-			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
+			       &realm);
 		downgrade_write(&mdsc->snap_rwsem);
 	} else {
 		down_read(&mdsc->snap_rwsem);
@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	mutex_unlock(&req->r_fill_mutex);
 
 	up_read(&mdsc->snap_rwsem);
+	if (realm)
+		ceph_put_snap_realm(mdsc, realm);
 out_err:
 	mutex_lock(&mdsc->mutex);
 	if (!req->r_aborted) {
@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
 	dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
 	BUG_ON(req->r_err);
 	BUG_ON(req->r_got_result);
+	req->r_attempts = 0;
 	req->r_num_fwd = fwd_seq;
 	req->r_resend_mds = next_mds;
 	put_request_session(req);
@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session,
 		send_flushmsg_ack(mdsc, session, seq);
 		break;
 
+	case CEPH_SESSION_FORCE_RO:
+		dout("force_session_readonly %p\n", session);
+		spin_lock(&session->s_cap_lock);
+		session->s_readonly = true;
+		spin_unlock(&session->s_cap_lock);
+		wake_up_session_caps(session, 0);
+		break;
+
 	default:
 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
 		WARN_ON(1);
@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 				     struct ceph_mds_session *session)
 {
 	struct ceph_mds_request *req, *nreq;
+	struct rb_node *p;
 	int err;
 
 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 			ceph_con_send(&session->s_con, req->r_request);
 		}
 	}
+
+	/*
+	 * also re-send old requests when MDS enters reconnect stage. So that MDS
+	 * can process completed request in clientreplay stage.
+	 */
+	p = rb_first(&mdsc->request_tree);
+	while (p) {
+		req = rb_entry(p, struct ceph_mds_request, r_node);
+		p = rb_next(p);
+		if (req->r_got_unsafe)
+			continue;
+		if (req->r_attempts == 0)
+			continue; /* only old requests */
+		if (req->r_session &&
+		    req->r_session->s_mds == session->s_mds) {
+			err = __prepare_send_request(mdsc, req, session->s_mds);
+			if (!err) {
+				ceph_msg_get(req->r_request);
+				ceph_con_send(&session->s_con, req->r_request);
+			}
+		}
+	}
 	mutex_unlock(&mdsc->mutex);
 }
 
@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	spin_unlock(&session->s_gen_ttl_lock);
 
 	spin_lock(&session->s_cap_lock);
+	/* don't know if session is readonly */
+	session->s_readonly = 0;
 	/*
 	 * notify __ceph_remove_cap() that we are composing cap reconnect.
 	 * If a cap get released before being added to the cap reconnect,
@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 				mutex_unlock(&s->s_mutex);
 			s->s_state = CEPH_MDS_SESSION_RESTARTING;
 		}
-
-		/* kick any requests waiting on the recovering mds */
-		kick_requests(mdsc, i);
 	} else if (oldstate == newstate) {
 		continue;  /* nothing new with this mds */
 	}
@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	init_waitqueue_head(&mdsc->session_close_wq);
 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
 	mdsc->sessions = NULL;
+	atomic_set(&mdsc->num_sessions, 0);
 	mdsc->max_sessions = 0;
 	mdsc->stopping = 0;
 	init_rwsem(&mdsc->snap_rwsem);
@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 	dout("sync\n");
 	mutex_lock(&mdsc->mutex);
 	want_tid = mdsc->last_tid;
-	want_flush = mdsc->cap_flush_seq;
 	mutex_unlock(&mdsc->mutex);
-	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
 
 	ceph_flush_dirty_caps(mdsc);
+	spin_lock(&mdsc->cap_dirty_lock);
+	want_flush = mdsc->cap_flush_seq;
+	spin_unlock(&mdsc->cap_dirty_lock);
+
+	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
 
 	wait_unsafe_requests(mdsc, want_tid);
-	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
+	wait_caps_flush(mdsc, want_flush);
 }
 
 /*
@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
  */
 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
 {
-	int i, n = 0;
-
 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
 		return true;
-
-	mutex_lock(&mdsc->mutex);
-	for (i = 0; i < mdsc->max_sessions; i++)
-		if (mdsc->sessions[i])
-			n++;
-	mutex_unlock(&mdsc->mutex);
-	return n == 0;
+	return atomic_read(&mdsc->num_sessions) == 0;
 }
 
 /*
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e2817d00f7d9..1875b5d985c6 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -137,6 +137,7 @@ struct ceph_mds_session {
 	int               s_nr_caps, s_trim_caps;
 	int               s_num_cap_releases;
 	int               s_cap_reconnect;
+	int               s_readonly;
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
 	struct list_head  s_cap_releases_done; /* ready to send */
 	struct ceph_cap  *s_cap_iterator;
@@ -272,6 +273,7 @@ struct ceph_mds_client {
 	struct list_head        waiting_for_map;
 
 	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
+	atomic_t                num_sessions;
 	int                     max_sessions;  /* len of s_mds_sessions */
 	int                     stopping;      /* true if shutting down */
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index ce35fbd4ba5d..a97e39f09ba6 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
  * safe.  we do need to protect against concurrent empty list
  * additions, however.
  */
-	if (atomic_read(&realm->nref) == 0) {
+	if (atomic_inc_return(&realm->nref) == 1) {
 		spin_lock(&mdsc->snap_empty_lock);
 		list_del_init(&realm->empty_item);
 		spin_unlock(&mdsc->snap_empty_lock);
 	}
-
-	atomic_inc(&realm->nref);
 }
 
 static void __insert_snap_realm(struct rb_root *root,
@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
 	if (!realm)
 		return ERR_PTR(-ENOMEM);
 
-	atomic_set(&realm->nref, 0);    /* tree does not take a ref */
+	atomic_set(&realm->nref, 1);    /* for caller */
 	realm->ino = ino;
 	INIT_LIST_HEAD(&realm->children);
 	INIT_LIST_HEAD(&realm->child_item);
@@ -134,8 +132,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
134 * 132 *
135 * caller must hold snap_rwsem for write. 133 * caller must hold snap_rwsem for write.
136 */ 134 */
137struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 135static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
138 u64 ino) 136 u64 ino)
139{ 137{
140 struct rb_node *n = mdsc->snap_realms.rb_node; 138 struct rb_node *n = mdsc->snap_realms.rb_node;
141 struct ceph_snap_realm *r; 139 struct ceph_snap_realm *r;
@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
154 return NULL; 152 return NULL;
155} 153}
156 154
155struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
156 u64 ino)
157{
158 struct ceph_snap_realm *r;
159 r = __lookup_snap_realm(mdsc, ino);
160 if (r)
161 ceph_get_snap_realm(mdsc, r);
162 return r;
163}
164
157static void __put_snap_realm(struct ceph_mds_client *mdsc, 165static void __put_snap_realm(struct ceph_mds_client *mdsc,
158 struct ceph_snap_realm *realm); 166 struct ceph_snap_realm *realm);
159 167
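ceph_lookup_snap_realm() now takes a reference before returning: the raw rbtree walk moves into __lookup_snap_realm(), and the exported wrapper calls ceph_get_snap_realm() on a hit. Callers therefore always receive an already-referenced realm, which is what lets the explicit ceph_get_snap_realm() calls disappear from adjust_snap_realm_parent() and ceph_handle_snap() in the hunks below. The shape of the contract, as a hedged sketch:

	#include <stdint.h>
	#include <stddef.h>

	struct obj { int nref; };

	struct obj *__lookup(uint64_t ino);	/* raw search, no ref (assumed) */
	void obj_get(struct obj *o);		/* assumed refcount helper */

	struct obj *lookup(uint64_t ino)
	{
		struct obj *o = __lookup(ino);
		if (o)
			obj_get(o);	/* caller owns a reference on return */
		return o;
	}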
@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
273 } 281 }
274 realm->parent_ino = parentino; 282 realm->parent_ino = parentino;
275 realm->parent = parent; 283 realm->parent = parent;
276 ceph_get_snap_realm(mdsc, parent);
277 list_add(&realm->child_item, &parent->children); 284 list_add(&realm->child_item, &parent->children);
278 return 1; 285 return 1;
279} 286}
@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
631 * Caller must hold snap_rwsem for write. 638 * Caller must hold snap_rwsem for write.
632 */ 639 */
633int ceph_update_snap_trace(struct ceph_mds_client *mdsc, 640int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
634 void *p, void *e, bool deletion) 641 void *p, void *e, bool deletion,
642 struct ceph_snap_realm **realm_ret)
635{ 643{
636 struct ceph_mds_snap_realm *ri; /* encoded */ 644 struct ceph_mds_snap_realm *ri; /* encoded */
637 __le64 *snaps; /* encoded */ 645 __le64 *snaps; /* encoded */
638 __le64 *prior_parent_snaps; /* encoded */ 646 __le64 *prior_parent_snaps; /* encoded */
639 struct ceph_snap_realm *realm; 647 struct ceph_snap_realm *realm = NULL;
648 struct ceph_snap_realm *first_realm = NULL;
640 int invalidate = 0; 649 int invalidate = 0;
641 int err = -ENOMEM; 650 int err = -ENOMEM;
642 LIST_HEAD(dirty_realms); 651 LIST_HEAD(dirty_realms);
@@ -704,13 +713,18 @@ more:
704 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, 713 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
705 realm, invalidate, p, e); 714 realm, invalidate, p, e);
706 715
707 if (p < e)
708 goto more;
709
710 /* invalidate when we reach the _end_ (root) of the trace */ 716 /* invalidate when we reach the _end_ (root) of the trace */
711 if (invalidate) 717 if (invalidate && p >= e)
712 rebuild_snap_realms(realm); 718 rebuild_snap_realms(realm);
713 719
720 if (!first_realm)
721 first_realm = realm;
722 else
723 ceph_put_snap_realm(mdsc, realm);
724
725 if (p < e)
726 goto more;
727
714 /* 728 /*
715 * queue cap snaps _after_ we've built the new snap contexts, 729 * queue cap snaps _after_ we've built the new snap contexts,
716 * so that i_head_snapc can be set appropriately. 730 * so that i_head_snapc can be set appropriately.
@@ -721,12 +735,21 @@ more:
721 queue_realm_cap_snaps(realm); 735 queue_realm_cap_snaps(realm);
722 } 736 }
723 737
738 if (realm_ret)
739 *realm_ret = first_realm;
740 else
741 ceph_put_snap_realm(mdsc, first_realm);
742
724 __cleanup_empty_realms(mdsc); 743 __cleanup_empty_realms(mdsc);
725 return 0; 744 return 0;
726 745
727bad: 746bad:
728 err = -EINVAL; 747 err = -EINVAL;
729fail: 748fail:
749 if (realm && !IS_ERR(realm))
750 ceph_put_snap_realm(mdsc, realm);
751 if (first_realm)
752 ceph_put_snap_realm(mdsc, first_realm);
730 pr_err("update_snap_trace error %d\n", err); 753 pr_err("update_snap_trace error %d\n", err);
731 return err; 754 return err;
732} 755}
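ceph_update_snap_trace() now reports the first realm of the trace back through the new realm_ret out-parameter. The ownership rules after this hunk: every realm comes back referenced, the first one is kept in first_realm, later ones are released as soon as they are processed, the caller receives first_realm still referenced (or it is put when realm_ret is NULL), and the fail path puts whatever is still held so nothing leaks on a corrupt trace. The invalidate check is also reordered so the per-realm bookkeeping runs on every iteration while rebuild_snap_realms() still fires only at the end of the trace (p >= e). A compact, self-contained sketch of the out-parameter ownership pattern with toy names:

	#include <stdlib.h>

	struct obj { int nref; };

	static struct obj *obj_new(void)
	{
		struct obj *o = calloc(1, sizeof(*o));
		if (o)
			o->nref = 1;		/* born referenced */
		return o;
	}

	static void obj_put(struct obj *o)
	{
		if (--o->nref == 0)
			free(o);
	}

	static int remaining = 3;		/* toy three-element trace */
	static struct obj *next_obj(void)
	{
		return remaining-- > 0 ? obj_new() : NULL;
	}

	/* keep a reference to the first element, drop the others as we go,
	 * and hand the kept reference to the caller via the out-parameter
	 * (a real decoder would also drop it on its error path) */
	int build_chain(struct obj **first_ret)
	{
		struct obj *first = NULL, *o;

		while ((o = next_obj()) != NULL) {
			if (!first)
				first = o;	/* keep the first reference */
			else
				obj_put(o);	/* drop the rest eagerly */
		}

		if (first_ret)
			*first_ret = first;	/* caller now owns it */
		else if (first)
			obj_put(first);
		return 0;
	}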
@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
844 if (IS_ERR(realm)) 867 if (IS_ERR(realm))
845 goto out; 868 goto out;
846 } 869 }
847 ceph_get_snap_realm(mdsc, realm);
848 870
849 dout("splitting snap_realm %llx %p\n", realm->ino, realm); 871 dout("splitting snap_realm %llx %p\n", realm->ino, realm);
850 for (i = 0; i < num_split_inos; i++) { 872 for (i = 0; i < num_split_inos; i++) {
@@ -905,7 +927,7 @@ skip_inode:
905 /* we may have taken some of the old realm's children. */ 927 /* we may have taken some of the old realm's children. */
906 for (i = 0; i < num_split_realms; i++) { 928 for (i = 0; i < num_split_realms; i++) {
907 struct ceph_snap_realm *child = 929 struct ceph_snap_realm *child =
908 ceph_lookup_snap_realm(mdsc, 930 __lookup_snap_realm(mdsc,
909 le64_to_cpu(split_realms[i])); 931 le64_to_cpu(split_realms[i]));
910 if (!child) 932 if (!child)
911 continue; 933 continue;
@@ -918,7 +940,7 @@ skip_inode:
918 * snap, we can avoid queueing cap_snaps. 940 * snap, we can avoid queueing cap_snaps.
919 */ 941 */
920 ceph_update_snap_trace(mdsc, p, e, 942 ceph_update_snap_trace(mdsc, p, e,
921 op == CEPH_SNAP_OP_DESTROY); 943 op == CEPH_SNAP_OP_DESTROY, NULL);
922 944
923 if (op == CEPH_SNAP_OP_SPLIT) 945 if (op == CEPH_SNAP_OP_SPLIT)
924 /* we took a reference when we created the realm, above */ 946 /* we took a reference when we created the realm, above */
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 5ae62587a71d..a63997b8bcff 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -414,6 +414,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
414 seq_puts(m, ",noshare"); 414 seq_puts(m, ",noshare");
415 if (opt->flags & CEPH_OPT_NOCRC) 415 if (opt->flags & CEPH_OPT_NOCRC)
416 seq_puts(m, ",nocrc"); 416 seq_puts(m, ",nocrc");
417 if (opt->flags & CEPH_OPT_NOMSGAUTH)
418 seq_puts(m, ",nocephx_require_signatures");
419 if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
420 seq_puts(m, ",notcp_nodelay");
417 421
418 if (opt->name) 422 if (opt->name)
419 seq_printf(m, ",name=%s", opt->name); 423 seq_printf(m, ",name=%s", opt->name);
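Two new mount-option lines in ceph_show_options(), with deliberately inverted senses: nocephx_require_signatures prints when CEPH_OPT_NOMSGAUTH is set (the flag records the negative), while notcp_nodelay prints when CEPH_OPT_TCP_NODELAY is clear, because the libceph.h hunk below turns TCP_NODELAY on by default. A sketch of the only-echo-non-defaults idea:

	#include <stdio.h>

	#define OPT_NOMSGAUTH	(1u << 4)	/* default off */
	#define OPT_TCP_NODELAY	(1u << 5)	/* default on  */

	void show_opts(unsigned flags)		/* illustrative */
	{
		if (flags & OPT_NOMSGAUTH)
			printf(",nocephx_require_signatures");
		if (!(flags & OPT_TCP_NODELAY))
			printf(",notcp_nodelay");
	}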
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e1aa32d0759d..04c8124ed30e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, 693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
694 struct ceph_snap_realm *realm); 694 struct ceph_snap_realm *realm);
695extern int ceph_update_snap_trace(struct ceph_mds_client *m, 695extern int ceph_update_snap_trace(struct ceph_mds_client *m,
696 void *p, void *e, bool deletion); 696 void *p, void *e, bool deletion,
697 struct ceph_snap_realm **realm_ret);
697extern void ceph_handle_snap(struct ceph_mds_client *mdsc, 698extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
698 struct ceph_mds_session *session, 699 struct ceph_mds_session *session,
699 struct ceph_msg *msg); 700 struct ceph_msg *msg);
@@ -892,7 +893,9 @@ extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
892int ceph_uninline_data(struct file *filp, struct page *locked_page); 893int ceph_uninline_data(struct file *filp, struct page *locked_page);
893/* dir.c */ 894/* dir.c */
894extern const struct file_operations ceph_dir_fops; 895extern const struct file_operations ceph_dir_fops;
896extern const struct file_operations ceph_snapdir_fops;
895extern const struct inode_operations ceph_dir_iops; 897extern const struct inode_operations ceph_dir_iops;
898extern const struct inode_operations ceph_snapdir_iops;
896extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, 899extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
897 ceph_snapdir_dentry_ops; 900 ceph_snapdir_dentry_ops;
898 901
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index c0dadaac26e3..31eb03d0c766 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -158,17 +158,6 @@ enum {
158}; 158};
159 159
160 160
161/* pool operations */
162enum {
163 POOL_OP_CREATE = 0x01,
164 POOL_OP_DELETE = 0x02,
165 POOL_OP_AUID_CHANGE = 0x03,
166 POOL_OP_CREATE_SNAP = 0x11,
167 POOL_OP_DELETE_SNAP = 0x12,
168 POOL_OP_CREATE_UNMANAGED_SNAP = 0x21,
169 POOL_OP_DELETE_UNMANAGED_SNAP = 0x22,
170};
171
172struct ceph_mon_request_header { 161struct ceph_mon_request_header {
173 __le64 have_version; 162 __le64 have_version;
174 __le16 session_mon; 163 __le16 session_mon;
@@ -191,31 +180,6 @@ struct ceph_mon_statfs_reply {
191 struct ceph_statfs st; 180 struct ceph_statfs st;
192} __attribute__ ((packed)); 181} __attribute__ ((packed));
193 182
194const char *ceph_pool_op_name(int op);
195
196struct ceph_mon_poolop {
197 struct ceph_mon_request_header monhdr;
198 struct ceph_fsid fsid;
199 __le32 pool;
200 __le32 op;
201 __le64 auid;
202 __le64 snapid;
203 __le32 name_len;
204} __attribute__ ((packed));
205
206struct ceph_mon_poolop_reply {
207 struct ceph_mon_request_header monhdr;
208 struct ceph_fsid fsid;
209 __le32 reply_code;
210 __le32 epoch;
211 char has_data;
212 char data[0];
213} __attribute__ ((packed));
214
215struct ceph_mon_unmanaged_snap {
216 __le64 snapid;
217} __attribute__ ((packed));
218
219struct ceph_osd_getmap { 183struct ceph_osd_getmap {
220 struct ceph_mon_request_header monhdr; 184 struct ceph_mon_request_header monhdr;
221 struct ceph_fsid fsid; 185 struct ceph_fsid fsid;
@@ -307,6 +271,7 @@ enum {
307 CEPH_SESSION_RECALL_STATE, 271 CEPH_SESSION_RECALL_STATE,
308 CEPH_SESSION_FLUSHMSG, 272 CEPH_SESSION_FLUSHMSG,
309 CEPH_SESSION_FLUSHMSG_ACK, 273 CEPH_SESSION_FLUSHMSG_ACK,
274 CEPH_SESSION_FORCE_RO,
310}; 275};
311 276
312extern const char *ceph_session_op_name(int op); 277extern const char *ceph_session_op_name(int op);
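ceph_fs.h sheds the pool-op wire format (struct ceph_mon_poolop and friends, unused once the mon_client.c code further down goes away) and gains CEPH_SESSION_FORCE_RO, the session op that pairs with the new s_readonly field in struct ceph_mds_session above: on receiving it the client presumably marks the session read-only so later writes can be refused. A hedged sketch of the dispatch; the enum value and the -EROFS behaviour are assumptions, not shown in this diff:

	struct session { int readonly; };

	enum { SESSION_FORCE_RO = 13 };		/* assumed value */

	void handle_session_op(struct session *s, int op)
	{
		switch (op) {
		case SESSION_FORCE_RO:
			s->readonly = 1;	/* later writes may fail -EROFS */
			break;
		/* ... other session ops ... */
		}
	}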
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 8b11a79ca1cb..16fff9608848 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -30,8 +30,9 @@
30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */ 30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ 31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
32#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */ 32#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */
33#define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */
33 34
34#define CEPH_OPT_DEFAULT (0) 35#define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY)
35 36
36#define ceph_set_opt(client, opt) \ 37#define ceph_set_opt(client, opt) \
37 (client)->options->flags |= CEPH_OPT_##opt; 38 (client)->options->flags |= CEPH_OPT_##opt;
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index d9d396c16503..e15499422fdc 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -57,6 +57,7 @@ struct ceph_messenger {
57 57
58 atomic_t stopping; 58 atomic_t stopping;
59 bool nocrc; 59 bool nocrc;
60 bool tcp_nodelay;
60 61
61 /* 62 /*
62 * the global_seq counts connections i (attempt to) initiate 63 * the global_seq counts connections i (attempt to) initiate
@@ -264,7 +265,8 @@ extern void ceph_messenger_init(struct ceph_messenger *msgr,
264 struct ceph_entity_addr *myaddr, 265 struct ceph_entity_addr *myaddr,
265 u64 supported_features, 266 u64 supported_features,
266 u64 required_features, 267 u64 required_features,
267 bool nocrc); 268 bool nocrc,
269 bool tcp_nodelay);
268 270
269extern void ceph_con_init(struct ceph_connection *con, void *private, 271extern void ceph_con_init(struct ceph_connection *con, void *private,
270 const struct ceph_connection_operations *ops, 272 const struct ceph_connection_operations *ops,
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index deb47e45ac7c..81810dc21f06 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -40,7 +40,7 @@ struct ceph_mon_request {
40}; 40};
41 41
42/* 42/*
43 * ceph_mon_generic_request is being used for the statfs, poolop and 43 * ceph_mon_generic_request is being used for the statfs and
44 * mon_get_version requests which are being done a bit differently 44 * mon_get_version requests which are being done a bit differently
45 * because we need to get data back to the caller 45 * because we need to get data back to the caller
46 */ 46 */
@@ -50,7 +50,6 @@ struct ceph_mon_generic_request {
50 struct rb_node node; 50 struct rb_node node;
51 int result; 51 int result;
52 void *buf; 52 void *buf;
53 int buf_len;
54 struct completion completion; 53 struct completion completion;
55 struct ceph_msg *request; /* original request */ 54 struct ceph_msg *request; /* original request */
56 struct ceph_msg *reply; /* and reply */ 55 struct ceph_msg *reply; /* and reply */
@@ -117,10 +116,4 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
117 116
118extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); 117extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
119 118
120extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
121 u32 pool, u64 *snapid);
122
123extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
124 u32 pool, u64 snapid);
125
126#endif 119#endif
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 5d5ab67f516d..ec565508e904 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -239,6 +239,8 @@ enum {
239 Opt_nocrc, 239 Opt_nocrc,
240 Opt_cephx_require_signatures, 240 Opt_cephx_require_signatures,
241 Opt_nocephx_require_signatures, 241 Opt_nocephx_require_signatures,
242 Opt_tcp_nodelay,
243 Opt_notcp_nodelay,
242}; 244};
243 245
244static match_table_t opt_tokens = { 246static match_table_t opt_tokens = {
@@ -259,6 +261,8 @@ static match_table_t opt_tokens = {
259 {Opt_nocrc, "nocrc"}, 261 {Opt_nocrc, "nocrc"},
260 {Opt_cephx_require_signatures, "cephx_require_signatures"}, 262 {Opt_cephx_require_signatures, "cephx_require_signatures"},
261 {Opt_nocephx_require_signatures, "nocephx_require_signatures"}, 263 {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
264 {Opt_tcp_nodelay, "tcp_nodelay"},
265 {Opt_notcp_nodelay, "notcp_nodelay"},
262 {-1, NULL} 266 {-1, NULL}
263}; 267};
264 268
@@ -457,6 +461,7 @@ ceph_parse_options(char *options, const char *dev_name,
457 case Opt_nocrc: 461 case Opt_nocrc:
458 opt->flags |= CEPH_OPT_NOCRC; 462 opt->flags |= CEPH_OPT_NOCRC;
459 break; 463 break;
464
460 case Opt_cephx_require_signatures: 465 case Opt_cephx_require_signatures:
461 opt->flags &= ~CEPH_OPT_NOMSGAUTH; 466 opt->flags &= ~CEPH_OPT_NOMSGAUTH;
462 break; 467 break;
@@ -464,6 +469,13 @@ ceph_parse_options(char *options, const char *dev_name,
464 opt->flags |= CEPH_OPT_NOMSGAUTH; 469 opt->flags |= CEPH_OPT_NOMSGAUTH;
465 break; 470 break;
466 471
472 case Opt_tcp_nodelay:
473 opt->flags |= CEPH_OPT_TCP_NODELAY;
474 break;
475 case Opt_notcp_nodelay:
476 opt->flags &= ~CEPH_OPT_TCP_NODELAY;
477 break;
478
467 default: 479 default:
468 BUG_ON(token); 480 BUG_ON(token);
469 } 481 }
@@ -518,10 +530,12 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
518 /* msgr */ 530 /* msgr */
519 if (ceph_test_opt(client, MYIP)) 531 if (ceph_test_opt(client, MYIP))
520 myaddr = &client->options->my_addr; 532 myaddr = &client->options->my_addr;
533
521 ceph_messenger_init(&client->msgr, myaddr, 534 ceph_messenger_init(&client->msgr, myaddr,
522 client->supported_features, 535 client->supported_features,
523 client->required_features, 536 client->required_features,
524 ceph_test_opt(client, NOCRC)); 537 ceph_test_opt(client, NOCRC),
538 ceph_test_opt(client, TCP_NODELAY));
525 539
526 /* subsystems */ 540 /* subsystems */
527 err = ceph_monc_init(&client->monc, client); 541 err = ceph_monc_init(&client->monc, client);
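This is the option plumbing end to end: two new tokens in opt_tokens, parse cases that set or clear CEPH_OPT_TCP_NODELAY, and the resolved flag handed to ceph_messenger_init(). Since CEPH_OPT_DEFAULT now includes TCP_NODELAY, passing tcp_nodelay is a no-op and notcp_nodelay is the switch that actually changes behaviour. A self-contained sketch of the same parse shape, strtok-based and illustrative:

	#include <string.h>

	#define OPT_TCP_NODELAY	(1u << 5)
	#define OPT_DEFAULT	OPT_TCP_NODELAY	/* nodelay on by default */

	unsigned parse_opts(char *s)		/* s is modified in place */
	{
		unsigned flags = OPT_DEFAULT;
		char *tok;

		for (tok = strtok(s, ","); tok; tok = strtok(NULL, ","))
			if (!strcmp(tok, "tcp_nodelay"))
				flags |= OPT_TCP_NODELAY;
			else if (!strcmp(tok, "notcp_nodelay"))
				flags &= ~OPT_TCP_NODELAY;

		return flags;	/* e.g. "notcp_nodelay" yields 0 */
	}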
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 30560202f57b..139a9cb19b0c 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -42,17 +42,3 @@ const char *ceph_osd_state_name(int s)
42 return "???"; 42 return "???";
43 } 43 }
44} 44}
45
46const char *ceph_pool_op_name(int op)
47{
48 switch (op) {
49 case POOL_OP_CREATE: return "create";
50 case POOL_OP_DELETE: return "delete";
51 case POOL_OP_AUID_CHANGE: return "auid change";
52 case POOL_OP_CREATE_SNAP: return "create snap";
53 case POOL_OP_DELETE_SNAP: return "delete snap";
54 case POOL_OP_CREATE_UNMANAGED_SNAP: return "create unmanaged snap";
55 case POOL_OP_DELETE_UNMANAGED_SNAP: return "delete unmanaged snap";
56 }
57 return "???";
58}
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index d2d525529f87..14d9995097cc 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -127,8 +127,6 @@ static int monc_show(struct seq_file *s, void *p)
127 op = le16_to_cpu(req->request->hdr.type); 127 op = le16_to_cpu(req->request->hdr.type);
128 if (op == CEPH_MSG_STATFS) 128 if (op == CEPH_MSG_STATFS)
129 seq_printf(s, "%llu statfs\n", req->tid); 129 seq_printf(s, "%llu statfs\n", req->tid);
130 else if (op == CEPH_MSG_POOLOP)
131 seq_printf(s, "%llu poolop\n", req->tid);
132 else if (op == CEPH_MSG_MON_GET_VERSION) 130 else if (op == CEPH_MSG_MON_GET_VERSION)
133 seq_printf(s, "%llu mon_get_version", req->tid); 131 seq_printf(s, "%llu mon_get_version", req->tid);
134 else 132 else
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 33a2f201e460..6b3f54ed65ba 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -510,6 +510,16 @@ static int ceph_tcp_connect(struct ceph_connection *con)
510 return ret; 510 return ret;
511 } 511 }
512 512
513 if (con->msgr->tcp_nodelay) {
514 int optval = 1;
515
516 ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
517 (char *)&optval, sizeof(optval));
518 if (ret)
519 pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d",
520 ret);
521 }
522
513 sk_set_memalloc(sock->sk); 523 sk_set_memalloc(sock->sk);
514 524
515 con->sock = sock; 525 con->sock = sock;
@@ -2922,7 +2932,8 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
2922 struct ceph_entity_addr *myaddr, 2932 struct ceph_entity_addr *myaddr,
2923 u64 supported_features, 2933 u64 supported_features,
2924 u64 required_features, 2934 u64 required_features,
2925 bool nocrc) 2935 bool nocrc,
2936 bool tcp_nodelay)
2926{ 2937{
2927 msgr->supported_features = supported_features; 2938 msgr->supported_features = supported_features;
2928 msgr->required_features = required_features; 2939 msgr->required_features = required_features;
@@ -2937,6 +2948,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
2937 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2948 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2938 encode_my_addr(msgr); 2949 encode_my_addr(msgr);
2939 msgr->nocrc = nocrc; 2950 msgr->nocrc = nocrc;
2951 msgr->tcp_nodelay = tcp_nodelay;
2940 2952
2941 atomic_set(&msgr->stopping, 0); 2953 atomic_set(&msgr->stopping, 0);
2942 2954
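The messenger grows a tcp_nodelay flag, applied right after connect with kernel_setsockopt(): disabling Nagle means small messages (cap updates, acks, heartbeats) go out immediately instead of waiting to be coalesced with later data. A setsockopt failure is logged and ignored, which is reasonable since the connection still works without it; note the pr_err() format string above is merely missing a trailing \n. A userspace analogue of the same call:

	#include <netinet/in.h>
	#include <netinet/tcp.h>
	#include <stdio.h>
	#include <sys/socket.h>

	int enable_nodelay(int fd)
	{
		int one = 1;
		int ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
				     &one, sizeof(one));
		if (ret)
			perror("setsockopt(TCP_NODELAY)");	/* non-fatal */
		return ret;
	}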
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index f2148e22b148..2b3cf05e87b0 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -410,7 +410,7 @@ out_unlocked:
410} 410}
411 411
412/* 412/*
413 * generic requests (e.g., statfs, poolop) 413 * generic requests (currently statfs, mon_get_version)
414 */ 414 */
415static struct ceph_mon_generic_request *__lookup_generic_req( 415static struct ceph_mon_generic_request *__lookup_generic_req(
416 struct ceph_mon_client *monc, u64 tid) 416 struct ceph_mon_client *monc, u64 tid)
@@ -569,7 +569,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
569 return; 569 return;
570 570
571bad: 571bad:
572 pr_err("corrupt generic reply, tid %llu\n", tid); 572 pr_err("corrupt statfs reply, tid %llu\n", tid);
573 ceph_msg_dump(msg); 573 ceph_msg_dump(msg);
574} 574}
575 575
@@ -588,7 +588,6 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
588 588
589 kref_init(&req->kref); 589 kref_init(&req->kref);
590 req->buf = buf; 590 req->buf = buf;
591 req->buf_len = sizeof(*buf);
592 init_completion(&req->completion); 591 init_completion(&req->completion);
593 592
594 err = -ENOMEM; 593 err = -ENOMEM;
@@ -611,7 +610,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
611 err = do_generic_request(monc, req); 610 err = do_generic_request(monc, req);
612 611
613out: 612out:
614 kref_put(&req->kref, release_generic_request); 613 put_generic_request(req);
615 return err; 614 return err;
616} 615}
617EXPORT_SYMBOL(ceph_monc_do_statfs); 616EXPORT_SYMBOL(ceph_monc_do_statfs);
@@ -647,7 +646,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
647 646
648 return; 647 return;
649bad: 648bad:
650 pr_err("corrupt mon_get_version reply\n"); 649 pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
651 ceph_msg_dump(msg); 650 ceph_msg_dump(msg);
652} 651}
653 652
@@ -670,7 +669,6 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
670 669
671 kref_init(&req->kref); 670 kref_init(&req->kref);
672 req->buf = newest; 671 req->buf = newest;
673 req->buf_len = sizeof(*newest);
674 init_completion(&req->completion); 672 init_completion(&req->completion);
675 673
676 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 674 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
@@ -701,134 +699,12 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
701 699
702 mutex_unlock(&monc->mutex); 700 mutex_unlock(&monc->mutex);
703out: 701out:
704 kref_put(&req->kref, release_generic_request); 702 put_generic_request(req);
705 return err; 703 return err;
706} 704}
707EXPORT_SYMBOL(ceph_monc_do_get_version); 705EXPORT_SYMBOL(ceph_monc_do_get_version);
708 706
709/* 707/*
710 * pool ops
711 */
712static int get_poolop_reply_buf(const char *src, size_t src_len,
713 char *dst, size_t dst_len)
714{
715 u32 buf_len;
716
717 if (src_len != sizeof(u32) + dst_len)
718 return -EINVAL;
719
720 buf_len = le32_to_cpu(*(__le32 *)src);
721 if (buf_len != dst_len)
722 return -EINVAL;
723
724 memcpy(dst, src + sizeof(u32), dst_len);
725 return 0;
726}
727
728static void handle_poolop_reply(struct ceph_mon_client *monc,
729 struct ceph_msg *msg)
730{
731 struct ceph_mon_generic_request *req;
732 struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
733 u64 tid = le64_to_cpu(msg->hdr.tid);
734
735 if (msg->front.iov_len < sizeof(*reply))
736 goto bad;
737 dout("handle_poolop_reply %p tid %llu\n", msg, tid);
738
739 mutex_lock(&monc->mutex);
740 req = __lookup_generic_req(monc, tid);
741 if (req) {
742 if (req->buf_len &&
743 get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
744 msg->front.iov_len - sizeof(*reply),
745 req->buf, req->buf_len) < 0) {
746 mutex_unlock(&monc->mutex);
747 goto bad;
748 }
749 req->result = le32_to_cpu(reply->reply_code);
750 get_generic_request(req);
751 }
752 mutex_unlock(&monc->mutex);
753 if (req) {
754 complete(&req->completion);
755 put_generic_request(req);
756 }
757 return;
758
759bad:
760 pr_err("corrupt generic reply, tid %llu\n", tid);
761 ceph_msg_dump(msg);
762}
763
764/*
765 * Do a synchronous pool op.
766 */
767static int do_poolop(struct ceph_mon_client *monc, u32 op,
768 u32 pool, u64 snapid,
769 char *buf, int len)
770{
771 struct ceph_mon_generic_request *req;
772 struct ceph_mon_poolop *h;
773 int err;
774
775 req = kzalloc(sizeof(*req), GFP_NOFS);
776 if (!req)
777 return -ENOMEM;
778
779 kref_init(&req->kref);
780 req->buf = buf;
781 req->buf_len = len;
782 init_completion(&req->completion);
783
784 err = -ENOMEM;
785 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
786 true);
787 if (!req->request)
788 goto out;
789 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
790 true);
791 if (!req->reply)
792 goto out;
793
794 /* fill out request */
795 req->request->hdr.version = cpu_to_le16(2);
796 h = req->request->front.iov_base;
797 h->monhdr.have_version = 0;
798 h->monhdr.session_mon = cpu_to_le16(-1);
799 h->monhdr.session_mon_tid = 0;
800 h->fsid = monc->monmap->fsid;
801 h->pool = cpu_to_le32(pool);
802 h->op = cpu_to_le32(op);
803 h->auid = 0;
804 h->snapid = cpu_to_le64(snapid);
805 h->name_len = 0;
806
807 err = do_generic_request(monc, req);
808
809out:
810 kref_put(&req->kref, release_generic_request);
811 return err;
812}
813
814int ceph_monc_create_snapid(struct ceph_mon_client *monc,
815 u32 pool, u64 *snapid)
816{
817 return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
818 pool, 0, (char *)snapid, sizeof(*snapid));
819
820}
821EXPORT_SYMBOL(ceph_monc_create_snapid);
822
823int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
824 u32 pool, u64 snapid)
825{
826 return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
827 pool, snapid, NULL, 0);
828
829}
830
831/*
832 * Resend pending generic requests. 708 * Resend pending generic requests.
833 */ 709 */
834static void __resend_generic_request(struct ceph_mon_client *monc) 710static void __resend_generic_request(struct ceph_mon_client *monc)
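Roughly 120 lines of dead monitor pool-op machinery go away; nothing in the tree called ceph_monc_create_snapid() or ceph_monc_delete_snapid(). A telling detail: the removed delete path above passed POOL_OP_CREATE_UNMANAGED_SNAP where POOL_OP_DELETE_UNMANAGED_SNAP was clearly intended, so it could never have worked, strong evidence it was never used. Had it been kept, the call would presumably have read:

	return do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
			 pool, snapid, NULL, 0);

The surviving callers also switch from open-coded kref_put(&req->kref, release_generic_request) to the put_generic_request() helper, visible in the statfs and get_version hunks above.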
@@ -1112,10 +988,6 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1112 handle_get_version_reply(monc, msg); 988 handle_get_version_reply(monc, msg);
1113 break; 989 break;
1114 990
1115 case CEPH_MSG_POOLOP_REPLY:
1116 handle_poolop_reply(monc, msg);
1117 break;
1118
1119 case CEPH_MSG_MON_MAP: 991 case CEPH_MSG_MON_MAP:
1120 ceph_monc_handle_map(monc, msg); 992 ceph_monc_handle_map(monc, msg);
1121 break; 993 break;
@@ -1154,7 +1026,6 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1154 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1026 case CEPH_MSG_MON_SUBSCRIBE_ACK:
1155 m = ceph_msg_get(monc->m_subscribe_ack); 1027 m = ceph_msg_get(monc->m_subscribe_ack);
1156 break; 1028 break;
1157 case CEPH_MSG_POOLOP_REPLY:
1158 case CEPH_MSG_STATFS_REPLY: 1029 case CEPH_MSG_STATFS_REPLY:
1159 return get_generic_reply(con, hdr, skip); 1030 return get_generic_reply(con, hdr, skip);
1160 case CEPH_MSG_AUTH_REPLY: 1031 case CEPH_MSG_AUTH_REPLY:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 53299c7b0ca4..41a4abc7e98e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1035,10 +1035,11 @@ static void put_osd(struct ceph_osd *osd)
1035{ 1035{
1036 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 1036 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
1037 atomic_read(&osd->o_ref) - 1); 1037 atomic_read(&osd->o_ref) - 1);
1038 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 1038 if (atomic_dec_and_test(&osd->o_ref)) {
1039 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 1039 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
1040 1040
1041 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 1041 if (osd->o_auth.authorizer)
1042 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
1042 kfree(osd); 1043 kfree(osd);
1043 } 1044 }
1044} 1045}
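This fixes a leak in put_osd(): the old condition required both the refcount hitting zero *and* a non-NULL authorizer before running the body, so an osd that never got an authorizer was never kfree()d on its final put. The authorizer check now lives inside the last-put branch and the kfree() is unconditional. The pattern as a C11 sketch, names illustrative:

	#include <stdatomic.h>
	#include <stdlib.h>

	struct auth { int dummy; };
	static void destroy_auth(struct auth *a) { free(a); }

	struct obj {
		atomic_int refs;
		struct auth *auth;	/* optional, may be NULL */
	};

	void obj_put(struct obj *o)
	{
		if (atomic_fetch_sub(&o->refs, 1) == 1) {	/* last put */
			if (o->auth)
				destroy_auth(o->auth);	/* only if present */
			free(o);	/* the fix: always free the object */
		}
	}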
@@ -1048,14 +1049,24 @@ static void put_osd(struct ceph_osd *osd)
1048 */ 1049 */
1049static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1050static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1050{ 1051{
1051 dout("__remove_osd %p\n", osd); 1052 dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
1052 WARN_ON(!list_empty(&osd->o_requests)); 1053 WARN_ON(!list_empty(&osd->o_requests));
1053 WARN_ON(!list_empty(&osd->o_linger_requests)); 1054 WARN_ON(!list_empty(&osd->o_linger_requests));
1054 1055
1055 rb_erase(&osd->o_node, &osdc->osds);
1056 list_del_init(&osd->o_osd_lru); 1056 list_del_init(&osd->o_osd_lru);
1057 ceph_con_close(&osd->o_con); 1057 rb_erase(&osd->o_node, &osdc->osds);
1058 put_osd(osd); 1058 RB_CLEAR_NODE(&osd->o_node);
1059}
1060
1061static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1062{
1063 dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
1064
1065 if (!RB_EMPTY_NODE(&osd->o_node)) {
1066 ceph_con_close(&osd->o_con);
1067 __remove_osd(osdc, osd);
1068 put_osd(osd);
1069 }
1059} 1070}
1060 1071
1061static void remove_all_osds(struct ceph_osd_client *osdc) 1072static void remove_all_osds(struct ceph_osd_client *osdc)
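This is the double-removal fix: __remove_osd() could be reached twice for the same osd through different teardown paths. The unlink now calls RB_CLEAR_NODE() after rb_erase(), and the new remove_osd() wrapper only closes the connection, unlinks, and drops the reference when !RB_EMPTY_NODE(), making removal idempotent; the three call sites below switch to the wrapper. A userspace sketch of the same trick on a doubly linked list, clearing the link fields on unlink and testing them before unlinking:

	#include <stdbool.h>
	#include <stddef.h>

	struct node { struct node *next, **pprev; };

	static bool node_linked(const struct node *n)
	{
		return n->pprev != NULL;	/* like !RB_EMPTY_NODE() */
	}

	static void __unlink(struct node *n)
	{
		*n->pprev = n->next;
		if (n->next)
			n->next->pprev = n->pprev;
		n->next = NULL;
		n->pprev = NULL;		/* like RB_CLEAR_NODE() */
	}

	void unlink_node(struct node *n)
	{
		if (node_linked(n))		/* a second call is a no-op */
			__unlink(n);
	}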
@@ -1065,7 +1076,7 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
1065 while (!RB_EMPTY_ROOT(&osdc->osds)) { 1076 while (!RB_EMPTY_ROOT(&osdc->osds)) {
1066 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 1077 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
1067 struct ceph_osd, o_node); 1078 struct ceph_osd, o_node);
1068 __remove_osd(osdc, osd); 1079 remove_osd(osdc, osd);
1069 } 1080 }
1070 mutex_unlock(&osdc->request_mutex); 1081 mutex_unlock(&osdc->request_mutex);
1071} 1082}
@@ -1106,7 +1117,7 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
1106 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 1117 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
1107 if (time_before(jiffies, osd->lru_ttl)) 1118 if (time_before(jiffies, osd->lru_ttl))
1108 break; 1119 break;
1109 __remove_osd(osdc, osd); 1120 remove_osd(osdc, osd);
1110 } 1121 }
1111 mutex_unlock(&osdc->request_mutex); 1122 mutex_unlock(&osdc->request_mutex);
1112} 1123}
@@ -1121,8 +1132,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1121 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 1132 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
1122 if (list_empty(&osd->o_requests) && 1133 if (list_empty(&osd->o_requests) &&
1123 list_empty(&osd->o_linger_requests)) { 1134 list_empty(&osd->o_linger_requests)) {
1124 __remove_osd(osdc, osd); 1135 remove_osd(osdc, osd);
1125
1126 return -ENODEV; 1136 return -ENODEV;
1127 } 1137 }
1128 1138
@@ -1926,6 +1936,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1926{ 1936{
1927 struct rb_node *p, *n; 1937 struct rb_node *p, *n;
1928 1938
1939 dout("%s %p\n", __func__, osdc);
1929 for (p = rb_first(&osdc->osds); p; p = n) { 1940 for (p = rb_first(&osdc->osds); p; p = n) {
1930 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1941 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1931 1942