 drivers/block/rbd.c             | 174
 fs/ceph/addr.c                  |  88
 fs/ceph/caps.c                  | 102
 fs/ceph/file.c                  |   4
 fs/ceph/inode.c                 |  14
 fs/ceph/locks.c                 |   2
 fs/ceph/mds_client.c            |   6
 fs/ceph/mdsmap.c                |  42
 fs/ceph/super.c                 |   2
 fs/ceph/super.h                 |   4
 fs/ceph/xattr.c                 |   9
 include/linux/ceph/decode.h     |   5
 include/linux/ceph/osd_client.h |   1
 net/ceph/auth_none.c            |   6
 net/ceph/osd_client.c           |  63
 15 files changed, 277 insertions(+), 245 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index aff789d6fccd..4ad2ad9a5bb0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -372,7 +372,7 @@ enum rbd_dev_flags {
         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 };
 
-static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
+static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
 
 static LIST_HEAD(rbd_dev_list);    /* devices */
 static DEFINE_SPINLOCK(rbd_dev_list_lock);
@@ -489,10 +489,8 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
         if (removing)
                 return -ENOENT;
 
-        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
         (void) get_device(&rbd_dev->dev);
         set_device_ro(bdev, rbd_dev->mapping.read_only);
-        mutex_unlock(&ctl_mutex);
 
         return 0;
 }
@@ -507,9 +505,7 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
         spin_unlock_irq(&rbd_dev->lock);
         rbd_assert(open_count_before > 0);
 
-        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
         put_device(&rbd_dev->dev);
-        mutex_unlock(&ctl_mutex);
 }
 
 static const struct block_device_operations rbd_bd_ops = {
@@ -520,7 +516,7 @@ static const struct block_device_operations rbd_bd_ops = {
 
 /*
  * Initialize an rbd client instance.  Success or not, this function
- * consumes ceph_opts.
+ * consumes ceph_opts.  Caller holds client_mutex.
  */
 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
@@ -535,30 +531,25 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
         kref_init(&rbdc->kref);
         INIT_LIST_HEAD(&rbdc->node);
 
-        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
         if (IS_ERR(rbdc->client))
-                goto out_mutex;
+                goto out_rbdc;
         ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */
 
         ret = ceph_open_session(rbdc->client);
         if (ret < 0)
-                goto out_err;
+                goto out_client;
 
         spin_lock(&rbd_client_list_lock);
         list_add_tail(&rbdc->node, &rbd_client_list);
         spin_unlock(&rbd_client_list_lock);
 
-        mutex_unlock(&ctl_mutex);
         dout("%s: rbdc %p\n", __func__, rbdc);
 
         return rbdc;
-
-out_err:
+out_client:
         ceph_destroy_client(rbdc->client);
-out_mutex:
-        mutex_unlock(&ctl_mutex);
+out_rbdc:
         kfree(rbdc);
 out_opt:
         if (ceph_opts)
@@ -682,11 +673,13 @@ static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
         struct rbd_client *rbdc;
 
+        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
         rbdc = rbd_client_find(ceph_opts);
         if (rbdc)       /* using an existing client */
                 ceph_destroy_options(ceph_opts);
         else
                 rbdc = rbd_client_create(ceph_opts);
+        mutex_unlock(&client_mutex);
 
         return rbdc;
 }
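With ctl_mutex gone from the I/O paths, the narrower client_mutex now covers only the find-or-create sequence above, closing the race where two mounts could each miss the lookup and create duplicate clients. A minimal userspace sketch of that same get-or-create pattern, with hypothetical names and pthreads standing in for the kernel mutex:

#include <pthread.h>
#include <stdlib.h>
#include <string.h>

struct client {
        char key[64];
        int refcount;
        struct client *next;
};

static pthread_mutex_t client_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct client *client_list;

static struct client *client_find(const char *key)
{
        struct client *c;

        for (c = client_list; c; c = c->next)
                if (strcmp(c->key, key) == 0)
                        return c;
        return NULL;
}

/* lookup and creation happen under one mutex, so two racing callers
 * cannot both miss the lookup and create duplicate clients */
struct client *client_get(const char *key)
{
        struct client *c;

        pthread_mutex_lock(&client_mutex);
        c = client_find(key);
        if (c) {
                c->refcount++;                  /* reuse existing client */
        } else {
                c = calloc(1, sizeof(*c));      /* create a new one */
                if (c) {
                        strncpy(c->key, key, sizeof(c->key) - 1);
                        c->refcount = 1;
                        c->next = client_list;
                        client_list = c;
                }
        }
        pthread_mutex_unlock(&client_mutex);
        return c;
}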
@@ -840,7 +833,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 
         /* We won't fail any more, fill in the header */
 
-        down_write(&rbd_dev->header_rwsem);
         if (first_time) {
                 header->object_prefix = object_prefix;
                 header->obj_order = ondisk->options.order;
@@ -869,8 +861,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
         if (rbd_dev->mapping.size != header->image_size)
                 rbd_dev->mapping.size = header->image_size;
 
-        up_write(&rbd_dev->header_rwsem);
-
         return 0;
 out_2big:
         ret = -EIO;
@@ -1126,6 +1116,7 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
                         buf = bvec_kmap_irq(bv, &flags);
                         memset(buf + remainder, 0,
                                bv->bv_len - remainder);
+                        flush_dcache_page(bv->bv_page);
                         bvec_kunmap_irq(buf, &flags);
                 }
                 pos += bv->bv_len;
@@ -1153,11 +1144,12 @@ static void zero_pages(struct page **pages, u64 offset, u64 end)
                 unsigned long flags;
                 void *kaddr;
 
-                page_offset = (size_t)(offset & ~PAGE_MASK);
-                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
+                page_offset = offset & ~PAGE_MASK;
+                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                 local_irq_save(flags);
                 kaddr = kmap_atomic(*page);
                 memset(kaddr + page_offset, 0, length);
+                flush_dcache_page(*page);
                 kunmap_atomic(kaddr);
                 local_irq_restore(flags);
 
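The min() to min_t() change above avoids a mixed-type comparison between a size_t and a u64 expression. A standalone illustration of why forcing both operands to one explicit type matters (min_t here is a re-creation using a GCC/Clang statement expression, not the kernel header):

#include <stddef.h>
#include <stdio.h>

/* kernel-style min_t: force both operands to one type before comparing */
#define min_t(type, a, b) ({ type _a = (a); type _b = (b); _a < _b ? _a : _b; })

int main(void)
{
        size_t page_remaining = 4096;
        long long range = -1;   /* e.g. a miscomputed end - offset */

        /* usual arithmetic conversions: -1 becomes SIZE_MAX, so the
         * "minimum" of 4096 and -1 comes out as 4096 */
        printf("naive : %lld\n",
               (long long)(page_remaining < (size_t)range ?
                           page_remaining : (size_t)range));

        /* with an explicit common type the comparison behaves: -1 */
        printf("min_t : %lld\n", min_t(long long, page_remaining, range));
        return 0;
}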
@@ -2171,9 +2163,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
         struct rbd_obj_request *obj_request = NULL;
         struct rbd_obj_request *next_obj_request;
         bool write_request = img_request_write_test(img_request);
-        struct bio *bio_list;
+        struct bio *bio_list = 0;
         unsigned int bio_offset = 0;
-        struct page **pages;
+        struct page **pages = 0;
         u64 img_offset;
         u64 resid;
         u16 opcode;
@@ -2535,6 +2527,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
          */
         orig_request = obj_request->obj_request;
         obj_request->obj_request = NULL;
+        rbd_obj_request_put(orig_request);
         rbd_assert(orig_request);
         rbd_assert(orig_request->img_request);
 
@@ -2555,7 +2548,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
         if (!rbd_dev->parent_overlap) {
                 struct ceph_osd_client *osdc;
 
-                rbd_obj_request_put(orig_request);
                 osdc = &rbd_dev->rbd_client->client->osdc;
                 result = rbd_obj_request_submit(osdc, orig_request);
                 if (!result)
@@ -2585,7 +2577,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 out:
         if (orig_request->result)
                 rbd_obj_request_complete(orig_request);
-        rbd_obj_request_put(orig_request);
 }
 
 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
@@ -2859,7 +2850,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
                 (unsigned int)opcode);
         ret = rbd_dev_refresh(rbd_dev);
         if (ret)
-                rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
+                rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
 
         rbd_obj_notify_ack(rbd_dev, notify_id);
 }
@@ -3339,8 +3330,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
         int ret;
 
         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+        down_write(&rbd_dev->header_rwsem);
         mapping_size = rbd_dev->mapping.size;
-        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
         if (rbd_dev->image_format == 1)
                 ret = rbd_dev_v1_header_info(rbd_dev);
         else
@@ -3349,7 +3340,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
         /* If it's a mapped snapshot, validate its EXISTS flag */
 
         rbd_exists_validate(rbd_dev);
-        mutex_unlock(&ctl_mutex);
+        up_write(&rbd_dev->header_rwsem);
+
         if (mapping_size != rbd_dev->mapping.size) {
                 sector_t size;
 
@@ -3813,6 +3805,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
         void *end;
         u64 pool_id;
         char *image_id;
+        u64 snap_id;
         u64 overlap;
         int ret;
 
@@ -3872,24 +3865,56 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
                         (unsigned long long)pool_id, U32_MAX);
                 goto out_err;
         }
-        parent_spec->pool_id = pool_id;
 
         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
         if (IS_ERR(image_id)) {
                 ret = PTR_ERR(image_id);
                 goto out_err;
         }
-        parent_spec->image_id = image_id;
-        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+        ceph_decode_64_safe(&p, end, snap_id, out_err);
         ceph_decode_64_safe(&p, end, overlap, out_err);
 
-        if (overlap) {
-                rbd_spec_put(rbd_dev->parent_spec);
+        /*
+         * The parent won't change (except when the clone is
+         * flattened, already handled that). So we only need to
+         * record the parent spec we have not already done so.
+         */
+        if (!rbd_dev->parent_spec) {
+                parent_spec->pool_id = pool_id;
+                parent_spec->image_id = image_id;
+                parent_spec->snap_id = snap_id;
                 rbd_dev->parent_spec = parent_spec;
                 parent_spec = NULL;     /* rbd_dev now owns this */
-                rbd_dev->parent_overlap = overlap;
-        } else {
-                rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+        }
+
+        /*
+         * We always update the parent overlap. If it's zero we
+         * treat it specially.
+         */
+        rbd_dev->parent_overlap = overlap;
+        smp_mb();
+        if (!overlap) {
+
+                /* A null parent_spec indicates it's the initial probe */
+
+                if (parent_spec) {
+                        /*
+                         * The overlap has become zero, so the clone
+                         * must have been resized down to 0 at some
+                         * point. Treat this the same as a flatten.
+                         */
+                        rbd_dev_parent_put(rbd_dev);
+                        pr_info("%s: clone image now standalone\n",
+                                rbd_dev->disk->disk_name);
+                } else {
+                        /*
+                         * For the initial probe, if we find the
+                         * overlap is zero we just pretend there was
+                         * no parent image.
+                         */
+                        rbd_warn(rbd_dev, "ignoring parent of "
+                                "clone with overlap 0\n");
+                }
         }
 out:
         ret = 0;
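The new code stores the overlap and then issues smp_mb(), which suggests readers are expected to test parent_overlap without holding header_rwsem. A rough userspace analogue of that publish-with-barrier idea using C11 atomics (illustrative only; the real pairing rules live in the rbd read paths):

#include <stdatomic.h>
#include <stdbool.h>

static _Atomic unsigned long long parent_overlap;

void set_overlap(unsigned long long overlap)
{
        /* store, then full fence: no later load or store on this
         * thread is reordered before the overlap update */
        atomic_store_explicit(&parent_overlap, overlap,
                              memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);
}

bool has_parent(void)
{
        /* readers get either the old or the new value, never a torn
         * one, without taking any lock */
        return atomic_load_explicit(&parent_overlap,
                                    memory_order_relaxed) != 0;
}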
@@ -4245,16 +4270,14 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
         bool first_time = rbd_dev->header.object_prefix == NULL;
         int ret;
 
-        down_write(&rbd_dev->header_rwsem);
-
         ret = rbd_dev_v2_image_size(rbd_dev);
         if (ret)
-                goto out;
+                return ret;
 
         if (first_time) {
                 ret = rbd_dev_v2_header_onetime(rbd_dev);
                 if (ret)
-                        goto out;
+                        return ret;
         }
 
         /*
@@ -4269,7 +4292,7 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 
         ret = rbd_dev_v2_parent_info(rbd_dev);
         if (ret)
-                goto out;
+                return ret;
 
         /*
          * Print a warning if this is the initial probe and
@@ -4290,8 +4313,6 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 
         ret = rbd_dev_v2_snap_context(rbd_dev);
         dout("rbd_dev_v2_snap_context returned %d\n", ret);
-out:
-        up_write(&rbd_dev->header_rwsem);
 
         return ret;
 }
@@ -4301,8 +4322,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
         struct device *dev;
         int ret;
 
-        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
         dev = &rbd_dev->dev;
         dev->bus = &rbd_bus_type;
         dev->type = &rbd_device_type;
@@ -4311,8 +4330,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
         dev_set_name(dev, "%d", rbd_dev->dev_id);
         ret = device_register(dev);
 
-        mutex_unlock(&ctl_mutex);
-
         return ret;
 }
 
@@ -5059,23 +5076,6 @@ err_out_module:
         return (ssize_t)rc;
 }
 
-static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
-{
-        struct list_head *tmp;
-        struct rbd_device *rbd_dev;
-
-        spin_lock(&rbd_dev_list_lock);
-        list_for_each(tmp, &rbd_dev_list) {
-                rbd_dev = list_entry(tmp, struct rbd_device, node);
-                if (rbd_dev->dev_id == dev_id) {
-                        spin_unlock(&rbd_dev_list_lock);
-                        return rbd_dev;
-                }
-        }
-        spin_unlock(&rbd_dev_list_lock);
-        return NULL;
-}
-
 static void rbd_dev_device_release(struct device *dev)
 {
         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
@@ -5120,8 +5120,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
                           size_t count)
 {
         struct rbd_device *rbd_dev = NULL;
-        int target_id;
+        struct list_head *tmp;
+        int dev_id;
         unsigned long ul;
+        bool already = false;
         int ret;
 
         ret = strict_strtoul(buf, 10, &ul);
@@ -5129,37 +5131,40 @@ static ssize_t rbd_remove(struct bus_type *bus,
                 return ret;
 
         /* convert to int; abort if we lost anything in the conversion */
-        target_id = (int) ul;
-        if (target_id != ul)
+        dev_id = (int)ul;
+        if (dev_id != ul)
                 return -EINVAL;
 
-        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-        rbd_dev = __rbd_get_dev(target_id);
-        if (!rbd_dev) {
-                ret = -ENOENT;
-                goto done;
+        ret = -ENOENT;
+        spin_lock(&rbd_dev_list_lock);
+        list_for_each(tmp, &rbd_dev_list) {
+                rbd_dev = list_entry(tmp, struct rbd_device, node);
+                if (rbd_dev->dev_id == dev_id) {
+                        ret = 0;
+                        break;
+                }
+        }
+        if (!ret) {
+                spin_lock_irq(&rbd_dev->lock);
+                if (rbd_dev->open_count)
+                        ret = -EBUSY;
+                else
+                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
+                                                   &rbd_dev->flags);
+                spin_unlock_irq(&rbd_dev->lock);
         }
+        spin_unlock(&rbd_dev_list_lock);
+        if (ret < 0 || already)
+                return ret;
 
-        spin_lock_irq(&rbd_dev->lock);
-        if (rbd_dev->open_count)
-                ret = -EBUSY;
-        else
-                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
-        spin_unlock_irq(&rbd_dev->lock);
-        if (ret < 0)
-                goto done;
         rbd_bus_del_dev(rbd_dev);
         ret = rbd_dev_header_watch_sync(rbd_dev, false);
         if (ret)
                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
         rbd_dev_image_release(rbd_dev);
         module_put(THIS_MODULE);
-        ret = count;
-done:
-        mutex_unlock(&ctl_mutex);
 
-        return ret;
+        return count;
 }
 
 /*
@@ -5267,6 +5272,7 @@ static void __exit rbd_exit(void)
 module_init(rbd_init);
 module_exit(rbd_exit);
 
+MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
 MODULE_DESCRIPTION("rados block device");
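The reworked rbd_remove() above folds lookup, busy-check, and removal-marking into one pass under the list lock, and test_and_set_bit() makes a racing second remove a harmless no-op. A compilable userspace sketch of the same shape (all names hypothetical):

#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct dev {
        int dev_id;
        int open_count;
        atomic_flag removing;   /* initialize with ATOMIC_FLAG_INIT */
        struct dev *next;
};

static pthread_mutex_t dev_list_lock = PTHREAD_MUTEX_INITIALIZER;
static struct dev *dev_list;

int dev_remove(int dev_id)
{
        struct dev *d;
        int ret = -ENOENT;
        bool already = false;

        pthread_mutex_lock(&dev_list_lock);
        for (d = dev_list; d; d = d->next)
                if (d->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        if (!ret) {
                if (d->open_count)
                        ret = -EBUSY;
                else    /* true if another remover already marked it */
                        already = atomic_flag_test_and_set(&d->removing);
        }
        pthread_mutex_unlock(&dev_list_lock);
        if (ret < 0 || already)
                return ret;

        /* ...tear the device down, outside the list lock... */
        return 0;
}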
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 38b5c1bc6776..5318a3b704f6 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -439,13 +439,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
         struct ceph_inode_info *ci;
         struct ceph_fs_client *fsc;
         struct ceph_osd_client *osdc;
-        loff_t page_off = page_offset(page);
-        int len = PAGE_CACHE_SIZE;
-        loff_t i_size;
-        int err = 0;
         struct ceph_snap_context *snapc, *oldest;
-        u64 snap_size = 0;
+        loff_t page_off = page_offset(page);
         long writeback_stat;
+        u64 truncate_size, snap_size = 0;
+        u32 truncate_seq;
+        int err = 0, len = PAGE_CACHE_SIZE;
 
         dout("writepage %p idx %lu\n", page, page->index);
 
@@ -475,13 +474,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
         }
         ceph_put_snap_context(oldest);
 
+        spin_lock(&ci->i_ceph_lock);
+        truncate_seq = ci->i_truncate_seq;
+        truncate_size = ci->i_truncate_size;
+        if (!snap_size)
+                snap_size = i_size_read(inode);
+        spin_unlock(&ci->i_ceph_lock);
+
         /* is this a partial page at end of file? */
-        if (snap_size)
-                i_size = snap_size;
-        else
-                i_size = i_size_read(inode);
-        if (i_size < page_off + len)
-                len = i_size - page_off;
+        if (page_off >= snap_size) {
+                dout("%p page eof %llu\n", page, snap_size);
+                goto out;
+        }
+        if (snap_size < page_off + len)
+                len = snap_size - page_off;
 
         dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
              inode, page, page->index, page_off, len, snapc);
@@ -495,7 +501,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
         err = ceph_osdc_writepages(osdc, ceph_vino(inode),
                                    &ci->i_layout, snapc,
                                    page_off, len,
-                                   ci->i_truncate_seq, ci->i_truncate_size,
+                                   truncate_seq, truncate_size,
                                    &inode->i_mtime, &page, 1);
         if (err < 0) {
                 dout("writepage setting page/mapping error %d %p\n", err, page);
@@ -632,25 +638,6 @@ static void writepages_finish(struct ceph_osd_request *req,
         ceph_osdc_put_request(req);
 }
 
-static struct ceph_osd_request *
-ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
-                                struct ceph_snap_context *snapc, int num_ops)
-{
-        struct ceph_fs_client *fsc;
-        struct ceph_inode_info *ci;
-        struct ceph_vino vino;
-
-        fsc = ceph_inode_to_client(inode);
-        ci = ceph_inode(inode);
-        vino = ceph_vino(inode);
-        /* BUG_ON(vino.snap != CEPH_NOSNAP); */
-
-        return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                        vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
-                        CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
-                        snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
-}
-
 /*
  * initiate async writeback
  */
@@ -659,7 +646,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 {
         struct inode *inode = mapping->host;
         struct ceph_inode_info *ci = ceph_inode(inode);
-        struct ceph_fs_client *fsc;
+        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+        struct ceph_vino vino = ceph_vino(inode);
         pgoff_t index, start, end;
         int range_whole = 0;
         int should_loop = 1;
@@ -671,22 +659,22 @@ static int ceph_writepages_start(struct address_space *mapping,
         unsigned wsize = 1 << inode->i_blkbits;
         struct ceph_osd_request *req = NULL;
         int do_sync;
-        u64 snap_size;
+        u64 truncate_size, snap_size;
+        u32 truncate_seq;
 
         /*
          * Include a 'sync' in the OSD request if this is a data
          * integrity write (e.g., O_SYNC write or fsync()), or if our
          * cap is being revoked.
          */
-        do_sync = wbc->sync_mode == WB_SYNC_ALL;
-        if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
+        if ((wbc->sync_mode == WB_SYNC_ALL) ||
+            ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
                 do_sync = 1;
         dout("writepages_start %p dosync=%d (mode=%s)\n",
              inode, do_sync,
              wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
              (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-        fsc = ceph_inode_to_client(inode);
         if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
                 pr_warning("writepage_start %p on forced umount\n", inode);
                 return -EIO; /* we're in a forced umount, don't write! */
@@ -729,6 +717,14 @@ retry:
                 snap_size = i_size_read(inode);
         dout(" oldest snapc is %p seq %lld (%d snaps)\n",
              snapc, snapc->seq, snapc->num_snaps);
+
+        spin_lock(&ci->i_ceph_lock);
+        truncate_seq = ci->i_truncate_seq;
+        truncate_size = ci->i_truncate_size;
+        if (!snap_size)
+                snap_size = i_size_read(inode);
+        spin_unlock(&ci->i_ceph_lock);
+
         if (last_snapc && snapc != last_snapc) {
                 /* if we switched to a newer snapc, restart our scan at the
                  * start of the original file range. */
@@ -740,7 +736,6 @@ retry:
 
         while (!done && index <= end) {
                 int num_ops = do_sync ? 2 : 1;
-                struct ceph_vino vino;
                 unsigned i;
                 int first;
                 pgoff_t next;
@@ -834,17 +829,18 @@ get_more_pages:
                  * that it will use.
                  */
                 if (locked_pages == 0) {
-                        size_t size;
-
                         BUG_ON(pages);
-
                         /* prepare async write request */
                         offset = (u64)page_offset(page);
                         len = wsize;
-                        req = ceph_writepages_osd_request(inode,
-                                                offset, &len, snapc,
-                                                num_ops);
-
+                        req = ceph_osdc_new_request(&fsc->client->osdc,
+                                                &ci->i_layout, vino,
+                                                offset, &len, num_ops,
+                                                CEPH_OSD_OP_WRITE,
+                                                CEPH_OSD_FLAG_WRITE |
+                                                CEPH_OSD_FLAG_ONDISK,
+                                                snapc, truncate_seq,
+                                                truncate_size, true);
                         if (IS_ERR(req)) {
                                 rc = PTR_ERR(req);
                                 unlock_page(page);
@@ -855,8 +851,8 @@ get_more_pages:
                         req->r_inode = inode;
 
                         max_pages = calc_pages_for(0, (u64)len);
-                        size = max_pages * sizeof (*pages);
-                        pages = kmalloc(size, GFP_NOFS);
+                        pages = kmalloc(max_pages * sizeof (*pages),
+                                        GFP_NOFS);
                         if (!pages) {
                                 pool = fsc->wb_pagevec_pool;
                                 pages = mempool_alloc(pool, GFP_NOFS);
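Both writepage paths above now sample i_truncate_seq, i_truncate_size, and the size once under i_ceph_lock and then work from the local copies for the whole pass. The same snapshot-under-lock pattern in a self-contained pthread sketch (field names made up to mirror the diff):

#include <pthread.h>
#include <stdint.h>

struct inode_info {
        pthread_mutex_t lock;
        uint32_t truncate_seq;
        uint64_t truncate_size;
        uint64_t size;
};

void start_writeback(struct inode_info *ci)
{
        uint32_t truncate_seq;
        uint64_t truncate_size, snap_size;

        /* one consistent snapshot of fields a concurrent truncate
         * may change... */
        pthread_mutex_lock(&ci->lock);
        truncate_seq = ci->truncate_seq;
        truncate_size = ci->truncate_size;
        snap_size = ci->size;
        pthread_mutex_unlock(&ci->lock);

        /* ...so every page in this writeback pass is issued against
         * the same truncate_seq/truncate_size pair */
        (void)truncate_seq;
        (void)truncate_size;
        (void)snap_size;
}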
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index da0f9b8a3bcb..25442b40c25a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -147,7 +147,7 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
         spin_unlock(&mdsc->caps_list_lock);
 }
 
-int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+void ceph_reserve_caps(struct ceph_mds_client *mdsc,
                       struct ceph_cap_reservation *ctx, int need)
 {
         int i;
@@ -155,7 +155,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
         int have;
         int alloc = 0;
         LIST_HEAD(newcaps);
-        int ret = 0;
 
         dout("reserve caps ctx=%p need=%d\n", ctx, need);
 
@@ -174,14 +173,15 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 
         for (i = have; i < need; i++) {
                 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
-                if (!cap) {
-                        ret = -ENOMEM;
-                        goto out_alloc_count;
-                }
+                if (!cap)
+                        break;
                 list_add(&cap->caps_item, &newcaps);
                 alloc++;
         }
-        BUG_ON(have + alloc != need);
+        /* we didn't manage to reserve as much as we needed */
+        if (have + alloc != need)
+                pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
+                        ctx, need, have + alloc);
 
         spin_lock(&mdsc->caps_list_lock);
         mdsc->caps_total_count += alloc;
@@ -197,13 +197,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
         dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
              ctx, mdsc->caps_total_count, mdsc->caps_use_count,
              mdsc->caps_reserve_count, mdsc->caps_avail_count);
-        return 0;
-
-out_alloc_count:
-        /* we didn't manage to reserve as much as we needed */
-        pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
-                   ctx, need, have);
-        return ret;
 }
 
 int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
@@ -612,9 +605,11 @@ retry:
                 __cap_delay_requeue(mdsc, ci);
         }
 
-        if (flags & CEPH_CAP_FLAG_AUTH)
-                ci->i_auth_cap = cap;
-        else if (ci->i_auth_cap == cap) {
+        if (flags & CEPH_CAP_FLAG_AUTH) {
+                if (ci->i_auth_cap == NULL ||
+                    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
+                        ci->i_auth_cap = cap;
+        } else if (ci->i_auth_cap == cap) {
                 ci->i_auth_cap = NULL;
                 spin_lock(&mdsc->cap_dirty_lock);
                 if (!list_empty(&ci->i_dirty_item)) {
@@ -695,6 +690,15 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
                 if (implemented)
                         *implemented |= cap->implemented;
         }
+        /*
+         * exclude caps issued by non-auth MDS, but are been revoking
+         * by the auth MDS. The non-auth MDS should be revoking/exporting
+         * these caps, but the message is delayed.
+         */
+        if (ci->i_auth_cap) {
+                cap = ci->i_auth_cap;
+                have &= ~cap->implemented | cap->issued;
+        }
         return have;
 }
 
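The mask update in __ceph_caps_issued() is compact: have &= ~cap->implemented | cap->issued clears exactly the bits the auth MDS has implemented but no longer issues (i.e., is revoking), while caps issued only by other MDSs survive. A tiny self-checking example with arbitrary bit values:

#include <assert.h>

#define CAP_RD  0x1
#define CAP_WR  0x2
#define CAP_BUF 0x4

int main(void)
{
        int have = CAP_RD | CAP_WR | CAP_BUF;   /* union over all caps */
        int issued = CAP_RD;                    /* auth MDS still grants */
        int implemented = CAP_RD | CAP_WR;      /* auth MDS ever granted */

        /* drop bits the auth MDS is revoking (implemented & ~issued);
         * CAP_BUF came from a non-auth MDS and survives */
        have &= ~implemented | issued;

        assert(have == (CAP_RD | CAP_BUF));
        return 0;
}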
@@ -802,22 +806,28 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
 /*
  * Return true if mask caps are currently being revoked by an MDS.
  */
-int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
+int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
+                               struct ceph_cap *ocap, int mask)
 {
-        struct inode *inode = &ci->vfs_inode;
         struct ceph_cap *cap;
         struct rb_node *p;
-        int ret = 0;
 
-        spin_lock(&ci->i_ceph_lock);
         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                 cap = rb_entry(p, struct ceph_cap, ci_node);
-                if (__cap_is_valid(cap) &&
-                    (cap->implemented & ~cap->issued & mask)) {
-                        ret = 1;
-                        break;
-                }
+                if (cap != ocap && __cap_is_valid(cap) &&
+                    (cap->implemented & ~cap->issued & mask))
+                        return 1;
         }
+        return 0;
+}
+
+int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
+{
+        struct inode *inode = &ci->vfs_inode;
+        int ret;
+
+        spin_lock(&ci->i_ceph_lock);
+        ret = __ceph_caps_revoking_other(ci, NULL, mask);
         spin_unlock(&ci->i_ceph_lock);
         dout("ceph_caps_revoking %p %s = %d\n", inode,
              ceph_cap_string(mask), ret);
@@ -1980,8 +1990,15 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
         cap = ci->i_auth_cap;
         dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
              ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);
+
         __ceph_flush_snaps(ci, &session, 1);
+
         if (ci->i_flushing_caps) {
+                spin_lock(&mdsc->cap_dirty_lock);
+                list_move_tail(&ci->i_flushing_item,
+                               &cap->session->s_cap_flushing);
+                spin_unlock(&mdsc->cap_dirty_lock);
+
                 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
                                      __ceph_caps_used(ci),
                                      __ceph_caps_wanted(ci),
@@ -2055,7 +2072,11 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
         /* finish pending truncate */
         while (ci->i_truncate_pending) {
                 spin_unlock(&ci->i_ceph_lock);
-                __ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR));
+                if (!(need & CEPH_CAP_FILE_WR))
+                        mutex_lock(&inode->i_mutex);
+                __ceph_do_pending_vmtruncate(inode);
+                if (!(need & CEPH_CAP_FILE_WR))
+                        mutex_unlock(&inode->i_mutex);
                 spin_lock(&ci->i_ceph_lock);
         }
 
@@ -2473,6 +2494,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
         } else {
                 dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
                      ceph_cap_string(newcaps));
+                /* non-auth MDS is revoking the newly grant caps ? */
+                if (cap == ci->i_auth_cap &&
+                    __ceph_caps_revoking_other(ci, cap, newcaps))
+                        check_caps = 2;
+
                 cap->issued = newcaps;
                 cap->implemented |= newcaps;    /* add bits only, to
                                                  * avoid stepping on a
@@ -3042,21 +3068,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
             (cap->issued & unless) == 0)) {
                 if ((cap->issued & drop) &&
                     (cap->issued & unless) == 0) {
-                        dout("encode_inode_release %p cap %p %s -> "
-                             "%s\n", inode, cap,
+                        int wanted = __ceph_caps_wanted(ci);
+                        if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
+                                wanted |= cap->mds_wanted;
+                        dout("encode_inode_release %p cap %p "
+                             "%s -> %s, wanted %s -> %s\n", inode, cap,
                              ceph_cap_string(cap->issued),
-                             ceph_cap_string(cap->issued & ~drop));
+                             ceph_cap_string(cap->issued & ~drop),
+                             ceph_cap_string(cap->mds_wanted),
+                             ceph_cap_string(wanted));
+
                         cap->issued &= ~drop;
                         cap->implemented &= ~drop;
-                        if (ci->i_ceph_flags & CEPH_I_NODELAY) {
-                                int wanted = __ceph_caps_wanted(ci);
-                                dout(" wanted %s -> %s (act %s)\n",
-                                     ceph_cap_string(cap->mds_wanted),
-                                     ceph_cap_string(cap->mds_wanted &
-                                                     ~wanted),
-                                     ceph_cap_string(wanted));
-                                cap->mds_wanted &= wanted;
-                        }
+                        cap->mds_wanted = wanted;
                 } else {
                         dout("encode_inode_release %p cap %p %s"
                              " (force)\n", inode, cap,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 16c989d3e23c..2ddf061c1c4a 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -716,7 +716,6 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
         if (ceph_snap(inode) != CEPH_NOSNAP)
                 return -EROFS;
 
-        sb_start_write(inode->i_sb);
         mutex_lock(&inode->i_mutex);
         hold_mutex = true;
 
@@ -809,7 +808,6 @@ retry_snap:
 out:
         if (hold_mutex)
                 mutex_unlock(&inode->i_mutex);
-        sb_end_write(inode->i_sb);
         current->backing_dev_info = NULL;
 
         return written ? written : err;
@@ -824,7 +822,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
         int ret;
 
         mutex_lock(&inode->i_mutex);
-        __ceph_do_pending_vmtruncate(inode, false);
+        __ceph_do_pending_vmtruncate(inode);
 
         if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
                 ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index bd2289a4f40d..f3a2abf28a77 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1465,7 +1465,9 @@ static void ceph_vmtruncate_work(struct work_struct *work)
         struct inode *inode = &ci->vfs_inode;
 
         dout("vmtruncate_work %p\n", inode);
-        __ceph_do_pending_vmtruncate(inode, true);
+        mutex_lock(&inode->i_mutex);
+        __ceph_do_pending_vmtruncate(inode);
+        mutex_unlock(&inode->i_mutex);
         iput(inode);
 }
 
@@ -1492,7 +1494,7 @@ void ceph_queue_vmtruncate(struct inode *inode)
  * Make sure any pending truncation is applied before doing anything
  * that may depend on it.
  */
-void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock)
+void __ceph_do_pending_vmtruncate(struct inode *inode)
 {
         struct ceph_inode_info *ci = ceph_inode(inode);
         u64 to;
@@ -1525,11 +1527,7 @@ retry:
              ci->i_truncate_pending, to);
         spin_unlock(&ci->i_ceph_lock);
 
-        if (needlock)
-                mutex_lock(&inode->i_mutex);
         truncate_inode_pages(inode->i_mapping, to);
-        if (needlock)
-                mutex_unlock(&inode->i_mutex);
 
         spin_lock(&ci->i_ceph_lock);
         if (to == ci->i_truncate_size) {
@@ -1588,7 +1586,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
         if (ceph_snap(inode) != CEPH_NOSNAP)
                 return -EROFS;
 
-        __ceph_do_pending_vmtruncate(inode, false);
+        __ceph_do_pending_vmtruncate(inode);
 
         err = inode_change_ok(inode, attr);
         if (err != 0)
@@ -1770,7 +1768,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
              ceph_cap_string(dirtied), mask);
 
         ceph_mdsc_put_request(req);
-        __ceph_do_pending_vmtruncate(inode, false);
+        __ceph_do_pending_vmtruncate(inode);
         return err;
 out:
         spin_unlock(&ci->i_ceph_lock);
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index 690f73f42425..ae6d14e82b0f 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -169,7 +169,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
 }
 
 /**
- * Must be called with BKL already held. Fills in the passed
+ * Must be called with lock_flocks() already held. Fills in the passed
  * counter variables, so you can prepare pagelist metadata before calling
  * ceph_encode_locks.
  */
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 99890b02a10b..187bf214444d 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1391,6 +1391,7 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
         num = le32_to_cpu(head->num);
         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
         head->num = cpu_to_le32(0);
+        msg->front.iov_len = sizeof(*head);
         session->s_num_cap_releases += num;
 
         /* requeue completed messages */
@@ -2454,6 +2455,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
         spin_lock(&ci->i_ceph_lock);
         cap->seq = 0;        /* reset cap seq */
         cap->issue_seq = 0;  /* and issue_seq */
+        cap->mseq = 0;       /* and migrate_seq */
 
         if (recon_state->flock) {
                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -3040,8 +3042,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
         fsc->mdsc = mdsc;
         mutex_init(&mdsc->mutex);
         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
-        if (mdsc->mdsmap == NULL)
+        if (mdsc->mdsmap == NULL) {
+                kfree(mdsc);
                 return -ENOMEM;
+        }
 
         init_completion(&mdsc->safe_umount_waiters);
         init_waitqueue_head(&mdsc->session_close_wq);
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 9278dec9e940..132b64eeecd4 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -92,6 +92,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                 u32 num_export_targets;
                 void *pexport_targets = NULL;
                 struct ceph_timespec laggy_since;
+                struct ceph_mds_info *info;
 
                 ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
                 global_id = ceph_decode_64(p);
@@ -126,24 +127,27 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                      i+1, n, global_id, mds, inc,
                      ceph_pr_addr(&addr.in_addr),
                      ceph_mds_state_name(state));
-                if (mds >= 0 && mds < m->m_max_mds && state > 0) {
-                        m->m_info[mds].global_id = global_id;
-                        m->m_info[mds].state = state;
-                        m->m_info[mds].addr = addr;
-                        m->m_info[mds].laggy =
-                                (laggy_since.tv_sec != 0 ||
-                                 laggy_since.tv_nsec != 0);
-                        m->m_info[mds].num_export_targets = num_export_targets;
-                        if (num_export_targets) {
-                                m->m_info[mds].export_targets =
-                                        kcalloc(num_export_targets, sizeof(u32),
-                                                GFP_NOFS);
-                                for (j = 0; j < num_export_targets; j++)
-                                        m->m_info[mds].export_targets[j] =
-                                                ceph_decode_32(&pexport_targets);
-                        } else {
-                                m->m_info[mds].export_targets = NULL;
-                        }
+
+                if (mds < 0 || mds >= m->m_max_mds || state <= 0)
+                        continue;
+
+                info = &m->m_info[mds];
+                info->global_id = global_id;
+                info->state = state;
+                info->addr = addr;
+                info->laggy = (laggy_since.tv_sec != 0 ||
+                               laggy_since.tv_nsec != 0);
+                info->num_export_targets = num_export_targets;
+                if (num_export_targets) {
+                        info->export_targets = kcalloc(num_export_targets,
+                                                       sizeof(u32), GFP_NOFS);
+                        if (info->export_targets == NULL)
+                                goto badmem;
+                        for (j = 0; j < num_export_targets; j++)
+                                info->export_targets[j] =
+                                        ceph_decode_32(&pexport_targets);
+                } else {
+                        info->export_targets = NULL;
                 }
         }
 
@@ -170,7 +174,7 @@ bad:
                        DUMP_PREFIX_OFFSET, 16, 1,
                        start, end - start, true);
         ceph_mdsmap_destroy(m);
-        return ERR_PTR(-EINVAL);
+        return ERR_PTR(err);
 }
 
 void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
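Returning ERR_PTR(err) instead of a hard-coded ERR_PTR(-EINVAL) lets ceph_mdsmap_decode() propagate -ENOMEM from the new badmem path. For readers unfamiliar with the convention, here is a minimal userspace re-implementation of ERR_PTR/IS_ERR/PTR_ERR (simplified; the kernel's version lives in linux/err.h):

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

static inline void *ERR_PTR(long err)     { return (void *)err; }
static inline long PTR_ERR(const void *p) { return (long)p; }
static inline int IS_ERR(const void *p)
{
        return (unsigned long)p >= (unsigned long)-4095;
}

struct mdsmap { int max_mds; };

static struct mdsmap *mdsmap_decode(int simulate_enomem)
{
        struct mdsmap *m = malloc(sizeof(*m));

        if (!m || simulate_enomem) {
                free(m);
                return ERR_PTR(-ENOMEM);        /* not a blanket -EINVAL */
        }
        m->max_mds = 1;
        return m;
}

int main(void)
{
        struct mdsmap *m = mdsmap_decode(1);

        if (IS_ERR(m))
                printf("decode failed: %ld\n", PTR_ERR(m));     /* -12 */
        else
                free(m);
        return 0;
}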
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 7d377c9a5e35..6627b26a800c 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -357,7 +357,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
         }
         err = -EINVAL;
         dev_name_end--;         /* back up to ':' separator */
-        if (*dev_name_end != ':') {
+        if (dev_name_end < dev_name || *dev_name_end != ':') {
                 pr_err("device name is missing path (no : separator in %s)\n",
                                 dev_name);
                 goto out;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 7ccfdb4aea2e..cbded572345e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -534,7 +534,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
 extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
-extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+extern void ceph_reserve_caps(struct ceph_mds_client *mdsc,
                              struct ceph_cap_reservation *ctx, int need);
 extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
                                struct ceph_cap_reservation *ctx);
@@ -692,7 +692,7 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 extern int ceph_inode_holds_cap(struct inode *inode, int mask);
 
 extern int ceph_inode_set_size(struct inode *inode, loff_t size);
-extern void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock);
+extern void __ceph_do_pending_vmtruncate(struct inode *inode);
 extern void ceph_queue_vmtruncate(struct inode *inode);
 
 extern void ceph_queue_invalidate(struct inode *inode);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 9b6b2b6dd164..be661d8f532a 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -675,17 +675,18 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
         if (!ceph_is_valid_xattr(name))
                 return -ENODATA;
 
-        spin_lock(&ci->i_ceph_lock);
-        dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
-             ci->i_xattrs.version, ci->i_xattrs.index_version);
 
         /* let's see if a virtual xattr was requested */
         vxattr = ceph_match_vxattr(inode, name);
         if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
                 err = vxattr->getxattr_cb(ci, value, size);
-                goto out;
+                return err;
         }
 
+        spin_lock(&ci->i_ceph_lock);
+        dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
+             ci->i_xattrs.version, ci->i_xattrs.index_version);
+
         if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
             (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
                 goto get_xattr;
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index 379f71508995..0442c3d800f0 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -160,11 +160,6 @@ static inline void ceph_decode_timespec(struct timespec *ts,
 static inline void ceph_encode_timespec(struct ceph_timespec *tv,
                                         const struct timespec *ts)
 {
-        BUG_ON(ts->tv_sec < 0);
-        BUG_ON(ts->tv_sec > (__kernel_time_t)U32_MAX);
-        BUG_ON(ts->tv_nsec < 0);
-        BUG_ON(ts->tv_nsec > (long)U32_MAX);
-
         tv->tv_sec = cpu_to_le32((u32)ts->tv_sec);
         tv->tv_nsec = cpu_to_le32((u32)ts->tv_nsec);
 }
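After this change an out-of-range timestamp is silently truncated to 32 bits instead of taking the whole machine down with BUG_ON. A standalone demonstration of what the casts do (assumes a 64-bit time_t; illustrative only):

#include <stdint.h>
#include <stdio.h>
#include <time.h>

int main(void)
{
        struct timespec ts = {
                .tv_sec = (time_t)0x100000001LL,        /* > U32_MAX */
                .tv_nsec = 1,
        };
        uint32_t sec = (uint32_t)ts.tv_sec;     /* wraps to 1 */
        uint32_t nsec = (uint32_t)ts.tv_nsec;

        printf("encoded sec=%u nsec=%u\n", sec, nsec);
        return 0;
}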
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 186db0bf4951..ce6df39f60ff 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -145,7 +145,6 @@ struct ceph_osd_request {
         s32               r_reply_op_result[CEPH_OSD_MAX_OP];
         int               r_got_reply;
         int               r_linger;
-        int               r_completed;
 
         struct ceph_osd_client *r_osdc;
         struct kref       r_kref;
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 925ca583c09c..8c93fa8d81bc 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -39,6 +39,11 @@ static int should_authenticate(struct ceph_auth_client *ac)
         return xi->starting;
 }
 
+static int build_request(struct ceph_auth_client *ac, void *buf, void *end)
+{
+        return 0;
+}
+
 /*
  * the generic auth code decode the global_id, and we carry no actual
  * authenticate state, so nothing happens here.
@@ -106,6 +111,7 @@ static const struct ceph_auth_client_ops ceph_auth_none_ops = {
         .destroy = destroy,
         .is_authenticated = is_authenticated,
         .should_authenticate = should_authenticate,
+        .build_request = build_request,
         .handle_reply = handle_reply,
         .create_authorizer = ceph_auth_none_create_authorizer,
         .destroy_authorizer = ceph_auth_none_destroy_authorizer,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3a246a6cab47..dd47889adc4a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -733,12 +733,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
         object_size = le32_to_cpu(layout->fl_object_size);
         object_base = off - objoff;
-        if (truncate_size <= object_base) {
-                truncate_size = 0;
-        } else {
-                truncate_size -= object_base;
-                if (truncate_size > object_size)
-                        truncate_size = object_size;
+        if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
+                if (truncate_size <= object_base) {
+                        truncate_size = 0;
+                } else {
+                        truncate_size -= object_base;
+                        if (truncate_size > object_size)
+                                truncate_size = object_size;
+                }
         }
 
         osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
@@ -1174,6 +1176,7 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
                                     struct ceph_osd_request *req)
 {
         dout("__register_linger_request %p\n", req);
+        ceph_osdc_get_request(req);
         list_add_tail(&req->r_linger_item, &osdc->req_linger);
         if (req->r_osd)
                 list_add_tail(&req->r_linger_osd,
@@ -1196,6 +1199,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
                 if (list_empty(&req->r_osd_item))
                         req->r_osd = NULL;
         }
+        ceph_osdc_put_request(req);
 }
 
 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1203,9 +1207,8 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
 {
         mutex_lock(&osdc->request_mutex);
         if (req->r_linger) {
-                __unregister_linger_request(osdc, req);
                 req->r_linger = 0;
-                ceph_osdc_put_request(req);
+                __unregister_linger_request(osdc, req);
         }
         mutex_unlock(&osdc->request_mutex);
 }
@@ -1217,11 +1220,6 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
         if (!req->r_linger) {
                 dout("set_request_linger %p\n", req);
                 req->r_linger = 1;
-                /*
-                 * caller is now responsible for calling
-                 * unregister_linger_request
-                 */
-                ceph_osdc_get_request(req);
         }
 }
 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
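Taken together, the linger hunks above move reference ownership into __register_linger_request()/__unregister_linger_request() themselves, so the linger list holds its own ref rather than relying on callers to pair get/put. A userspace sketch of that ownership pattern (hypothetical types; a real implementation would also need locking around the list):

#include <stdatomic.h>
#include <stdlib.h>

struct request {
        atomic_int refcount;
        struct request *next;
};

static struct request *linger_list;

static void req_get(struct request *r)
{
        atomic_fetch_add(&r->refcount, 1);
}

static void req_put(struct request *r)
{
        if (atomic_fetch_sub(&r->refcount, 1) == 1)
                free(r);
}

void register_linger(struct request *r)
{
        req_get(r);             /* the list owns its own reference */
        r->next = linger_list;
        linger_list = r;
}

void unregister_linger(struct request *r)
{
        struct request **p = &linger_list;

        while (*p && *p != r)
                p = &(*p)->next;
        if (*p) {
                *p = r->next;
                req_put(r);     /* drop the list's reference */
        }
}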
@@ -1339,10 +1337,6 @@ static void __send_request(struct ceph_osd_client *osdc,
 
         ceph_msg_get(req->r_request); /* send consumes a ref */
 
-        /* Mark the request unsafe if this is the first timet's being sent. */
-
-        if (!req->r_sent && req->r_unsafe_callback)
-                req->r_unsafe_callback(req, true);
         req->r_sent = req->r_osd->o_incarnation;
 
         ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -1433,8 +1427,6 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-        if (req->r_unsafe_callback)
-                req->r_unsafe_callback(req, false);
         complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
@@ -1526,6 +1518,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
         for (i = 0; i < numops; i++)
                 req->r_reply_op_result[i] = ceph_decode_32(&p);
 
+        already_completed = req->r_got_reply;
+
         if (!req->r_got_reply) {
 
                 req->r_result = result;
@@ -1556,19 +1550,23 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
             ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                 __unregister_request(osdc, req);
 
-        already_completed = req->r_completed;
-        req->r_completed = 1;
         mutex_unlock(&osdc->request_mutex);
-        if (already_completed)
-                goto done;
 
-        if (req->r_callback)
-                req->r_callback(req, msg);
-        else
-                complete_all(&req->r_completion);
+        if (!already_completed) {
+                if (req->r_unsafe_callback &&
+                    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
+                        req->r_unsafe_callback(req, true);
+                if (req->r_callback)
+                        req->r_callback(req, msg);
+                else
+                        complete_all(&req->r_completion);
+        }
 
-        if (flags & CEPH_OSD_FLAG_ONDISK)
+        if (flags & CEPH_OSD_FLAG_ONDISK) {
+                if (req->r_unsafe_callback && already_completed)
+                        req->r_unsafe_callback(req, false);
                 complete_request(req);
+        }
 
 done:
         dout("req=%p req->r_linger=%d\n", req, req->r_linger);
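handle_reply() now latches already_completed from r_got_reply while still holding request_mutex, so duplicate or racing replies invoke the completion callbacks at most once. The core of that idiom in a self-contained pthread sketch:

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct request {
        pthread_mutex_t lock;
        bool got_reply;
        void (*callback)(struct request *);
};

void handle_reply(struct request *req)
{
        bool already_completed;

        pthread_mutex_lock(&req->lock);
        already_completed = req->got_reply;     /* sample, then update */
        req->got_reply = true;
        pthread_mutex_unlock(&req->lock);

        if (!already_completed && req->callback)
                req->callback(req);     /* at most once per request */
}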
@@ -1633,8 +1631,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                         dout("%p tid %llu restart on osd%d\n",
                              req, req->r_tid,
                              req->r_osd ? req->r_osd->o_osd : -1);
+                        ceph_osdc_get_request(req);
                         __unregister_request(osdc, req);
                         __register_linger_request(osdc, req);
+                        ceph_osdc_put_request(req);
                         continue;
                 }
 
@@ -2123,7 +2123,6 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
         __register_request(osdc, req);
         req->r_sent = 0;
         req->r_got_reply = 0;
-        req->r_completed = 0;
         rc = __map_request(osdc, req, 0);
         if (rc < 0) {
                 if (nofail) {
@@ -2456,8 +2455,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
         ceph_msg_revoke_incoming(req->r_reply);
 
         if (front > req->r_reply->front.iov_len) {
-                pr_warning("get_reply front %d > preallocated %d\n",
-                           front, (int)req->r_reply->front.iov_len);
+                pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
+                           front, (int)req->r_reply->front.iov_len,
+                           (unsigned int)con->peer_name.type,
+                           le64_to_cpu(con->peer_name.num));
                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
                 if (!m)
                         goto out;