aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c109
-rw-r--r--fs/ceph/cache.c2
-rw-r--r--fs/ceph/caps.c76
-rw-r--r--fs/ceph/file.c87
-rw-r--r--fs/ceph/inode.c1
-rw-r--r--fs/ceph/mds_client.c57
-rw-r--r--fs/ceph/mds_client.h3
-rw-r--r--fs/ceph/super.h1
-rw-r--r--include/linux/ceph/libceph.h4
-rw-r--r--include/linux/ceph/messenger.h16
-rw-r--r--net/ceph/auth_x.c36
-rw-r--r--net/ceph/ceph_common.c18
-rw-r--r--net/ceph/crypto.h4
-rw-r--r--net/ceph/messenger.c88
-rw-r--r--net/ceph/osd_client.c34
15 files changed, 314 insertions, 222 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 128e7df5b807..235708c7c46e 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -418,8 +418,6 @@ MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (d
418 418
419static int rbd_img_request_submit(struct rbd_img_request *img_request); 419static int rbd_img_request_submit(struct rbd_img_request *img_request);
420 420
421static void rbd_dev_device_release(struct device *dev);
422
423static ssize_t rbd_add(struct bus_type *bus, const char *buf, 421static ssize_t rbd_add(struct bus_type *bus, const char *buf,
424 size_t count); 422 size_t count);
425static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 423static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -3991,14 +3989,12 @@ static const struct attribute_group *rbd_attr_groups[] = {
3991 NULL 3989 NULL
3992}; 3990};
3993 3991
3994static void rbd_sysfs_dev_release(struct device *dev) 3992static void rbd_dev_release(struct device *dev);
3995{
3996}
3997 3993
3998static struct device_type rbd_device_type = { 3994static struct device_type rbd_device_type = {
3999 .name = "rbd", 3995 .name = "rbd",
4000 .groups = rbd_attr_groups, 3996 .groups = rbd_attr_groups,
4001 .release = rbd_sysfs_dev_release, 3997 .release = rbd_dev_release,
4002}; 3998};
4003 3999
4004static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 4000static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
@@ -4041,6 +4037,25 @@ static void rbd_spec_free(struct kref *kref)
4041 kfree(spec); 4037 kfree(spec);
4042} 4038}
4043 4039
4040static void rbd_dev_release(struct device *dev)
4041{
4042 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4043 bool need_put = !!rbd_dev->opts;
4044
4045 rbd_put_client(rbd_dev->rbd_client);
4046 rbd_spec_put(rbd_dev->spec);
4047 kfree(rbd_dev->opts);
4048 kfree(rbd_dev);
4049
4050 /*
4051 * This is racy, but way better than putting module outside of
4052 * the release callback. The race window is pretty small, so
4053 * doing something similar to dm (dm-builtin.c) is overkill.
4054 */
4055 if (need_put)
4056 module_put(THIS_MODULE);
4057}
4058
4044static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 4059static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4045 struct rbd_spec *spec, 4060 struct rbd_spec *spec,
4046 struct rbd_options *opts) 4061 struct rbd_options *opts)
@@ -4057,6 +4072,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4057 INIT_LIST_HEAD(&rbd_dev->node); 4072 INIT_LIST_HEAD(&rbd_dev->node);
4058 init_rwsem(&rbd_dev->header_rwsem); 4073 init_rwsem(&rbd_dev->header_rwsem);
4059 4074
4075 rbd_dev->dev.bus = &rbd_bus_type;
4076 rbd_dev->dev.type = &rbd_device_type;
4077 rbd_dev->dev.parent = &rbd_root_dev;
4078 device_initialize(&rbd_dev->dev);
4079
4060 rbd_dev->rbd_client = rbdc; 4080 rbd_dev->rbd_client = rbdc;
4061 rbd_dev->spec = spec; 4081 rbd_dev->spec = spec;
4062 rbd_dev->opts = opts; 4082 rbd_dev->opts = opts;
@@ -4068,15 +4088,21 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4068 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 4088 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4069 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); 4089 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
4070 4090
4091 /*
4092 * If this is a mapping rbd_dev (as opposed to a parent one),
4093 * pin our module. We have a ref from do_rbd_add(), so use
4094 * __module_get().
4095 */
4096 if (rbd_dev->opts)
4097 __module_get(THIS_MODULE);
4098
4071 return rbd_dev; 4099 return rbd_dev;
4072} 4100}
4073 4101
4074static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4102static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4075{ 4103{
4076 rbd_put_client(rbd_dev->rbd_client); 4104 if (rbd_dev)
4077 rbd_spec_put(rbd_dev->spec); 4105 put_device(&rbd_dev->dev);
4078 kfree(rbd_dev->opts);
4079 kfree(rbd_dev);
4080} 4106}
4081 4107
4082/* 4108/*
@@ -4702,27 +4728,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4702 return rbd_dev_v2_header_info(rbd_dev); 4728 return rbd_dev_v2_header_info(rbd_dev);
4703} 4729}
4704 4730
4705static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4706{
4707 struct device *dev;
4708 int ret;
4709
4710 dev = &rbd_dev->dev;
4711 dev->bus = &rbd_bus_type;
4712 dev->type = &rbd_device_type;
4713 dev->parent = &rbd_root_dev;
4714 dev->release = rbd_dev_device_release;
4715 dev_set_name(dev, "%d", rbd_dev->dev_id);
4716 ret = device_register(dev);
4717
4718 return ret;
4719}
4720
4721static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4722{
4723 device_unregister(&rbd_dev->dev);
4724}
4725
4726/* 4731/*
4727 * Get a unique rbd identifier for the given new rbd_dev, and add 4732 * Get a unique rbd identifier for the given new rbd_dev, and add
4728 * the rbd_dev to the global list. 4733 * the rbd_dev to the global list.
@@ -5225,7 +5230,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5225 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 5230 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5226 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); 5231 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5227 5232
5228 ret = rbd_bus_add_dev(rbd_dev); 5233 dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5234 ret = device_add(&rbd_dev->dev);
5229 if (ret) 5235 if (ret)
5230 goto err_out_mapping; 5236 goto err_out_mapping;
5231 5237
@@ -5248,8 +5254,6 @@ err_out_blkdev:
5248 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5254 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5249err_out_id: 5255err_out_id:
5250 rbd_dev_id_put(rbd_dev); 5256 rbd_dev_id_put(rbd_dev);
5251 rbd_dev_mapping_clear(rbd_dev);
5252
5253 return ret; 5257 return ret;
5254} 5258}
5255 5259
@@ -5397,7 +5401,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
5397 struct rbd_spec *spec = NULL; 5401 struct rbd_spec *spec = NULL;
5398 struct rbd_client *rbdc; 5402 struct rbd_client *rbdc;
5399 bool read_only; 5403 bool read_only;
5400 int rc = -ENOMEM; 5404 int rc;
5401 5405
5402 if (!try_module_get(THIS_MODULE)) 5406 if (!try_module_get(THIS_MODULE))
5403 return -ENODEV; 5407 return -ENODEV;
@@ -5405,7 +5409,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
5405 /* parse add command */ 5409 /* parse add command */
5406 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 5410 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5407 if (rc < 0) 5411 if (rc < 0)
5408 goto err_out_module; 5412 goto out;
5409 5413
5410 rbdc = rbd_get_client(ceph_opts); 5414 rbdc = rbd_get_client(ceph_opts);
5411 if (IS_ERR(rbdc)) { 5415 if (IS_ERR(rbdc)) {
@@ -5432,8 +5436,10 @@ static ssize_t do_rbd_add(struct bus_type *bus,
5432 } 5436 }
5433 5437
5434 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts); 5438 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
5435 if (!rbd_dev) 5439 if (!rbd_dev) {
5440 rc = -ENOMEM;
5436 goto err_out_client; 5441 goto err_out_client;
5442 }
5437 rbdc = NULL; /* rbd_dev now owns this */ 5443 rbdc = NULL; /* rbd_dev now owns this */
5438 spec = NULL; /* rbd_dev now owns this */ 5444 spec = NULL; /* rbd_dev now owns this */
5439 rbd_opts = NULL; /* rbd_dev now owns this */ 5445 rbd_opts = NULL; /* rbd_dev now owns this */
@@ -5458,10 +5464,13 @@ static ssize_t do_rbd_add(struct bus_type *bus,
5458 */ 5464 */
5459 rbd_dev_header_unwatch_sync(rbd_dev); 5465 rbd_dev_header_unwatch_sync(rbd_dev);
5460 rbd_dev_image_release(rbd_dev); 5466 rbd_dev_image_release(rbd_dev);
5461 goto err_out_module; 5467 goto out;
5462 } 5468 }
5463 5469
5464 return count; 5470 rc = count;
5471out:
5472 module_put(THIS_MODULE);
5473 return rc;
5465 5474
5466err_out_rbd_dev: 5475err_out_rbd_dev:
5467 rbd_dev_destroy(rbd_dev); 5476 rbd_dev_destroy(rbd_dev);
@@ -5470,12 +5479,7 @@ err_out_client:
5470err_out_args: 5479err_out_args:
5471 rbd_spec_put(spec); 5480 rbd_spec_put(spec);
5472 kfree(rbd_opts); 5481 kfree(rbd_opts);
5473err_out_module: 5482 goto out;
5474 module_put(THIS_MODULE);
5475
5476 dout("Error adding device %s\n", buf);
5477
5478 return (ssize_t)rc;
5479} 5483}
5480 5484
5481static ssize_t rbd_add(struct bus_type *bus, 5485static ssize_t rbd_add(struct bus_type *bus,
@@ -5495,17 +5499,15 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
5495 return do_rbd_add(bus, buf, count); 5499 return do_rbd_add(bus, buf, count);
5496} 5500}
5497 5501
5498static void rbd_dev_device_release(struct device *dev) 5502static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5499{ 5503{
5500 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5501
5502 rbd_free_disk(rbd_dev); 5504 rbd_free_disk(rbd_dev);
5503 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5505 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5506 device_del(&rbd_dev->dev);
5504 rbd_dev_mapping_clear(rbd_dev); 5507 rbd_dev_mapping_clear(rbd_dev);
5505 if (!single_major) 5508 if (!single_major)
5506 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5509 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5507 rbd_dev_id_put(rbd_dev); 5510 rbd_dev_id_put(rbd_dev);
5508 rbd_dev_mapping_clear(rbd_dev);
5509} 5511}
5510 5512
5511static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 5513static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
@@ -5590,9 +5592,8 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
5590 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting 5592 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5591 * in a potential use after free of rbd_dev->disk or rbd_dev. 5593 * in a potential use after free of rbd_dev->disk or rbd_dev.
5592 */ 5594 */
5593 rbd_bus_del_dev(rbd_dev); 5595 rbd_dev_device_release(rbd_dev);
5594 rbd_dev_image_release(rbd_dev); 5596 rbd_dev_image_release(rbd_dev);
5595 module_put(THIS_MODULE);
5596 5597
5597 return count; 5598 return count;
5598} 5599}
@@ -5663,10 +5664,8 @@ static int rbd_slab_init(void)
5663 if (rbd_segment_name_cache) 5664 if (rbd_segment_name_cache)
5664 return 0; 5665 return 0;
5665out_err: 5666out_err:
5666 if (rbd_obj_request_cache) { 5667 kmem_cache_destroy(rbd_obj_request_cache);
5667 kmem_cache_destroy(rbd_obj_request_cache); 5668 rbd_obj_request_cache = NULL;
5668 rbd_obj_request_cache = NULL;
5669 }
5670 5669
5671 kmem_cache_destroy(rbd_img_request_cache); 5670 kmem_cache_destroy(rbd_img_request_cache);
5672 rbd_img_request_cache = NULL; 5671 rbd_img_request_cache = NULL;
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 834f9f3723fb..a4766ded1ba7 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -88,7 +88,7 @@ static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
88 const struct ceph_inode_info* ci = cookie_netfs_data; 88 const struct ceph_inode_info* ci = cookie_netfs_data;
89 uint16_t klen; 89 uint16_t klen;
90 90
91 /* use ceph virtual inode (id + snaphot) */ 91 /* use ceph virtual inode (id + snapshot) */
92 klen = sizeof(ci->i_vino); 92 klen = sizeof(ci->i_vino);
93 if (klen > maxbuf) 93 if (klen > maxbuf)
94 return 0; 94 return 0;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 27b566874bc1..c69e1253b47b 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1655,9 +1655,8 @@ retry_locked:
1655 !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ 1655 !S_ISDIR(inode->i_mode) && /* ignore readdir cache */
1656 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ 1656 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
1657 inode->i_data.nrpages && /* have cached pages */ 1657 inode->i_data.nrpages && /* have cached pages */
1658 (file_wanted == 0 || /* no open files */ 1658 (revoking & (CEPH_CAP_FILE_CACHE|
1659 (revoking & (CEPH_CAP_FILE_CACHE| 1659 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
1660 CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
1661 !tried_invalidate) { 1660 !tried_invalidate) {
1662 dout("check_caps trying to invalidate on %p\n", inode); 1661 dout("check_caps trying to invalidate on %p\n", inode);
1663 if (try_nonblocking_invalidate(inode) < 0) { 1662 if (try_nonblocking_invalidate(inode) < 0) {
@@ -1971,49 +1970,46 @@ out:
1971} 1970}
1972 1971
1973/* 1972/*
1974 * wait for any uncommitted directory operations to commit. 1973 * wait for any unsafe requests to complete.
1975 */ 1974 */
1976static int unsafe_dirop_wait(struct inode *inode) 1975static int unsafe_request_wait(struct inode *inode)
1977{ 1976{
1978 struct ceph_inode_info *ci = ceph_inode(inode); 1977 struct ceph_inode_info *ci = ceph_inode(inode);
1979 struct list_head *head = &ci->i_unsafe_dirops; 1978 struct ceph_mds_request *req1 = NULL, *req2 = NULL;
1980 struct ceph_mds_request *req; 1979 int ret, err = 0;
1981 u64 last_tid;
1982 int ret = 0;
1983
1984 if (!S_ISDIR(inode->i_mode))
1985 return 0;
1986 1980
1987 spin_lock(&ci->i_unsafe_lock); 1981 spin_lock(&ci->i_unsafe_lock);
1988 if (list_empty(head)) 1982 if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
1989 goto out; 1983 req1 = list_last_entry(&ci->i_unsafe_dirops,
1990 1984 struct ceph_mds_request,
1991 req = list_last_entry(head, struct ceph_mds_request, 1985 r_unsafe_dir_item);
1992 r_unsafe_dir_item); 1986 ceph_mdsc_get_request(req1);
1993 last_tid = req->r_tid; 1987 }
1994 1988 if (!list_empty(&ci->i_unsafe_iops)) {
1995 do { 1989 req2 = list_last_entry(&ci->i_unsafe_iops,
1996 ceph_mdsc_get_request(req); 1990 struct ceph_mds_request,
1997 spin_unlock(&ci->i_unsafe_lock); 1991 r_unsafe_target_item);
1992 ceph_mdsc_get_request(req2);
1993 }
1994 spin_unlock(&ci->i_unsafe_lock);
1998 1995
1999 dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n", 1996 dout("unsafe_requeset_wait %p wait on tid %llu %llu\n",
2000 inode, req->r_tid, last_tid); 1997 inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
2001 ret = !wait_for_completion_timeout(&req->r_safe_completion, 1998 if (req1) {
2002 ceph_timeout_jiffies(req->r_timeout)); 1999 ret = !wait_for_completion_timeout(&req1->r_safe_completion,
2000 ceph_timeout_jiffies(req1->r_timeout));
2003 if (ret) 2001 if (ret)
2004 ret = -EIO; /* timed out */ 2002 err = -EIO;
2005 2003 ceph_mdsc_put_request(req1);
2006 ceph_mdsc_put_request(req); 2004 }
2007 2005 if (req2) {
2008 spin_lock(&ci->i_unsafe_lock); 2006 ret = !wait_for_completion_timeout(&req2->r_safe_completion,
2009 if (ret || list_empty(head)) 2007 ceph_timeout_jiffies(req2->r_timeout));
2010 break; 2008 if (ret)
2011 req = list_first_entry(head, struct ceph_mds_request, 2009 err = -EIO;
2012 r_unsafe_dir_item); 2010 ceph_mdsc_put_request(req2);
2013 } while (req->r_tid < last_tid); 2011 }
2014out: 2012 return err;
2015 spin_unlock(&ci->i_unsafe_lock);
2016 return ret;
2017} 2013}
2018 2014
2019int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) 2015int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
@@ -2039,7 +2035,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2039 dirty = try_flush_caps(inode, &flush_tid); 2035 dirty = try_flush_caps(inode, &flush_tid);
2040 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 2036 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
2041 2037
2042 ret = unsafe_dirop_wait(inode); 2038 ret = unsafe_request_wait(inode);
2043 2039
2044 /* 2040 /*
2045 * only wait on non-file metadata writeback (the mds 2041 * only wait on non-file metadata writeback (the mds
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0c62868b5c56..3c68e6aee2f0 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -34,6 +34,74 @@
34 * need to wait for MDS acknowledgement. 34 * need to wait for MDS acknowledgement.
35 */ 35 */
36 36
37/*
38 * Calculate the length sum of direct io vectors that can
39 * be combined into one page vector.
40 */
41static size_t dio_get_pagev_size(const struct iov_iter *it)
42{
43 const struct iovec *iov = it->iov;
44 const struct iovec *iovend = iov + it->nr_segs;
45 size_t size;
46
47 size = iov->iov_len - it->iov_offset;
48 /*
49 * An iov can be page vectored when both the current tail
50 * and the next base are page aligned.
51 */
52 while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
53 (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
54 size += iov->iov_len;
55 }
56 dout("dio_get_pagevlen len = %zu\n", size);
57 return size;
58}
59
60/*
61 * Allocate a page vector based on (@it, @nbytes).
62 * The return value is the tuple describing a page vector,
63 * that is (@pages, @page_align, @num_pages).
64 */
65static struct page **
66dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes,
67 size_t *page_align, int *num_pages)
68{
69 struct iov_iter tmp_it = *it;
70 size_t align;
71 struct page **pages;
72 int ret = 0, idx, npages;
73
74 align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
75 (PAGE_SIZE - 1);
76 npages = calc_pages_for(align, nbytes);
77 pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
78 if (!pages) {
79 pages = vmalloc(sizeof(*pages) * npages);
80 if (!pages)
81 return ERR_PTR(-ENOMEM);
82 }
83
84 for (idx = 0; idx < npages; ) {
85 size_t start;
86 ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes,
87 npages - idx, &start);
88 if (ret < 0)
89 goto fail;
90
91 iov_iter_advance(&tmp_it, ret);
92 nbytes -= ret;
93 idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE;
94 }
95
96 BUG_ON(nbytes != 0);
97 *num_pages = npages;
98 *page_align = align;
99 dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align);
100 return pages;
101fail:
102 ceph_put_page_vector(pages, idx, false);
103 return ERR_PTR(ret);
104}
37 105
38/* 106/*
39 * Prepare an open request. Preallocate ceph_cap to avoid an 107 * Prepare an open request. Preallocate ceph_cap to avoid an
@@ -458,11 +526,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
458 size_t start; 526 size_t start;
459 ssize_t n; 527 ssize_t n;
460 528
461 n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start); 529 n = dio_get_pagev_size(i);
462 if (n < 0) 530 pages = dio_get_pages_alloc(i, n, &start, &num_pages);
463 return n; 531 if (IS_ERR(pages))
464 532 return PTR_ERR(pages);
465 num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
466 533
467 ret = striped_read(inode, off, n, 534 ret = striped_read(inode, off, n,
468 pages, num_pages, checkeof, 535 pages, num_pages, checkeof,
@@ -592,7 +659,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
592 CEPH_OSD_FLAG_WRITE; 659 CEPH_OSD_FLAG_WRITE;
593 660
594 while (iov_iter_count(from) > 0) { 661 while (iov_iter_count(from) > 0) {
595 u64 len = iov_iter_single_seg_count(from); 662 u64 len = dio_get_pagev_size(from);
596 size_t start; 663 size_t start;
597 ssize_t n; 664 ssize_t n;
598 665
@@ -611,14 +678,14 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
611 678
612 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 679 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
613 680
614 n = iov_iter_get_pages_alloc(from, &pages, len, &start); 681 n = len;
615 if (unlikely(n < 0)) { 682 pages = dio_get_pages_alloc(from, len, &start, &num_pages);
616 ret = n; 683 if (IS_ERR(pages)) {
617 ceph_osdc_put_request(req); 684 ceph_osdc_put_request(req);
685 ret = PTR_ERR(pages);
618 break; 686 break;
619 } 687 }
620 688
621 num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
622 /* 689 /*
623 * throw out any page cache pages in this range. this 690 * throw out any page cache pages in this range. this
624 * may block. 691 * may block.
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 96d2bd829902..498dcfa2dcdb 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -452,6 +452,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
452 452
453 INIT_LIST_HEAD(&ci->i_unsafe_writes); 453 INIT_LIST_HEAD(&ci->i_unsafe_writes);
454 INIT_LIST_HEAD(&ci->i_unsafe_dirops); 454 INIT_LIST_HEAD(&ci->i_unsafe_dirops);
455 INIT_LIST_HEAD(&ci->i_unsafe_iops);
455 spin_lock_init(&ci->i_unsafe_lock); 456 spin_lock_init(&ci->i_unsafe_lock);
456 457
457 ci->i_snap_realm = NULL; 458 ci->i_snap_realm = NULL;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 51cb02da75d9..e7b130a637f9 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -633,13 +633,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
633 mdsc->oldest_tid = req->r_tid; 633 mdsc->oldest_tid = req->r_tid;
634 634
635 if (dir) { 635 if (dir) {
636 struct ceph_inode_info *ci = ceph_inode(dir);
637
638 ihold(dir); 636 ihold(dir);
639 spin_lock(&ci->i_unsafe_lock);
640 req->r_unsafe_dir = dir; 637 req->r_unsafe_dir = dir;
641 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
642 spin_unlock(&ci->i_unsafe_lock);
643 } 638 }
644} 639}
645 640
@@ -665,13 +660,20 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
665 rb_erase(&req->r_node, &mdsc->request_tree); 660 rb_erase(&req->r_node, &mdsc->request_tree);
666 RB_CLEAR_NODE(&req->r_node); 661 RB_CLEAR_NODE(&req->r_node);
667 662
668 if (req->r_unsafe_dir) { 663 if (req->r_unsafe_dir && req->r_got_unsafe) {
669 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 664 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
670
671 spin_lock(&ci->i_unsafe_lock); 665 spin_lock(&ci->i_unsafe_lock);
672 list_del_init(&req->r_unsafe_dir_item); 666 list_del_init(&req->r_unsafe_dir_item);
673 spin_unlock(&ci->i_unsafe_lock); 667 spin_unlock(&ci->i_unsafe_lock);
668 }
669 if (req->r_target_inode && req->r_got_unsafe) {
670 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
671 spin_lock(&ci->i_unsafe_lock);
672 list_del_init(&req->r_unsafe_target_item);
673 spin_unlock(&ci->i_unsafe_lock);
674 }
674 675
676 if (req->r_unsafe_dir) {
675 iput(req->r_unsafe_dir); 677 iput(req->r_unsafe_dir);
676 req->r_unsafe_dir = NULL; 678 req->r_unsafe_dir = NULL;
677 } 679 }
@@ -1430,6 +1432,13 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1430 if ((used | wanted) & CEPH_CAP_ANY_WR) 1432 if ((used | wanted) & CEPH_CAP_ANY_WR)
1431 goto out; 1433 goto out;
1432 } 1434 }
1435 /* The inode has cached pages, but it's no longer used.
1436 * we can safely drop it */
1437 if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1438 !(oissued & CEPH_CAP_FILE_CACHE)) {
1439 used = 0;
1440 oissued = 0;
1441 }
1433 if ((used | wanted) & ~oissued & mine) 1442 if ((used | wanted) & ~oissued & mine)
1434 goto out; /* we need these caps */ 1443 goto out; /* we need these caps */
1435 1444
@@ -1438,7 +1447,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1438 /* we aren't the only cap.. just remove us */ 1447 /* we aren't the only cap.. just remove us */
1439 __ceph_remove_cap(cap, true); 1448 __ceph_remove_cap(cap, true);
1440 } else { 1449 } else {
1441 /* try to drop referring dentries */ 1450 /* try dropping referring dentries */
1442 spin_unlock(&ci->i_ceph_lock); 1451 spin_unlock(&ci->i_ceph_lock);
1443 d_prune_aliases(inode); 1452 d_prune_aliases(inode);
1444 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1453 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
@@ -1704,6 +1713,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1704 req->r_started = jiffies; 1713 req->r_started = jiffies;
1705 req->r_resend_mds = -1; 1714 req->r_resend_mds = -1;
1706 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1715 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1716 INIT_LIST_HEAD(&req->r_unsafe_target_item);
1707 req->r_fmode = -1; 1717 req->r_fmode = -1;
1708 kref_init(&req->r_kref); 1718 kref_init(&req->r_kref);
1709 INIT_LIST_HEAD(&req->r_wait); 1719 INIT_LIST_HEAD(&req->r_wait);
@@ -1935,7 +1945,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1935 1945
1936 len = sizeof(*head) + 1946 len = sizeof(*head) +
1937 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 1947 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1938 sizeof(struct timespec); 1948 sizeof(struct ceph_timespec);
1939 1949
1940 /* calculate (max) length for cap releases */ 1950 /* calculate (max) length for cap releases */
1941 len += sizeof(struct ceph_mds_request_release) * 1951 len += sizeof(struct ceph_mds_request_release) *
@@ -2477,6 +2487,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2477 } else { 2487 } else {
2478 req->r_got_unsafe = true; 2488 req->r_got_unsafe = true;
2479 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2489 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2490 if (req->r_unsafe_dir) {
2491 struct ceph_inode_info *ci =
2492 ceph_inode(req->r_unsafe_dir);
2493 spin_lock(&ci->i_unsafe_lock);
2494 list_add_tail(&req->r_unsafe_dir_item,
2495 &ci->i_unsafe_dirops);
2496 spin_unlock(&ci->i_unsafe_lock);
2497 }
2480 } 2498 }
2481 2499
2482 dout("handle_reply tid %lld result %d\n", tid, result); 2500 dout("handle_reply tid %lld result %d\n", tid, result);
@@ -2518,6 +2536,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2518 up_read(&mdsc->snap_rwsem); 2536 up_read(&mdsc->snap_rwsem);
2519 if (realm) 2537 if (realm)
2520 ceph_put_snap_realm(mdsc, realm); 2538 ceph_put_snap_realm(mdsc, realm);
2539
2540 if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
2541 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2542 spin_lock(&ci->i_unsafe_lock);
2543 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2544 spin_unlock(&ci->i_unsafe_lock);
2545 }
2521out_err: 2546out_err:
2522 mutex_lock(&mdsc->mutex); 2547 mutex_lock(&mdsc->mutex);
2523 if (!req->r_aborted) { 2548 if (!req->r_aborted) {
@@ -3917,17 +3942,19 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3917 return msg; 3942 return msg;
3918} 3943}
3919 3944
3920static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) 3945static int mds_sign_message(struct ceph_msg *msg)
3921{ 3946{
3922 struct ceph_mds_session *s = con->private; 3947 struct ceph_mds_session *s = msg->con->private;
3923 struct ceph_auth_handshake *auth = &s->s_auth; 3948 struct ceph_auth_handshake *auth = &s->s_auth;
3949
3924 return ceph_auth_sign_message(auth, msg); 3950 return ceph_auth_sign_message(auth, msg);
3925} 3951}
3926 3952
3927static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) 3953static int mds_check_message_signature(struct ceph_msg *msg)
3928{ 3954{
3929 struct ceph_mds_session *s = con->private; 3955 struct ceph_mds_session *s = msg->con->private;
3930 struct ceph_auth_handshake *auth = &s->s_auth; 3956 struct ceph_auth_handshake *auth = &s->s_auth;
3957
3931 return ceph_auth_check_message_signature(auth, msg); 3958 return ceph_auth_check_message_signature(auth, msg);
3932} 3959}
3933 3960
@@ -3940,8 +3967,8 @@ static const struct ceph_connection_operations mds_con_ops = {
3940 .invalidate_authorizer = invalidate_authorizer, 3967 .invalidate_authorizer = invalidate_authorizer,
3941 .peer_reset = peer_reset, 3968 .peer_reset = peer_reset,
3942 .alloc_msg = mds_alloc_msg, 3969 .alloc_msg = mds_alloc_msg,
3943 .sign_message = sign_message, 3970 .sign_message = mds_sign_message,
3944 .check_message_signature = check_message_signature, 3971 .check_message_signature = mds_check_message_signature,
3945}; 3972};
3946 3973
3947/* eof */ 3974/* eof */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index f575eafe2261..ccf11ef0ca87 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -236,6 +236,9 @@ struct ceph_mds_request {
236 struct inode *r_unsafe_dir; 236 struct inode *r_unsafe_dir;
237 struct list_head r_unsafe_dir_item; 237 struct list_head r_unsafe_dir_item;
238 238
239 /* unsafe requests that modify the target inode */
240 struct list_head r_unsafe_target_item;
241
239 struct ceph_mds_session *r_session; 242 struct ceph_mds_session *r_session;
240 243
241 int r_attempts; /* resend attempts */ 244 int r_attempts; /* resend attempts */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2f2460d23a06..75b7d125ce66 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -342,6 +342,7 @@ struct ceph_inode_info {
342 342
343 struct list_head i_unsafe_writes; /* uncommitted sync writes */ 343 struct list_head i_unsafe_writes; /* uncommitted sync writes */
344 struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */ 344 struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */
345 struct list_head i_unsafe_iops; /* uncommitted mds inode ops */
345 spinlock_t i_unsafe_lock; 346 spinlock_t i_unsafe_lock;
346 347
347 struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */ 348 struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 397c5cd09794..3e3799cdc6e6 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -29,8 +29,9 @@
29#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */ 29#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */
30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */ 30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ 31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
32#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */ 32#define CEPH_OPT_NOMSGAUTH (1<<4) /* don't require msg signing feat */
33#define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */ 33#define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */
34#define CEPH_OPT_NOMSGSIGN (1<<6) /* don't sign msgs */
34 35
35#define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY) 36#define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY)
36 37
@@ -137,6 +138,7 @@ struct ceph_client {
137#endif 138#endif
138}; 139};
139 140
141#define from_msgr(ms) container_of(ms, struct ceph_client, msgr)
140 142
141 143
142/* 144/*
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index b2371d9b51fa..71b1d6cdcb5d 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -43,10 +43,9 @@ struct ceph_connection_operations {
43 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, 43 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
44 struct ceph_msg_header *hdr, 44 struct ceph_msg_header *hdr,
45 int *skip); 45 int *skip);
46 int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg);
47 46
48 int (*check_message_signature) (struct ceph_connection *con, 47 int (*sign_message) (struct ceph_msg *msg);
49 struct ceph_msg *msg); 48 int (*check_message_signature) (struct ceph_msg *msg);
50}; 49};
51 50
52/* use format string %s%d */ 51/* use format string %s%d */
@@ -58,8 +57,6 @@ struct ceph_messenger {
58 57
59 atomic_t stopping; 58 atomic_t stopping;
60 possible_net_t net; 59 possible_net_t net;
61 bool nocrc;
62 bool tcp_nodelay;
63 60
64 /* 61 /*
65 * the global_seq counts connections i (attempt to) initiate 62 * the global_seq counts connections i (attempt to) initiate
@@ -67,9 +64,6 @@ struct ceph_messenger {
67 */ 64 */
68 u32 global_seq; 65 u32 global_seq;
69 spinlock_t global_seq_lock; 66 spinlock_t global_seq_lock;
70
71 u64 supported_features;
72 u64 required_features;
73}; 67};
74 68
75enum ceph_msg_data_type { 69enum ceph_msg_data_type {
@@ -268,11 +262,7 @@ extern void ceph_msgr_exit(void);
268extern void ceph_msgr_flush(void); 262extern void ceph_msgr_flush(void);
269 263
270extern void ceph_messenger_init(struct ceph_messenger *msgr, 264extern void ceph_messenger_init(struct ceph_messenger *msgr,
271 struct ceph_entity_addr *myaddr, 265 struct ceph_entity_addr *myaddr);
272 u64 supported_features,
273 u64 required_features,
274 bool nocrc,
275 bool tcp_nodelay);
276extern void ceph_messenger_fini(struct ceph_messenger *msgr); 266extern void ceph_messenger_fini(struct ceph_messenger *msgr);
277 267
278extern void ceph_con_init(struct ceph_connection *con, void *private, 268extern void ceph_con_init(struct ceph_connection *con, void *private,
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index ba6eb17226da..10d87753ed87 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -8,6 +8,7 @@
8 8
9#include <linux/ceph/decode.h> 9#include <linux/ceph/decode.h>
10#include <linux/ceph/auth.h> 10#include <linux/ceph/auth.h>
11#include <linux/ceph/libceph.h>
11#include <linux/ceph/messenger.h> 12#include <linux/ceph/messenger.h>
12 13
13#include "crypto.h" 14#include "crypto.h"
@@ -279,6 +280,15 @@ bad:
279 return -EINVAL; 280 return -EINVAL;
280} 281}
281 282
283static void ceph_x_authorizer_cleanup(struct ceph_x_authorizer *au)
284{
285 ceph_crypto_key_destroy(&au->session_key);
286 if (au->buf) {
287 ceph_buffer_put(au->buf);
288 au->buf = NULL;
289 }
290}
291
282static int ceph_x_build_authorizer(struct ceph_auth_client *ac, 292static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
283 struct ceph_x_ticket_handler *th, 293 struct ceph_x_ticket_handler *th,
284 struct ceph_x_authorizer *au) 294 struct ceph_x_authorizer *au)
@@ -297,7 +307,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
297 ceph_crypto_key_destroy(&au->session_key); 307 ceph_crypto_key_destroy(&au->session_key);
298 ret = ceph_crypto_key_clone(&au->session_key, &th->session_key); 308 ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
299 if (ret) 309 if (ret)
300 return ret; 310 goto out_au;
301 311
302 maxlen = sizeof(*msg_a) + sizeof(msg_b) + 312 maxlen = sizeof(*msg_a) + sizeof(msg_b) +
303 ceph_x_encrypt_buflen(ticket_blob_len); 313 ceph_x_encrypt_buflen(ticket_blob_len);
@@ -309,8 +319,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
309 if (!au->buf) { 319 if (!au->buf) {
310 au->buf = ceph_buffer_new(maxlen, GFP_NOFS); 320 au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
311 if (!au->buf) { 321 if (!au->buf) {
312 ceph_crypto_key_destroy(&au->session_key); 322 ret = -ENOMEM;
313 return -ENOMEM; 323 goto out_au;
314 } 324 }
315 } 325 }
316 au->service = th->service; 326 au->service = th->service;
@@ -340,7 +350,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
340 ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b), 350 ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
341 p, end - p); 351 p, end - p);
342 if (ret < 0) 352 if (ret < 0)
343 goto out_buf; 353 goto out_au;
344 p += ret; 354 p += ret;
345 au->buf->vec.iov_len = p - au->buf->vec.iov_base; 355 au->buf->vec.iov_len = p - au->buf->vec.iov_base;
346 dout(" built authorizer nonce %llx len %d\n", au->nonce, 356 dout(" built authorizer nonce %llx len %d\n", au->nonce,
@@ -348,9 +358,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
348 BUG_ON(au->buf->vec.iov_len > maxlen); 358 BUG_ON(au->buf->vec.iov_len > maxlen);
349 return 0; 359 return 0;
350 360
351out_buf: 361out_au:
352 ceph_buffer_put(au->buf); 362 ceph_x_authorizer_cleanup(au);
353 au->buf = NULL;
354 return ret; 363 return ret;
355} 364}
356 365
@@ -624,8 +633,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
624{ 633{
625 struct ceph_x_authorizer *au = (void *)a; 634 struct ceph_x_authorizer *au = (void *)a;
626 635
627 ceph_crypto_key_destroy(&au->session_key); 636 ceph_x_authorizer_cleanup(au);
628 ceph_buffer_put(au->buf);
629 kfree(au); 637 kfree(au);
630} 638}
631 639
@@ -653,8 +661,7 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
653 remove_ticket_handler(ac, th); 661 remove_ticket_handler(ac, th);
654 } 662 }
655 663
656 if (xi->auth_authorizer.buf) 664 ceph_x_authorizer_cleanup(&xi->auth_authorizer);
657 ceph_buffer_put(xi->auth_authorizer.buf);
658 665
659 kfree(ac->private); 666 kfree(ac->private);
660 ac->private = NULL; 667 ac->private = NULL;
@@ -691,8 +698,10 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
691 struct ceph_msg *msg) 698 struct ceph_msg *msg)
692{ 699{
693 int ret; 700 int ret;
694 if (!auth->authorizer) 701
702 if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
695 return 0; 703 return 0;
704
696 ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, 705 ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
697 msg, &msg->footer.sig); 706 msg, &msg->footer.sig);
698 if (ret < 0) 707 if (ret < 0)
@@ -707,8 +716,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
707 __le64 sig_check; 716 __le64 sig_check;
708 int ret; 717 int ret;
709 718
710 if (!auth->authorizer) 719 if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
711 return 0; 720 return 0;
721
712 ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, 722 ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
713 msg, &sig_check); 723 msg, &sig_check);
714 if (ret < 0) 724 if (ret < 0)
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 78f098a20796..bcbec33c6a14 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -245,6 +245,8 @@ enum {
245 Opt_nocrc, 245 Opt_nocrc,
246 Opt_cephx_require_signatures, 246 Opt_cephx_require_signatures,
247 Opt_nocephx_require_signatures, 247 Opt_nocephx_require_signatures,
248 Opt_cephx_sign_messages,
249 Opt_nocephx_sign_messages,
248 Opt_tcp_nodelay, 250 Opt_tcp_nodelay,
249 Opt_notcp_nodelay, 251 Opt_notcp_nodelay,
250}; 252};
@@ -267,6 +269,8 @@ static match_table_t opt_tokens = {
267 {Opt_nocrc, "nocrc"}, 269 {Opt_nocrc, "nocrc"},
268 {Opt_cephx_require_signatures, "cephx_require_signatures"}, 270 {Opt_cephx_require_signatures, "cephx_require_signatures"},
269 {Opt_nocephx_require_signatures, "nocephx_require_signatures"}, 271 {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
272 {Opt_cephx_sign_messages, "cephx_sign_messages"},
273 {Opt_nocephx_sign_messages, "nocephx_sign_messages"},
270 {Opt_tcp_nodelay, "tcp_nodelay"}, 274 {Opt_tcp_nodelay, "tcp_nodelay"},
271 {Opt_notcp_nodelay, "notcp_nodelay"}, 275 {Opt_notcp_nodelay, "notcp_nodelay"},
272 {-1, NULL} 276 {-1, NULL}
@@ -491,6 +495,12 @@ ceph_parse_options(char *options, const char *dev_name,
491 case Opt_nocephx_require_signatures: 495 case Opt_nocephx_require_signatures:
492 opt->flags |= CEPH_OPT_NOMSGAUTH; 496 opt->flags |= CEPH_OPT_NOMSGAUTH;
493 break; 497 break;
498 case Opt_cephx_sign_messages:
499 opt->flags &= ~CEPH_OPT_NOMSGSIGN;
500 break;
501 case Opt_nocephx_sign_messages:
502 opt->flags |= CEPH_OPT_NOMSGSIGN;
503 break;
494 504
495 case Opt_tcp_nodelay: 505 case Opt_tcp_nodelay:
496 opt->flags |= CEPH_OPT_TCP_NODELAY; 506 opt->flags |= CEPH_OPT_TCP_NODELAY;
@@ -534,6 +544,8 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
534 seq_puts(m, "nocrc,"); 544 seq_puts(m, "nocrc,");
535 if (opt->flags & CEPH_OPT_NOMSGAUTH) 545 if (opt->flags & CEPH_OPT_NOMSGAUTH)
536 seq_puts(m, "nocephx_require_signatures,"); 546 seq_puts(m, "nocephx_require_signatures,");
547 if (opt->flags & CEPH_OPT_NOMSGSIGN)
548 seq_puts(m, "nocephx_sign_messages,");
537 if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0) 549 if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
538 seq_puts(m, "notcp_nodelay,"); 550 seq_puts(m, "notcp_nodelay,");
539 551
@@ -596,11 +608,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
596 if (ceph_test_opt(client, MYIP)) 608 if (ceph_test_opt(client, MYIP))
597 myaddr = &client->options->my_addr; 609 myaddr = &client->options->my_addr;
598 610
599 ceph_messenger_init(&client->msgr, myaddr, 611 ceph_messenger_init(&client->msgr, myaddr);
600 client->supported_features,
601 client->required_features,
602 ceph_test_opt(client, NOCRC),
603 ceph_test_opt(client, TCP_NODELAY));
604 612
605 /* subsystems */ 613 /* subsystems */
606 err = ceph_monc_init(&client->monc, client); 614 err = ceph_monc_init(&client->monc, client);
diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h
index d1498224c49d..2e9cab09f37b 100644
--- a/net/ceph/crypto.h
+++ b/net/ceph/crypto.h
@@ -16,8 +16,10 @@ struct ceph_crypto_key {
16 16
17static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key) 17static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
18{ 18{
19 if (key) 19 if (key) {
20 kfree(key->key); 20 kfree(key->key);
21 key->key = NULL;
22 }
21} 23}
22 24
23int ceph_crypto_key_clone(struct ceph_crypto_key *dst, 25int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b9b0e3b5da49..9981039ef4ff 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -509,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
509 return ret; 509 return ret;
510 } 510 }
511 511
512 if (con->msgr->tcp_nodelay) { 512 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
513 int optval = 1; 513 int optval = 1;
514 514
515 ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, 515 ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
@@ -637,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con)
637static void ceph_msg_remove(struct ceph_msg *msg) 637static void ceph_msg_remove(struct ceph_msg *msg)
638{ 638{
639 list_del_init(&msg->list_head); 639 list_del_init(&msg->list_head);
640 BUG_ON(msg->con == NULL);
641 msg->con->ops->put(msg->con);
642 msg->con = NULL;
643 640
644 ceph_msg_put(msg); 641 ceph_msg_put(msg);
645} 642}
@@ -662,15 +659,14 @@ static void reset_connection(struct ceph_connection *con)
662 659
663 if (con->in_msg) { 660 if (con->in_msg) {
664 BUG_ON(con->in_msg->con != con); 661 BUG_ON(con->in_msg->con != con);
665 con->in_msg->con = NULL;
666 ceph_msg_put(con->in_msg); 662 ceph_msg_put(con->in_msg);
667 con->in_msg = NULL; 663 con->in_msg = NULL;
668 con->ops->put(con);
669 } 664 }
670 665
671 con->connect_seq = 0; 666 con->connect_seq = 0;
672 con->out_seq = 0; 667 con->out_seq = 0;
673 if (con->out_msg) { 668 if (con->out_msg) {
669 BUG_ON(con->out_msg->con != con);
674 ceph_msg_put(con->out_msg); 670 ceph_msg_put(con->out_msg);
675 con->out_msg = NULL; 671 con->out_msg = NULL;
676 } 672 }
@@ -1205,7 +1201,7 @@ static void prepare_write_message_footer(struct ceph_connection *con)
1205 con->out_kvec[v].iov_base = &m->footer; 1201 con->out_kvec[v].iov_base = &m->footer;
1206 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 1202 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1207 if (con->ops->sign_message) 1203 if (con->ops->sign_message)
1208 con->ops->sign_message(con, m); 1204 con->ops->sign_message(m);
1209 else 1205 else
1210 m->footer.sig = 0; 1206 m->footer.sig = 0;
1211 con->out_kvec[v].iov_len = sizeof(m->footer); 1207 con->out_kvec[v].iov_len = sizeof(m->footer);
@@ -1432,7 +1428,8 @@ static int prepare_write_connect(struct ceph_connection *con)
1432 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 1428 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
1433 con->connect_seq, global_seq, proto); 1429 con->connect_seq, global_seq, proto);
1434 1430
1435 con->out_connect.features = cpu_to_le64(con->msgr->supported_features); 1431 con->out_connect.features =
1432 cpu_to_le64(from_msgr(con->msgr)->supported_features);
1436 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 1433 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
1437 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 1434 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
1438 con->out_connect.global_seq = cpu_to_le32(global_seq); 1435 con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1527,7 +1524,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1527{ 1524{
1528 struct ceph_msg *msg = con->out_msg; 1525 struct ceph_msg *msg = con->out_msg;
1529 struct ceph_msg_data_cursor *cursor = &msg->cursor; 1526 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1530 bool do_datacrc = !con->msgr->nocrc; 1527 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1531 u32 crc; 1528 u32 crc;
1532 1529
1533 dout("%s %p msg %p\n", __func__, con, msg); 1530 dout("%s %p msg %p\n", __func__, con, msg);
@@ -1552,8 +1549,8 @@ static int write_partial_message_data(struct ceph_connection *con)
1552 bool need_crc; 1549 bool need_crc;
1553 int ret; 1550 int ret;
1554 1551
1555 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, 1552 page = ceph_msg_data_next(cursor, &page_offset, &length,
1556 &last_piece); 1553 &last_piece);
1557 ret = ceph_tcp_sendpage(con->sock, page, page_offset, 1554 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1558 length, !last_piece); 1555 length, !last_piece);
1559 if (ret <= 0) { 1556 if (ret <= 0) {
@@ -1564,7 +1561,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1564 } 1561 }
1565 if (do_datacrc && cursor->need_crc) 1562 if (do_datacrc && cursor->need_crc)
1566 crc = ceph_crc32c_page(crc, page, page_offset, length); 1563 crc = ceph_crc32c_page(crc, page, page_offset, length);
1567 need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret); 1564 need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
1568 } 1565 }
1569 1566
1570 dout("%s %p msg %p done\n", __func__, con, msg); 1567 dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2005,8 +2002,8 @@ static int process_banner(struct ceph_connection *con)
2005 2002
2006static int process_connect(struct ceph_connection *con) 2003static int process_connect(struct ceph_connection *con)
2007{ 2004{
2008 u64 sup_feat = con->msgr->supported_features; 2005 u64 sup_feat = from_msgr(con->msgr)->supported_features;
2009 u64 req_feat = con->msgr->required_features; 2006 u64 req_feat = from_msgr(con->msgr)->required_features;
2010 u64 server_feat = ceph_sanitize_features( 2007 u64 server_feat = ceph_sanitize_features(
2011 le64_to_cpu(con->in_reply.features)); 2008 le64_to_cpu(con->in_reply.features));
2012 int ret; 2009 int ret;
@@ -2232,7 +2229,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2232{ 2229{
2233 struct ceph_msg *msg = con->in_msg; 2230 struct ceph_msg *msg = con->in_msg;
2234 struct ceph_msg_data_cursor *cursor = &msg->cursor; 2231 struct ceph_msg_data_cursor *cursor = &msg->cursor;
2235 const bool do_datacrc = !con->msgr->nocrc; 2232 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
2236 struct page *page; 2233 struct page *page;
2237 size_t page_offset; 2234 size_t page_offset;
2238 size_t length; 2235 size_t length;
@@ -2246,8 +2243,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2246 if (do_datacrc) 2243 if (do_datacrc)
2247 crc = con->in_data_crc; 2244 crc = con->in_data_crc;
2248 while (cursor->resid) { 2245 while (cursor->resid) {
2249 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, 2246 page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
2250 NULL);
2251 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2247 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2252 if (ret <= 0) { 2248 if (ret <= 0) {
2253 if (do_datacrc) 2249 if (do_datacrc)
@@ -2258,7 +2254,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2258 2254
2259 if (do_datacrc) 2255 if (do_datacrc)
2260 crc = ceph_crc32c_page(crc, page, page_offset, ret); 2256 crc = ceph_crc32c_page(crc, page, page_offset, ret);
2261 (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret); 2257 (void) ceph_msg_data_advance(cursor, (size_t)ret);
2262 } 2258 }
2263 if (do_datacrc) 2259 if (do_datacrc)
2264 con->in_data_crc = crc; 2260 con->in_data_crc = crc;
@@ -2278,7 +2274,7 @@ static int read_partial_message(struct ceph_connection *con)
2278 int end; 2274 int end;
2279 int ret; 2275 int ret;
2280 unsigned int front_len, middle_len, data_len; 2276 unsigned int front_len, middle_len, data_len;
2281 bool do_datacrc = !con->msgr->nocrc; 2277 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
2282 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); 2278 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
2283 u64 seq; 2279 u64 seq;
2284 u32 crc; 2280 u32 crc;
@@ -2423,7 +2419,7 @@ static int read_partial_message(struct ceph_connection *con)
2423 } 2419 }
2424 2420
2425 if (need_sign && con->ops->check_message_signature && 2421 if (need_sign && con->ops->check_message_signature &&
2426 con->ops->check_message_signature(con, m)) { 2422 con->ops->check_message_signature(m)) {
2427 pr_err("read_partial_message %p signature check failed\n", m); 2423 pr_err("read_partial_message %p signature check failed\n", m);
2428 return -EBADMSG; 2424 return -EBADMSG;
2429 } 2425 }
@@ -2438,13 +2434,10 @@ static int read_partial_message(struct ceph_connection *con)
2438 */ 2434 */
2439static void process_message(struct ceph_connection *con) 2435static void process_message(struct ceph_connection *con)
2440{ 2436{
2441 struct ceph_msg *msg; 2437 struct ceph_msg *msg = con->in_msg;
2442 2438
2443 BUG_ON(con->in_msg->con != con); 2439 BUG_ON(con->in_msg->con != con);
2444 con->in_msg->con = NULL;
2445 msg = con->in_msg;
2446 con->in_msg = NULL; 2440 con->in_msg = NULL;
2447 con->ops->put(con);
2448 2441
2449 /* if first message, set peer_name */ 2442 /* if first message, set peer_name */
2450 if (con->peer_name.type == 0) 2443 if (con->peer_name.type == 0)
@@ -2677,7 +2670,7 @@ more:
2677 if (ret <= 0) { 2670 if (ret <= 0) {
2678 switch (ret) { 2671 switch (ret) {
2679 case -EBADMSG: 2672 case -EBADMSG:
2680 con->error_msg = "bad crc"; 2673 con->error_msg = "bad crc/signature";
2681 /* fall through */ 2674 /* fall through */
2682 case -EBADE: 2675 case -EBADE:
2683 ret = -EIO; 2676 ret = -EIO;
@@ -2918,10 +2911,8 @@ static void con_fault(struct ceph_connection *con)
2918 2911
2919 if (con->in_msg) { 2912 if (con->in_msg) {
2920 BUG_ON(con->in_msg->con != con); 2913 BUG_ON(con->in_msg->con != con);
2921 con->in_msg->con = NULL;
2922 ceph_msg_put(con->in_msg); 2914 ceph_msg_put(con->in_msg);
2923 con->in_msg = NULL; 2915 con->in_msg = NULL;
2924 con->ops->put(con);
2925 } 2916 }
2926 2917
2927 /* Requeue anything that hasn't been acked */ 2918 /* Requeue anything that hasn't been acked */
@@ -2952,15 +2943,8 @@ static void con_fault(struct ceph_connection *con)
2952 * initialize a new messenger instance 2943 * initialize a new messenger instance
2953 */ 2944 */
2954void ceph_messenger_init(struct ceph_messenger *msgr, 2945void ceph_messenger_init(struct ceph_messenger *msgr,
2955 struct ceph_entity_addr *myaddr, 2946 struct ceph_entity_addr *myaddr)
2956 u64 supported_features,
2957 u64 required_features,
2958 bool nocrc,
2959 bool tcp_nodelay)
2960{ 2947{
2961 msgr->supported_features = supported_features;
2962 msgr->required_features = required_features;
2963
2964 spin_lock_init(&msgr->global_seq_lock); 2948 spin_lock_init(&msgr->global_seq_lock);
2965 2949
2966 if (myaddr) 2950 if (myaddr)
@@ -2970,8 +2954,6 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
2970 msgr->inst.addr.type = 0; 2954 msgr->inst.addr.type = 0;
2971 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2955 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2972 encode_my_addr(msgr); 2956 encode_my_addr(msgr);
2973 msgr->nocrc = nocrc;
2974 msgr->tcp_nodelay = tcp_nodelay;
2975 2957
2976 atomic_set(&msgr->stopping, 0); 2958 atomic_set(&msgr->stopping, 0);
2977 write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); 2959 write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
@@ -2986,6 +2968,15 @@ void ceph_messenger_fini(struct ceph_messenger *msgr)
2986} 2968}
2987EXPORT_SYMBOL(ceph_messenger_fini); 2969EXPORT_SYMBOL(ceph_messenger_fini);
2988 2970
2971static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
2972{
2973 if (msg->con)
2974 msg->con->ops->put(msg->con);
2975
2976 msg->con = con ? con->ops->get(con) : NULL;
2977 BUG_ON(msg->con != con);
2978}
2979
2989static void clear_standby(struct ceph_connection *con) 2980static void clear_standby(struct ceph_connection *con)
2990{ 2981{
2991 /* come back from STANDBY? */ 2982 /* come back from STANDBY? */
@@ -3017,9 +3008,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
3017 return; 3008 return;
3018 } 3009 }
3019 3010
3020 BUG_ON(msg->con != NULL); 3011 msg_con_set(msg, con);
3021 msg->con = con->ops->get(con);
3022 BUG_ON(msg->con == NULL);
3023 3012
3024 BUG_ON(!list_empty(&msg->list_head)); 3013 BUG_ON(!list_empty(&msg->list_head));
3025 list_add_tail(&msg->list_head, &con->out_queue); 3014 list_add_tail(&msg->list_head, &con->out_queue);
@@ -3047,16 +3036,15 @@ void ceph_msg_revoke(struct ceph_msg *msg)
3047{ 3036{
3048 struct ceph_connection *con = msg->con; 3037 struct ceph_connection *con = msg->con;
3049 3038
3050 if (!con) 3039 if (!con) {
3040 dout("%s msg %p null con\n", __func__, msg);
3051 return; /* Message not in our possession */ 3041 return; /* Message not in our possession */
3042 }
3052 3043
3053 mutex_lock(&con->mutex); 3044 mutex_lock(&con->mutex);
3054 if (!list_empty(&msg->list_head)) { 3045 if (!list_empty(&msg->list_head)) {
3055 dout("%s %p msg %p - was on queue\n", __func__, con, msg); 3046 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
3056 list_del_init(&msg->list_head); 3047 list_del_init(&msg->list_head);
3057 BUG_ON(msg->con == NULL);
3058 msg->con->ops->put(msg->con);
3059 msg->con = NULL;
3060 msg->hdr.seq = 0; 3048 msg->hdr.seq = 0;
3061 3049
3062 ceph_msg_put(msg); 3050 ceph_msg_put(msg);
@@ -3080,16 +3068,13 @@ void ceph_msg_revoke(struct ceph_msg *msg)
3080 */ 3068 */
3081void ceph_msg_revoke_incoming(struct ceph_msg *msg) 3069void ceph_msg_revoke_incoming(struct ceph_msg *msg)
3082{ 3070{
3083 struct ceph_connection *con; 3071 struct ceph_connection *con = msg->con;
3084 3072
3085 BUG_ON(msg == NULL); 3073 if (!con) {
3086 if (!msg->con) {
3087 dout("%s msg %p null con\n", __func__, msg); 3074 dout("%s msg %p null con\n", __func__, msg);
3088
3089 return; /* Message not in our possession */ 3075 return; /* Message not in our possession */
3090 } 3076 }
3091 3077
3092 con = msg->con;
3093 mutex_lock(&con->mutex); 3078 mutex_lock(&con->mutex);
3094 if (con->in_msg == msg) { 3079 if (con->in_msg == msg) {
3095 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 3080 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
@@ -3335,9 +3320,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
3335 } 3320 }
3336 if (msg) { 3321 if (msg) {
3337 BUG_ON(*skip); 3322 BUG_ON(*skip);
3323 msg_con_set(msg, con);
3338 con->in_msg = msg; 3324 con->in_msg = msg;
3339 con->in_msg->con = con->ops->get(con);
3340 BUG_ON(con->in_msg->con == NULL);
3341 } else { 3325 } else {
3342 /* 3326 /*
3343 * Null message pointer means either we should skip 3327 * Null message pointer means either we should skip
@@ -3384,6 +3368,8 @@ static void ceph_msg_release(struct kref *kref)
3384 dout("%s %p\n", __func__, m); 3368 dout("%s %p\n", __func__, m);
3385 WARN_ON(!list_empty(&m->list_head)); 3369 WARN_ON(!list_empty(&m->list_head));
3386 3370
3371 msg_con_set(m, NULL);
3372
3387 /* drop middle, data, if any */ 3373 /* drop middle, data, if any */
3388 if (m->middle) { 3374 if (m->middle) {
3389 ceph_buffer_put(m->middle); 3375 ceph_buffer_put(m->middle);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f79ccac6699f..f8f235930d88 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
120} 120}
121#endif /* CONFIG_BLOCK */ 121#endif /* CONFIG_BLOCK */
122 122
123#define osd_req_op_data(oreq, whch, typ, fld) \ 123#define osd_req_op_data(oreq, whch, typ, fld) \
124 ({ \ 124({ \
125 BUG_ON(whch >= (oreq)->r_num_ops); \ 125 struct ceph_osd_request *__oreq = (oreq); \
126 &(oreq)->r_ops[whch].typ.fld; \ 126 unsigned int __whch = (whch); \
127 }) 127 BUG_ON(__whch >= __oreq->r_num_ops); \
128 &__oreq->r_ops[__whch].typ.fld; \
129})
128 130
129static struct ceph_osd_data * 131static struct ceph_osd_data *
130osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 132osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
@@ -1750,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req)
1750 * handle osd op reply. either call the callback if it is specified, 1752 * handle osd op reply. either call the callback if it is specified,
1751 * or do the completion to wake up the waiting thread. 1753 * or do the completion to wake up the waiting thread.
1752 */ 1754 */
1753static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1755static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1754 struct ceph_connection *con)
1755{ 1756{
1756 void *p, *end; 1757 void *p, *end;
1757 struct ceph_osd_request *req; 1758 struct ceph_osd_request *req;
@@ -2807,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2807 ceph_osdc_handle_map(osdc, msg); 2808 ceph_osdc_handle_map(osdc, msg);
2808 break; 2809 break;
2809 case CEPH_MSG_OSD_OPREPLY: 2810 case CEPH_MSG_OSD_OPREPLY:
2810 handle_reply(osdc, msg, con); 2811 handle_reply(osdc, msg);
2811 break; 2812 break;
2812 case CEPH_MSG_WATCH_NOTIFY: 2813 case CEPH_MSG_WATCH_NOTIFY:
2813 handle_watch_notify(osdc, msg); 2814 handle_watch_notify(osdc, msg);
@@ -2849,9 +2850,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2849 goto out; 2850 goto out;
2850 } 2851 }
2851 2852
2852 if (req->r_reply->con)
2853 dout("%s revoking msg %p from old con %p\n", __func__,
2854 req->r_reply, req->r_reply->con);
2855 ceph_msg_revoke_incoming(req->r_reply); 2853 ceph_msg_revoke_incoming(req->r_reply);
2856 2854
2857 if (front_len > req->r_reply->front_alloc_len) { 2855 if (front_len > req->r_reply->front_alloc_len) {
@@ -2978,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con)
2978 return ceph_monc_validate_auth(&osdc->client->monc); 2976 return ceph_monc_validate_auth(&osdc->client->monc);
2979} 2977}
2980 2978
2981static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) 2979static int osd_sign_message(struct ceph_msg *msg)
2982{ 2980{
2983 struct ceph_osd *o = con->private; 2981 struct ceph_osd *o = msg->con->private;
2984 struct ceph_auth_handshake *auth = &o->o_auth; 2982 struct ceph_auth_handshake *auth = &o->o_auth;
2983
2985 return ceph_auth_sign_message(auth, msg); 2984 return ceph_auth_sign_message(auth, msg);
2986} 2985}
2987 2986
2988static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) 2987static int osd_check_message_signature(struct ceph_msg *msg)
2989{ 2988{
2990 struct ceph_osd *o = con->private; 2989 struct ceph_osd *o = msg->con->private;
2991 struct ceph_auth_handshake *auth = &o->o_auth; 2990 struct ceph_auth_handshake *auth = &o->o_auth;
2991
2992 return ceph_auth_check_message_signature(auth, msg); 2992 return ceph_auth_check_message_signature(auth, msg);
2993} 2993}
2994 2994
@@ -3000,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = {
3000 .verify_authorizer_reply = verify_authorizer_reply, 3000 .verify_authorizer_reply = verify_authorizer_reply,
3001 .invalidate_authorizer = invalidate_authorizer, 3001 .invalidate_authorizer = invalidate_authorizer,
3002 .alloc_msg = alloc_msg, 3002 .alloc_msg = alloc_msg,
3003 .sign_message = sign_message, 3003 .sign_message = osd_sign_message,
3004 .check_message_signature = check_message_signature, 3004 .check_message_signature = osd_check_message_signature,
3005 .fault = osd_reset, 3005 .fault = osd_reset,
3006}; 3006};