author    Linus Torvalds <torvalds@linux-foundation.org>    2017-02-28 18:36:09 -0500
committer Linus Torvalds <torvalds@linux-foundation.org>    2017-02-28 18:36:09 -0500
commit    b2deee2dc06db7cdf99b84346e69bdb9db9baa85
tree      ceb073fa12c1a9804761ec8ce8911a517b007ed6
parent    d4f4cf77b37eaea58ef863a4cbc95dad3880b524
parent    54ea0046b6fe36ec18e82d282a29a18da6cdea0f
Merge tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "This time around we have:

  - support for rbd data-pool feature, which enables rbd images on
    erasure-coded pools (myself). CEPH_PG_MAX_SIZE has been bumped to
    allow erasure-coded profiles with k+m up to 32.

  - a fix for a ceph_d_revalidate() performance regression introduced
    in 4.9, along with some cleanups in the area (Jeff Layton)

  - a set of fixes for unsafe ->d_parent accesses in CephFS (Jeff
    Layton)

  - buffered reads are now processed in rsize windows instead of rasize
    windows (Andreas Gerstmayr). The new default for the rsize mount
    option is 64M.

  - ack vs commit distinction is gone, greatly simplifying ->fsync()
    and MOSDOpReply handling code (myself)

  ... also a few filesystem bug fixes from Zheng, a CRUSH sync up
  (CRUSH computations are still serialized though) and several minor
  fixes and cleanups all over"

* tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client: (52 commits)
  libceph, rbd, ceph: WRITE | ONDISK -> WRITE
  libceph: get rid of ack vs commit
  ceph: remove special ack vs commit behavior
  ceph: tidy some white space in get_nonsnap_parent()
  crush: fix dprintk compilation
  crush: do is_out test only if we do not collide
  ceph: remove req from unsafe list when unregistering it
  rbd: constify device_type structure
  rbd: kill obj_request->object_name and rbd_segment_name_cache
  rbd: store and use obj_request->object_no
  rbd: RBD_V{1,2}_DATA_FORMAT macros
  rbd: factor out __rbd_osd_req_create()
  rbd: set offset and length outside of rbd_obj_request_create()
  rbd: support for data-pool feature
  rbd: introduce rbd_init_layout()
  rbd: use rbd_obj_bytes() more
  rbd: remove now unused rbd_obj_request_wait() and helpers
  rbd: switch rbd_obj_method_sync() to ceph_osdc_call()
  libceph: pass reply buffer length through ceph_osdc_call()
  rbd: do away with obj_request in rbd_obj_read_sync()
  ...
-rw-r--r--  Documentation/filesystems/ceph.txt |   5
-rw-r--r--  drivers/block/rbd.c                | 601
-rw-r--r--  drivers/block/rbd_types.h          |  10
-rw-r--r--  fs/ceph/addr.c                     |  19
-rw-r--r--  fs/ceph/cache.c                    |   2
-rw-r--r--  fs/ceph/caps.c                     |  40
-rw-r--r--  fs/ceph/debugfs.c                  |   2
-rw-r--r--  fs/ceph/dir.c                      |  32
-rw-r--r--  fs/ceph/export.c                   |   3
-rw-r--r--  fs/ceph/file.c                     | 106
-rw-r--r--  fs/ceph/inode.c                    | 172
-rw-r--r--  fs/ceph/ioctl.c                    |   4
-rw-r--r--  fs/ceph/mds_client.c               | 175
-rw-r--r--  fs/ceph/mds_client.h               |  15
-rw-r--r--  fs/ceph/super.c                    |   9
-rw-r--r--  fs/ceph/super.h                    |  14
-rw-r--r--  include/linux/ceph/osd_client.h    |   6
-rw-r--r--  include/linux/ceph/osdmap.h        |  13
-rw-r--r--  include/linux/ceph/rados.h         |   2
-rw-r--r--  include/linux/crush/crush.h        |  41
-rw-r--r--  include/linux/crush/mapper.h       |  16
-rw-r--r--  net/ceph/cls_lock_client.c         |  14
-rw-r--r--  net/ceph/crush/crush.c             |   5
-rw-r--r--  net/ceph/crush/mapper.c            | 227
-rw-r--r--  net/ceph/crypto.c                  |   1
-rw-r--r--  net/ceph/osd_client.c              | 130
-rw-r--r--  net/ceph/osdmap.c                  | 101
-rw-r--r--  net/ceph/snapshot.c                |   2
28 files changed, 835 insertions(+), 932 deletions(-)
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index f5306ee40ea9..0b302a11718a 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -98,11 +98,10 @@ Mount Options
 	size.
 
   rsize=X
-	Specify the maximum read size in bytes.  By default there is no
-	maximum.
+	Specify the maximum read size in bytes.  Default: 64 MB.
 
   rasize=X
-	Specify the maximum readahead.
+	Specify the maximum readahead.  Default: 8 MB.
 
   mount_timeout=X
 	Specify the timeout value for mount (in seconds), in the case
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 362cecc77130..4d6807723798 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -123,9 +123,11 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_LAYERING		(1<<0)
 #define RBD_FEATURE_STRIPINGV2		(1<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK	(1<<2)
+#define RBD_FEATURE_DATA_POOL		(1<<7)
 #define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
 				 RBD_FEATURE_STRIPINGV2 |	\
-				 RBD_FEATURE_EXCLUSIVE_LOCK)
+				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
+				 RBD_FEATURE_DATA_POOL)
 
 /* Features supported by this (client software) implementation. */
 
@@ -144,10 +146,9 @@ struct rbd_image_header {
 	/* These six fields never change for a given rbd image */
 	char *object_prefix;
 	__u8 obj_order;
-	__u8 crypt_type;
-	__u8 comp_type;
 	u64 stripe_unit;
 	u64 stripe_count;
+	s64 data_pool_id;
 	u64 features;		/* Might be changeable someday? */
 
 	/* The remaining fields need to be updated occasionally */
@@ -230,7 +231,7 @@ enum obj_req_flags {
 };
 
 struct rbd_obj_request {
-	const char		*object_name;
+	u64			object_no;
 	u64			offset;		/* object start byte */
 	u64			length;		/* bytes from offset */
 	unsigned long		flags;
@@ -438,7 +439,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
 
 static struct kmem_cache	*rbd_img_request_cache;
 static struct kmem_cache	*rbd_obj_request_cache;
-static struct kmem_cache	*rbd_segment_name_cache;
 
 static int rbd_major;
 static DEFINE_IDA(rbd_dev_id_ida);
@@ -973,6 +973,30 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 }
 
 /*
+ * returns the size of an object in the image
+ */
+static u32 rbd_obj_bytes(struct rbd_image_header *header)
+{
+	return 1U << header->obj_order;
+}
+
+static void rbd_init_layout(struct rbd_device *rbd_dev)
+{
+	if (rbd_dev->header.stripe_unit == 0 ||
+	    rbd_dev->header.stripe_count == 0) {
+		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
+		rbd_dev->header.stripe_count = 1;
+	}
+
+	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
+	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
+	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
+	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
+			rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
+	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
+}
+
+/*
  * Fill an rbd image header with information from the given format 1
  * on-disk header.
  */
@@ -992,15 +1016,11 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 	/* Allocate this now to avoid having to handle failure below */
 
 	if (first_time) {
-		size_t len;
-
-		len = strnlen(ondisk->object_prefix,
-			      sizeof (ondisk->object_prefix));
-		object_prefix = kmalloc(len + 1, GFP_KERNEL);
+		object_prefix = kstrndup(ondisk->object_prefix,
+					 sizeof(ondisk->object_prefix),
+					 GFP_KERNEL);
 		if (!object_prefix)
 			return -ENOMEM;
-		memcpy(object_prefix, ondisk->object_prefix, len);
-		object_prefix[len] = '\0';
 	}
 
 	/* Allocate the snapshot context and fill it in */
@@ -1051,12 +1071,7 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 	if (first_time) {
 		header->object_prefix = object_prefix;
 		header->obj_order = ondisk->options.order;
-		header->crypt_type = ondisk->options.crypt_type;
-		header->comp_type = ondisk->options.comp_type;
-		/* The rest aren't used for format 1 images */
-		header->stripe_unit = 0;
-		header->stripe_count = 0;
-		header->features = 0;
+		rbd_init_layout(rbd_dev);
 	} else {
 		ceph_put_snap_context(header->snapc);
 		kfree(header->snap_names);
@@ -1232,42 +1247,9 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 	rbd_dev->mapping.features = 0;
 }
 
-static void rbd_segment_name_free(const char *name)
-{
-	/* The explicit cast here is needed to drop the const qualifier */
-
-	kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
-static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
-{
-	char *name;
-	u64 segment;
-	int ret;
-	char *name_format;
-
-	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
-	if (!name)
-		return NULL;
-	segment = offset >> rbd_dev->header.obj_order;
-	name_format = "%s.%012llx";
-	if (rbd_dev->image_format == 2)
-		name_format = "%s.%016llx";
-	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
-			rbd_dev->header.object_prefix, segment);
-	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
-		pr_err("error formatting segment name for #%llu (%d)\n",
-			segment, ret);
-		rbd_segment_name_free(name);
-		name = NULL;
-	}
-
-	return name;
-}
-
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
-	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
+	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
 
 	return offset & (segment_size - 1);
 }
@@ -1275,7 +1257,7 @@ static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
 			      u64 offset, u64 length)
 {
-	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
+	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
 
 	offset &= segment_size - 1;
 
@@ -1287,14 +1269,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
 }
 
 /*
- * returns the size of an object in the image
- */
-static u64 rbd_obj_bytes(struct rbd_image_header *header)
-{
-	return 1 << header->obj_order;
-}
-
-/*
  * bio helpers
  */
 
@@ -1623,7 +1597,9 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
 {
 	struct ceph_osd_request *osd_req = obj_request->osd_req;
 
-	dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
+	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
+	     obj_request, obj_request->object_no, obj_request->offset,
+	     obj_request->length, osd_req);
 	if (obj_request_img_data_test(obj_request)) {
 		WARN_ON(obj_request->callback != rbd_img_obj_callback);
 		rbd_img_request_get(obj_request->img_request);
@@ -1631,44 +1607,6 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
 	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
 }
 
-static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
-{
-	dout("%s %p\n", __func__, obj_request);
-	ceph_osdc_cancel_request(obj_request->osd_req);
-}
-
-/*
- * Wait for an object request to complete.  If interrupted, cancel the
- * underlying osd request.
- *
- * @timeout: in jiffies, 0 means "wait forever"
- */
-static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
-				  unsigned long timeout)
-{
-	long ret;
-
-	dout("%s %p\n", __func__, obj_request);
-	ret = wait_for_completion_interruptible_timeout(
-					&obj_request->completion,
-					ceph_timeout_jiffies(timeout));
-	if (ret <= 0) {
-		if (ret == 0)
-			ret = -ETIMEDOUT;
-		rbd_obj_request_end(obj_request);
-	} else {
-		ret = 0;
-	}
-
-	dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
-	return ret;
-}
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
-	return __rbd_obj_request_wait(obj_request, 0);
-}
-
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1955,8 +1893,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 		rbd_osd_call_callback(obj_request);
 		break;
 	default:
-		rbd_warn(NULL, "%s: unsupported op %hu",
-			 obj_request->object_name, (unsigned short) opcode);
+		rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
+			 obj_request->object_no, opcode);
 		break;
 	}
 
@@ -1980,6 +1918,40 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 	osd_req->r_data_offset = obj_request->offset;
 }
 
+static struct ceph_osd_request *
+__rbd_osd_req_create(struct rbd_device *rbd_dev,
+		     struct ceph_snap_context *snapc,
+		     int num_ops, unsigned int flags,
+		     struct rbd_obj_request *obj_request)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_osd_request *req;
+	const char *name_format = rbd_dev->image_format == 1 ?
+				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
+
+	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
+	if (!req)
+		return NULL;
+
+	req->r_flags = flags;
+	req->r_callback = rbd_osd_req_callback;
+	req->r_priv = obj_request;
+
+	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
+	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
+			rbd_dev->header.object_prefix, obj_request->object_no))
+		goto err_req;
+
+	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
+		goto err_req;
+
+	return req;
+
+err_req:
+	ceph_osdc_put_request(req);
+	return NULL;
+}
+
 /*
  * Create an osd request.  A read request has one osd op (read).
  * A write request has either one (watch) or two (hint+write) osd ops.
@@ -1993,8 +1965,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
 		struct rbd_obj_request *obj_request)
 {
 	struct ceph_snap_context *snapc = NULL;
-	struct ceph_osd_client *osdc;
-	struct ceph_osd_request *osd_req;
 
 	if (obj_request_img_data_test(obj_request) &&
 	    (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
@@ -2009,35 +1979,9 @@ static struct ceph_osd_request *rbd_osd_req_create(
 
 	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
 
-	/* Allocate and initialize the request, for the num_ops ops */
-
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
-					  GFP_NOIO);
-	if (!osd_req)
-		goto fail;
-
-	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
-		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-	else
-		osd_req->r_flags = CEPH_OSD_FLAG_READ;
-
-	osd_req->r_callback = rbd_osd_req_callback;
-	osd_req->r_priv = obj_request;
-
-	osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
-	if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
-			     obj_request->object_name))
-		goto fail;
-
-	if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
-		goto fail;
-
-	return osd_req;
-
-fail:
-	ceph_osdc_put_request(osd_req);
-	return NULL;
+	return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
+	    (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
+	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
 }
 
 /*
@@ -2050,10 +1994,6 @@ static struct ceph_osd_request *
 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request;
-	struct ceph_snap_context *snapc;
-	struct rbd_device *rbd_dev;
-	struct ceph_osd_client *osdc;
-	struct ceph_osd_request *osd_req;
 	int num_osd_ops = 3;
 
 	rbd_assert(obj_request_img_data_test(obj_request));
@@ -2065,77 +2005,34 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
 	if (img_request_discard_test(img_request))
 		num_osd_ops = 2;
 
-	/* Allocate and initialize the request, for all the ops */
-
-	snapc = img_request->snapc;
-	rbd_dev = img_request->rbd_dev;
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
-						false, GFP_NOIO);
-	if (!osd_req)
-		goto fail;
-
-	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
-	osd_req->r_callback = rbd_osd_req_callback;
-	osd_req->r_priv = obj_request;
-
-	osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
-	if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
-			     obj_request->object_name))
-		goto fail;
-
-	if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
-		goto fail;
-
-	return osd_req;
-
-fail:
-	ceph_osdc_put_request(osd_req);
-	return NULL;
+	return __rbd_osd_req_create(img_request->rbd_dev,
+				    img_request->snapc, num_osd_ops,
+				    CEPH_OSD_FLAG_WRITE, obj_request);
 }
 
-
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
 	ceph_osdc_put_request(osd_req);
 }
 
-/* object_name is assumed to be a non-null pointer and NUL-terminated */
-
-static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
-						u64 offset, u64 length,
-						enum obj_request_type type)
+static struct rbd_obj_request *
+rbd_obj_request_create(enum obj_request_type type)
 {
 	struct rbd_obj_request *obj_request;
-	size_t size;
-	char *name;
 
 	rbd_assert(obj_request_type_valid(type));
 
-	size = strlen(object_name) + 1;
-	name = kmalloc(size, GFP_NOIO);
-	if (!name)
-		return NULL;
-
 	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
-	if (!obj_request) {
-		kfree(name);
+	if (!obj_request)
 		return NULL;
-	}
 
-	obj_request->object_name = memcpy(name, object_name, size);
-	obj_request->offset = offset;
-	obj_request->length = length;
-	obj_request->flags = 0;
 	obj_request->which = BAD_WHICH;
 	obj_request->type = type;
 	INIT_LIST_HEAD(&obj_request->links);
 	init_completion(&obj_request->completion);
 	kref_init(&obj_request->kref);
 
-	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
-		offset, length, (int)type, obj_request);
-
+	dout("%s %p\n", __func__, obj_request);
 	return obj_request;
 }
 
@@ -2170,8 +2067,6 @@ static void rbd_obj_request_destroy(struct kref *kref)
 		break;
 	}
 
-	kfree(obj_request->object_name);
-	obj_request->object_name = NULL;
 	kmem_cache_free(rbd_obj_request_cache, obj_request);
 }
 
@@ -2546,22 +2441,18 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 
 	while (resid) {
 		struct ceph_osd_request *osd_req;
-		const char *object_name;
-		u64 offset;
-		u64 length;
+		u64 object_no = img_offset >> rbd_dev->header.obj_order;
+		u64 offset = rbd_segment_offset(rbd_dev, img_offset);
+		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
 
-		object_name = rbd_segment_name(rbd_dev, img_offset);
-		if (!object_name)
-			goto out_unwind;
-		offset = rbd_segment_offset(rbd_dev, img_offset);
-		length = rbd_segment_length(rbd_dev, img_offset, resid);
-		obj_request = rbd_obj_request_create(object_name,
-						offset, length, type);
-		/* object request has its own copy of the object name */
-		rbd_segment_name_free(object_name);
+		obj_request = rbd_obj_request_create(type);
 		if (!obj_request)
 			goto out_unwind;
 
+		obj_request->object_no = object_no;
+		obj_request->offset = offset;
+		obj_request->length = length;
+
 		/*
 		 * set obj_request->img_request before creating the
 		 * osd_request so that it gets the right snapc
@@ -2771,7 +2662,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 	 * child image to which the original request was to be sent.
 	 */
 	img_offset = obj_request->img_offset - obj_request->offset;
-	length = (u64)1 << rbd_dev->header.obj_order;
+	length = rbd_obj_bytes(&rbd_dev->header);
 
 	/*
 	 * There is no defined parent data beyond the parent
@@ -2900,11 +2791,12 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
 	size_t size;
 	int ret;
 
-	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
-					      OBJ_REQUEST_PAGES);
+	stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
 	if (!stat_request)
 		return -ENOMEM;
 
+	stat_request->object_no = obj_request->object_no;
+
 	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
 						   stat_request);
 	if (!stat_request->osd_req) {
@@ -3983,17 +3875,17 @@ out:
  * returned in the outbound buffer, or a negative error code.
  */
 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
-			     const char *object_name,
-			     const char *class_name,
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc,
 			     const char *method_name,
 			     const void *outbound,
 			     size_t outbound_size,
 			     void *inbound,
 			     size_t inbound_size)
 {
-	struct rbd_obj_request *obj_request;
-	struct page **pages;
-	u32 page_count;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct page *req_page = NULL;
+	struct page *reply_page;
 	int ret;
 
3999 /* 3891 /*
@@ -4003,61 +3895,35 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4003 * method. Currently if this is present it will be a 3895 * method. Currently if this is present it will be a
4004 * snapshot id. 3896 * snapshot id.
4005 */ 3897 */
4006 page_count = (u32)calc_pages_for(0, inbound_size); 3898 if (outbound) {
4007 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 3899 if (outbound_size > PAGE_SIZE)
4008 if (IS_ERR(pages)) 3900 return -E2BIG;
4009 return PTR_ERR(pages);
4010
4011 ret = -ENOMEM;
4012 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
4013 OBJ_REQUEST_PAGES);
4014 if (!obj_request)
4015 goto out;
4016 3901
4017 obj_request->pages = pages; 3902 req_page = alloc_page(GFP_KERNEL);
4018 obj_request->page_count = page_count; 3903 if (!req_page)
4019 3904 return -ENOMEM;
4020 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4021 obj_request);
4022 if (!obj_request->osd_req)
4023 goto out;
4024
4025 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
4026 class_name, method_name);
4027 if (outbound_size) {
4028 struct ceph_pagelist *pagelist;
4029
4030 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4031 if (!pagelist)
4032 goto out;
4033 3905
4034 ceph_pagelist_init(pagelist); 3906 memcpy(page_address(req_page), outbound, outbound_size);
4035 ceph_pagelist_append(pagelist, outbound, outbound_size);
4036 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4037 pagelist);
4038 } 3907 }
4039 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4040 obj_request->pages, inbound_size,
4041 0, false, false);
4042
4043 rbd_obj_request_submit(obj_request);
4044 ret = rbd_obj_request_wait(obj_request);
4045 if (ret)
4046 goto out;
4047 3908
4048 ret = obj_request->result; 3909 reply_page = alloc_page(GFP_KERNEL);
4049 if (ret < 0) 3910 if (!reply_page) {
4050 goto out; 3911 if (req_page)
3912 __free_page(req_page);
3913 return -ENOMEM;
3914 }
4051 3915
4052 rbd_assert(obj_request->xferred < (u64)INT_MAX); 3916 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4053 ret = (int)obj_request->xferred; 3917 CEPH_OSD_FLAG_READ, req_page, outbound_size,
4054 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); 3918 reply_page, &inbound_size);
4055out: 3919 if (!ret) {
4056 if (obj_request) 3920 memcpy(inbound, page_address(reply_page), inbound_size);
4057 rbd_obj_request_put(obj_request); 3921 ret = inbound_size;
4058 else 3922 }
4059 ceph_release_page_vector(pages, page_count);
4060 3923
3924 if (req_page)
3925 __free_page(req_page);
3926 __free_page(reply_page);
4061 return ret; 3927 return ret;
4062} 3928}
4063 3929
@@ -4256,63 +4122,46 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 }
 
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
-			  const char *object_name,
-			  u64 offset, u64 length, void *buf)
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc,
+			     void *buf, int buf_len)
 
 {
-	struct rbd_obj_request *obj_request;
-	struct page **pages = NULL;
-	u32 page_count;
-	size_t size;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int num_pages = calc_pages_for(0, buf_len);
 	int ret;
 
-	page_count = (u32) calc_pages_for(offset, length);
-	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-	if (IS_ERR(pages))
-		return PTR_ERR(pages);
-
-	ret = -ENOMEM;
-	obj_request = rbd_obj_request_create(object_name, offset, length,
-					     OBJ_REQUEST_PAGES);
-	if (!obj_request)
-		goto out;
-
-	obj_request->pages = pages;
-	obj_request->page_count = page_count;
-
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-						  obj_request);
-	if (!obj_request->osd_req)
-		goto out;
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
 
-	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
-			       offset, length, 0, 0);
-	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
-					 obj_request->pages,
-					 obj_request->length,
-					 obj_request->offset & ~PAGE_MASK,
-					 false, false);
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	rbd_obj_request_submit(obj_request);
-	ret = rbd_obj_request_wait(obj_request);
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
 	if (ret)
-		goto out;
+		goto out_req;
 
-	ret = obj_request->result;
-	if (ret < 0)
-		goto out;
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_req;
+	}
 
-	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
-	size = (size_t) obj_request->xferred;
-	ceph_copy_from_page_vector(pages, buf, 0, size);
-	rbd_assert(size <= (size_t)INT_MAX);
-	ret = (int)size;
-out:
-	if (obj_request)
-		rbd_obj_request_put(obj_request);
-	else
-		ceph_release_page_vector(pages, page_count);
+	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
+					 true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0)
+		ceph_copy_from_page_vector(pages, buf, 0, ret);
 
+out_req:
+	ceph_osdc_put_request(req);
 	return ret;
 }
 
@@ -4348,8 +4197,8 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
 	if (!ondisk)
 		return -ENOMEM;
 
-	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
-				0, size, ondisk);
+	ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
+				&rbd_dev->header_oloc, ondisk, size);
 	if (ret < 0)
 		goto out;
 	if ((size_t)ret < size) {
@@ -4781,7 +4630,7 @@ static const struct attribute_group *rbd_attr_groups[] = {
 
 static void rbd_dev_release(struct device *dev);
 
-static struct device_type rbd_device_type = {
+static const struct device_type rbd_device_type = {
 	.name		= "rbd",
 	.groups		= rbd_attr_groups,
 	.release	= rbd_dev_release,
@@ -4876,8 +4725,9 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
 	INIT_LIST_HEAD(&rbd_dev->node);
 	init_rwsem(&rbd_dev->header_rwsem);
 
+	rbd_dev->header.data_pool_id = CEPH_NOPOOL;
 	ceph_oid_init(&rbd_dev->header_oid);
-	ceph_oloc_init(&rbd_dev->header_oloc);
+	rbd_dev->header_oloc.pool = spec->pool_id;
 
 	mutex_init(&rbd_dev->watch_mutex);
 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
@@ -4899,12 +4749,6 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
 	rbd_dev->rbd_client = rbdc;
 	rbd_dev->spec = spec;
 
-	rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
-	rbd_dev->layout.stripe_count = 1;
-	rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
-	rbd_dev->layout.pool_id = spec->pool_id;
-	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
-
 	return rbd_dev;
 }
 
@@ -4970,10 +4814,10 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 		__le64 size;
 	} __attribute__ ((packed)) size_buf = { 0 };
 
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_size",
-				  &snapid, sizeof (snapid),
-				  &size_buf, sizeof (size_buf));
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_size",
+				  &snapid, sizeof(snapid),
+				  &size_buf, sizeof(size_buf));
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
@@ -5010,9 +4854,9 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
 	if (!reply_buf)
 		return -ENOMEM;
 
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_object_prefix", NULL, 0,
-				  reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_object_prefix",
+				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
@@ -5045,10 +4889,10 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 	u64 unsup;
 	int ret;
 
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_features",
-				  &snapid, sizeof (snapid),
-				  &features_buf, sizeof (features_buf));
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_features",
+				  &snapid, sizeof(snapid),
+				  &features_buf, sizeof(features_buf));
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
@@ -5107,10 +4951,9 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	}
 
 	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_parent",
-				  &snapid, sizeof (snapid),
-				  reply_buf, size);
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_parent",
+				  &snapid, sizeof(snapid), reply_buf, size);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out_err;
@@ -5210,9 +5053,9 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
 	u64 stripe_count;
 	int ret;
 
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_stripe_unit_count", NULL, 0,
-				  (char *)&striping_info_buf, size);
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_stripe_unit_count",
+				  NULL, 0, &striping_info_buf, size);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
@@ -5226,7 +5069,7 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
 	 * out, and only fail if the image has non-default values.
 	 */
 	ret = -EINVAL;
-	obj_size = (u64)1 << rbd_dev->header.obj_order;
+	obj_size = rbd_obj_bytes(&rbd_dev->header);
 	p = &striping_info_buf;
 	stripe_unit = ceph_decode_64(&p);
 	if (stripe_unit != obj_size) {
@@ -5247,8 +5090,27 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
 	return 0;
 }
 
+static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
+{
+	__le64 data_pool_id;
+	int ret;
+
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_data_pool",
+				  NULL, 0, &data_pool_id, sizeof(data_pool_id));
+	if (ret < 0)
+		return ret;
+	if (ret < sizeof(data_pool_id))
+		return -EBADMSG;
+
+	rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
+	WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
+	return 0;
+}
+
 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 {
+	CEPH_DEFINE_OID_ONSTACK(oid);
 	size_t image_id_size;
 	char *image_id;
 	void *p;
@@ -5276,10 +5138,10 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 	if (!reply_buf)
 		goto out;
 
-	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
-				  "rbd", "dir_get_name",
-				  image_id, image_id_size,
-				  reply_buf, size);
+	ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
+	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
+				  "dir_get_name", image_id, image_id_size,
+				  reply_buf, size);
 	if (ret < 0)
 		goto out;
 	p = reply_buf;
@@ -5458,9 +5320,9 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
 	if (!reply_buf)
 		return -ENOMEM;
 
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_snapcontext", NULL, 0,
-				  reply_buf, size);
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_snapcontext",
+				  NULL, 0, reply_buf, size);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
@@ -5523,10 +5385,9 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 		return ERR_PTR(-ENOMEM);
 
 	snapid = cpu_to_le64(snap_id);
-	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
-				  "rbd", "get_snapshot_name",
-				  &snapid, sizeof (snapid),
-				  reply_buf, size);
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_snapshot_name",
+				  &snapid, sizeof(snapid), reply_buf, size);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0) {
 		snap_name = ERR_PTR(ret);
@@ -5833,7 +5694,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 {
 	int ret;
 	size_t size;
-	char *object_name;
+	CEPH_DEFINE_OID_ONSTACK(oid);
 	void *response;
 	char *image_id;
 
@@ -5853,12 +5714,12 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 	 * First, see if the format 2 image id file exists, and if
 	 * so, get the image's persistent id from it.
 	 */
-	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
-	object_name = kmalloc(size, GFP_NOIO);
-	if (!object_name)
-		return -ENOMEM;
-	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
-	dout("rbd id object name is %s\n", object_name);
+	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
+			       rbd_dev->spec->image_name);
+	if (ret)
+		return ret;
+
+	dout("rbd id object name is %s\n", oid.name);
 
 	/* Response will be an encoded string, which includes a length */
 
@@ -5871,9 +5732,9 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 
 	/* If it doesn't exist we'll assume it's a format 1 image */
 
-	ret = rbd_obj_method_sync(rbd_dev, object_name,
-				  "rbd", "get_id", NULL, 0,
-				  response, RBD_IMAGE_ID_LEN_MAX);
+	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
+				  "get_id", NULL, 0,
+				  response, RBD_IMAGE_ID_LEN_MAX);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret == -ENOENT) {
 		image_id = kstrdup("", GFP_KERNEL);
@@ -5896,8 +5757,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 	}
 out:
 	kfree(response);
-	kfree(object_name);
-
+	ceph_oid_destroy(&oid);
 	return ret;
 }
 
@@ -5944,14 +5804,20 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
 		if (ret < 0)
 			goto out_err;
 	}
-	/* No support for crypto and compression type format 2 images */
 
+	if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
+		ret = rbd_dev_v2_data_pool(rbd_dev);
+		if (ret)
+			goto out_err;
+	}
+
+	rbd_init_layout(rbd_dev);
 	return 0;
+
 out_err:
 	rbd_dev->header.features = 0;
 	kfree(rbd_dev->header.object_prefix);
 	rbd_dev->header.object_prefix = NULL;
-
 	return ret;
 }
 
@@ -6077,8 +5943,6 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 	/* Record the header object name for this rbd image. */
 
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-
-	rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
 	if (rbd_dev->image_format == 1)
 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
 				       spec->image_name, RBD_SUFFIX);
@@ -6471,27 +6335,16 @@ static int rbd_slab_init(void)
 	if (!rbd_obj_request_cache)
 		goto out_err;
 
-	rbd_assert(!rbd_segment_name_cache);
-	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
-					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
-	if (rbd_segment_name_cache)
-		return 0;
-out_err:
-	kmem_cache_destroy(rbd_obj_request_cache);
-	rbd_obj_request_cache = NULL;
+	return 0;
 
+out_err:
 	kmem_cache_destroy(rbd_img_request_cache);
 	rbd_img_request_cache = NULL;
-
 	return -ENOMEM;
 }
 
 static void rbd_slab_exit(void)
 {
-	rbd_assert(rbd_segment_name_cache);
-	kmem_cache_destroy(rbd_segment_name_cache);
-	rbd_segment_name_cache = NULL;
-
 	rbd_assert(rbd_obj_request_cache);
 	kmem_cache_destroy(rbd_obj_request_cache);
 	rbd_obj_request_cache = NULL;
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 94f367db27b0..62ff50d3e7a6 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -25,8 +25,8 @@
  */
 
 #define RBD_HEADER_PREFIX	"rbd_header."
-#define RBD_DATA_PREFIX		"rbd_data."
 #define RBD_ID_PREFIX		"rbd_id."
+#define RBD_V2_DATA_FORMAT	"%s.%016llx"
 
 #define RBD_LOCK_NAME		"rbd_lock"
 #define RBD_LOCK_TAG		"internal"
@@ -42,13 +42,14 @@ enum rbd_notify_op {
 /*
  * For format version 1, rbd image 'foo' consists of objects
  *   foo.rbd - image metadata
- *   rb.<idhi>.<idlo>.00000000
- *   rb.<idhi>.<idlo>.00000001
+ *   rb.<idhi>.<idlo>.<extra>.000000000000
+ *   rb.<idhi>.<idlo>.<extra>.000000000001
  *   ...     - data
  * There is no notion of a persistent image id in rbd format 1.
  */
 
 #define RBD_SUFFIX		".rbd"
+#define RBD_V1_DATA_FORMAT	"%s.%012llx"
 
 #define RBD_DIRECTORY		"rbd_directory"
 #define RBD_INFO		"rbd_info"
@@ -57,9 +58,6 @@ enum rbd_notify_op {
 #define RBD_MIN_OBJ_ORDER	16
 #define RBD_MAX_OBJ_ORDER	30
 
-#define RBD_COMP_NONE		0
-#define RBD_CRYPT_NONE		0
-
 #define RBD_HEADER_TEXT		"<<< Rados Block Device Image >>>\n"
 #define RBD_HEADER_SIGNATURE	"RBD"
 #define RBD_HEADER_VERSION	"001.005"
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 7ce35aec8c76..f297a9e18642 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -391,6 +391,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 			nr_pages = i;
 			if (nr_pages > 0) {
 				len = nr_pages << PAGE_SHIFT;
+				osd_req_op_extent_update(req, 0, len);
 				break;
 			}
 			goto out_pages;
@@ -771,7 +772,7 @@ static int ceph_writepages_start(struct address_space *mapping,
 	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
 	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+	if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
 		if (ci->i_wrbuffer_ref > 0) {
 			pr_warn_ratelimited(
 				"writepage_start %p %lld forced umount\n",
@@ -1017,8 +1018,7 @@ new_request:
 					    &ci->i_layout, vino,
 					    offset, &len, 0, num_ops,
 					    CEPH_OSD_OP_WRITE,
-					    CEPH_OSD_FLAG_WRITE |
-					    CEPH_OSD_FLAG_ONDISK,
+					    CEPH_OSD_FLAG_WRITE,
 					    snapc, truncate_seq,
 					    truncate_size, false);
 		if (IS_ERR(req)) {
@@ -1028,8 +1028,7 @@ new_request:
 						    min(num_ops,
 							CEPH_OSD_SLAB_OPS),
 						    CEPH_OSD_OP_WRITE,
-						    CEPH_OSD_FLAG_WRITE |
-						    CEPH_OSD_FLAG_ONDISK,
+						    CEPH_OSD_FLAG_WRITE,
 						    snapc, truncate_seq,
 						    truncate_size, true);
 			BUG_ON(IS_ERR(req));
@@ -1194,7 +1193,7 @@ static int ceph_update_writeable_page(struct file *file,
 	int r;
 	struct ceph_snap_context *snapc, *oldest;
 
-	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+	if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
 		dout(" page %p forced umount\n", page);
 		unlock_page(page);
 		return -EIO;
@@ -1681,8 +1680,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 				    ceph_vino(inode), 0, &len, 0, 1,
-				    CEPH_OSD_OP_CREATE,
-				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
 				    NULL, 0, 0, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -1699,8 +1697,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 				    ceph_vino(inode), 0, &len, 1, 3,
-				    CEPH_OSD_OP_WRITE,
-				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
 				    NULL, ci->i_truncate_seq,
 				    ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
@@ -1873,7 +1870,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
 		goto out_unlock;
 	}
 
-	wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
+	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
 	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
 	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
 	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 5bc5d37b1217..4e7421caf380 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -234,7 +234,7 @@ void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp)
 		fscache_enable_cookie(ci->fscache, ceph_fscache_can_enable,
 				inode);
 		if (fscache_cookie_enabled(ci->fscache)) {
-			dout("fscache_file_set_cookie %p %p enabing cache\n",
+			dout("fscache_file_set_cookie %p %p enabling cache\n",
 			     inode, filp);
 		}
 	}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 94fd76d04683..cd966f276a8d 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -867,7 +867,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 /*
  * Return caps we have registered with the MDS(s) as 'wanted'.
  */
-int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
+int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
 {
 	struct ceph_cap *cap;
 	struct rb_node *p;
@@ -875,7 +875,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
 
 	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
 		cap = rb_entry(p, struct ceph_cap, ci_node);
-		if (!__cap_is_valid(cap))
+		if (check && !__cap_is_valid(cap))
 			continue;
 		if (cap == ci->i_auth_cap)
 			mds_wanted |= cap->mds_wanted;
@@ -1184,6 +1184,13 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 			delayed = 1;
 	}
 	ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
+	if (want & ~cap->mds_wanted) {
+		/* user space may open/close single file frequently.
+		 * This avoids droping mds_wanted immediately after
+		 * requesting new mds_wanted.
+		 */
+		__cap_set_timeouts(mdsc, ci);
+	}
 
 	cap->issued &= retain;  /* drop bits we don't want */
 	if (cap->implemented & ~cap->issued) {
@@ -2084,8 +2091,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 
 	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
 
-	ceph_sync_write_wait(inode);
-
 	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
 	if (ret < 0)
 		goto out;
@@ -2477,23 +2482,22 @@ again:
 
 		if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
 			int mds_wanted;
-			if (ACCESS_ONCE(mdsc->fsc->mount_state) ==
+			if (READ_ONCE(mdsc->fsc->mount_state) ==
 			    CEPH_MOUNT_SHUTDOWN) {
 				dout("get_cap_refs %p forced umount\n", inode);
 				*err = -EIO;
 				ret = 1;
 				goto out_unlock;
 			}
-			mds_wanted = __ceph_caps_mds_wanted(ci);
-			if ((mds_wanted & need) != need) {
+			mds_wanted = __ceph_caps_mds_wanted(ci, false);
+			if (need & ~(mds_wanted & need)) {
 				dout("get_cap_refs %p caps were dropped"
 				     " (session killed?)\n", inode);
 				*err = -ESTALE;
 				ret = 1;
 				goto out_unlock;
 			}
-			if ((mds_wanted & file_wanted) ==
-			    (file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
+			if (!(file_wanted & ~mds_wanted))
 				ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
 		}
 
@@ -3404,6 +3408,7 @@ retry:
 		tcap->implemented |= issued;
 		if (cap == ci->i_auth_cap)
 			ci->i_auth_cap = tcap;
+
 		if (!list_empty(&ci->i_cap_flush_list) &&
 		    ci->i_auth_cap == tcap) {
 			spin_lock(&mdsc->cap_dirty_lock);
@@ -3417,9 +3422,18 @@ retry:
3417 } else if (tsession) { 3422 } else if (tsession) {
3418 /* add placeholder for the export tagert */ 3423 /* add placeholder for the export tagert */
3419 int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; 3424 int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
3425 tcap = new_cap;
3420 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, 3426 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
3421 t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); 3427 t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
3422 3428
3429 if (!list_empty(&ci->i_cap_flush_list) &&
3430 ci->i_auth_cap == tcap) {
3431 spin_lock(&mdsc->cap_dirty_lock);
3432 list_move_tail(&ci->i_flushing_item,
3433 &tcap->session->s_cap_flushing);
3434 spin_unlock(&mdsc->cap_dirty_lock);
3435 }
3436
3423 __ceph_remove_cap(cap, false); 3437 __ceph_remove_cap(cap, false);
3424 goto out_unlock; 3438 goto out_unlock;
3425 } 3439 }
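With "tcap = new_cap" recorded before ceph_add_cap(), the placeholder branch can now do the same bookkeeping as the existing-cap branch earlier in this function: if the inode has cap flushes in flight and the auth cap just moved, its i_flushing_item entry is moved onto the target session's s_cap_flushing list. The primitive doing the work is list_move_tail(); a stripped-down userspace re-implementation of it (not the kernel's <linux/list.h>) shows the effect:

    #include <stdio.h>

    struct list_head { struct list_head *prev, *next; };

    static void list_init(struct list_head *h) { h->prev = h->next = h; }
    static void list_del(struct list_head *e)
    {
        e->prev->next = e->next;
        e->next->prev = e->prev;
    }
    static void list_add_tail(struct list_head *e, struct list_head *h)
    {
        e->prev = h->prev; e->next = h;
        h->prev->next = e; h->prev = e;
    }
    static void list_move_tail(struct list_head *e, struct list_head *h)
    {
        list_del(e);
        list_add_tail(e, h);
    }

    int main(void)
    {
        struct list_head old_session, new_session, flush;
        list_init(&old_session); list_init(&new_session);
        list_add_tail(&flush, &old_session);
        list_move_tail(&flush, &new_session); /* flush follows the new auth cap */
        printf("old empty: %d, new has entry: %d\n",
               old_session.next == &old_session, new_session.next == &flush);
        return 0;
    }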
@@ -3924,9 +3938,10 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
3924} 3938}
3925 3939
3926int ceph_encode_dentry_release(void **p, struct dentry *dentry, 3940int ceph_encode_dentry_release(void **p, struct dentry *dentry,
3941 struct inode *dir,
3927 int mds, int drop, int unless) 3942 int mds, int drop, int unless)
3928{ 3943{
3929 struct inode *dir = d_inode(dentry->d_parent); 3944 struct dentry *parent = NULL;
3930 struct ceph_mds_request_release *rel = *p; 3945 struct ceph_mds_request_release *rel = *p;
3931 struct ceph_dentry_info *di = ceph_dentry(dentry); 3946 struct ceph_dentry_info *di = ceph_dentry(dentry);
3932 int force = 0; 3947 int force = 0;
@@ -3941,9 +3956,14 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry,
3941 spin_lock(&dentry->d_lock); 3956 spin_lock(&dentry->d_lock);
3942 if (di->lease_session && di->lease_session->s_mds == mds) 3957 if (di->lease_session && di->lease_session->s_mds == mds)
3943 force = 1; 3958 force = 1;
3959 if (!dir) {
3960 parent = dget(dentry->d_parent);
3961 dir = d_inode(parent);
3962 }
3944 spin_unlock(&dentry->d_lock); 3963 spin_unlock(&dentry->d_lock);
3945 3964
3946 ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force); 3965 ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
3966 dput(parent);
3947 3967
3948 spin_lock(&dentry->d_lock); 3968 spin_lock(&dentry->d_lock);
3949 if (ret && di->lease_session && di->lease_session->s_mds == mds) { 3969 if (ret && di->lease_session && di->lease_session->s_mds == mds) {
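ceph_encode_dentry_release() used to dereference dentry->d_parent with no protection. After this change the caller passes in the parent directory it already holds (req->r_parent or req->r_old_dentry_dir, as the mds_client.c hunks below show), and only when that is NULL does the function take its own reference under d_lock, dropping it with dput() afterwards. The general shape, reading a racy parent pointer and pinning it while holding the lock that serializes renames, in a userspace analogue with illustrative types:

    #include <pthread.h>
    #include <stdio.h>

    struct node { int refcount; };
    struct dentry { pthread_mutex_t lock; struct node *parent; };

    static struct node *get_parent(struct dentry *d)
    {
        pthread_mutex_lock(&d->lock);
        struct node *p = d->parent;
        p->refcount++;                 /* "dget": pin before dropping the lock */
        pthread_mutex_unlock(&d->lock);
        return p;
    }

    int main(void)
    {
        struct node root = { .refcount = 1 };
        struct dentry d = { PTHREAD_MUTEX_INITIALIZER, &root };
        struct node *p = get_parent(&d);
        printf("parent refcount now %d\n", p->refcount);  /* 2 */
        return 0;
    }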
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 39ff678e567f..f2ae393e2c31 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -70,7 +70,7 @@ static int mdsc_show(struct seq_file *s, void *p)
70 70
71 seq_printf(s, "%s", ceph_mds_op_name(req->r_op)); 71 seq_printf(s, "%s", ceph_mds_op_name(req->r_op));
72 72
73 if (req->r_got_unsafe) 73 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
74 seq_puts(s, "\t(unsafe)"); 74 seq_puts(s, "\t(unsafe)");
75 else 75 else
76 seq_puts(s, "\t"); 76 seq_puts(s, "\t");
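Throughout this series the per-request booleans (r_got_unsafe, r_got_safe, r_aborted, r_got_result, r_direct_is_hash, r_did_prepopulate, plus the new parent-locked state) become numbered bits in a single r_req_flags word accessed with set_bit()/test_bit(), which gives atomic updates and keeps the struct compact. A C11 analogue of the idiom, with invented bit names:

    #include <stdatomic.h>
    #include <stdio.h>

    enum { R_GOT_UNSAFE, R_ABORTED };  /* illustrative bit numbers */

    static void flag_set(atomic_ulong *flags, int bit)
    {
        atomic_fetch_or(flags, 1UL << bit);  /* atomic read-modify-write */
    }

    static int flag_test(atomic_ulong *flags, int bit)
    {
        return (atomic_load(flags) >> bit) & 1;
    }

    int main(void)
    {
        atomic_ulong flags = 0;
        flag_set(&flags, R_GOT_UNSAFE);
        printf("unsafe=%d aborted=%d\n",
               flag_test(&flags, R_GOT_UNSAFE), flag_test(&flags, R_ABORTED));
        return 0;
    }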
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 8ab1fdf0bd49..3e9ad501addf 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -371,7 +371,7 @@ more:
371 /* hints to request -> mds selection code */ 371 /* hints to request -> mds selection code */
372 req->r_direct_mode = USE_AUTH_MDS; 372 req->r_direct_mode = USE_AUTH_MDS;
373 req->r_direct_hash = ceph_frag_value(frag); 373 req->r_direct_hash = ceph_frag_value(frag);
374 req->r_direct_is_hash = true; 374 __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
375 if (fi->last_name) { 375 if (fi->last_name) {
376 req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); 376 req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
377 if (!req->r_path2) { 377 if (!req->r_path2) {
@@ -417,7 +417,7 @@ more:
417 fi->frag = frag; 417 fi->frag = frag;
418 fi->last_readdir = req; 418 fi->last_readdir = req;
419 419
420 if (req->r_did_prepopulate) { 420 if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) {
421 fi->readdir_cache_idx = req->r_readdir_cache_idx; 421 fi->readdir_cache_idx = req->r_readdir_cache_idx;
422 if (fi->readdir_cache_idx < 0) { 422 if (fi->readdir_cache_idx < 0) {
423 /* preclude from marking dir ordered */ 423 /* preclude from marking dir ordered */
@@ -752,7 +752,8 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
752 mask |= CEPH_CAP_XATTR_SHARED; 752 mask |= CEPH_CAP_XATTR_SHARED;
753 req->r_args.getattr.mask = cpu_to_le32(mask); 753 req->r_args.getattr.mask = cpu_to_le32(mask);
754 754
755 req->r_locked_dir = dir; 755 req->r_parent = dir;
756 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
756 err = ceph_mdsc_do_request(mdsc, NULL, req); 757 err = ceph_mdsc_do_request(mdsc, NULL, req);
757 err = ceph_handle_snapdir(req, dentry, err); 758 err = ceph_handle_snapdir(req, dentry, err);
758 dentry = ceph_finish_lookup(req, dentry, err); 759 dentry = ceph_finish_lookup(req, dentry, err);
@@ -813,7 +814,8 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
813 } 814 }
814 req->r_dentry = dget(dentry); 815 req->r_dentry = dget(dentry);
815 req->r_num_caps = 2; 816 req->r_num_caps = 2;
816 req->r_locked_dir = dir; 817 req->r_parent = dir;
818 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
817 req->r_args.mknod.mode = cpu_to_le32(mode); 819 req->r_args.mknod.mode = cpu_to_le32(mode);
818 req->r_args.mknod.rdev = cpu_to_le32(rdev); 820 req->r_args.mknod.rdev = cpu_to_le32(rdev);
819 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 821 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
@@ -864,7 +866,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
864 ceph_mdsc_put_request(req); 866 ceph_mdsc_put_request(req);
865 goto out; 867 goto out;
866 } 868 }
867 req->r_locked_dir = dir; 869 req->r_parent = dir;
870 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
868 req->r_dentry = dget(dentry); 871 req->r_dentry = dget(dentry);
869 req->r_num_caps = 2; 872 req->r_num_caps = 2;
870 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 873 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
@@ -913,7 +916,8 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
913 916
914 req->r_dentry = dget(dentry); 917 req->r_dentry = dget(dentry);
915 req->r_num_caps = 2; 918 req->r_num_caps = 2;
916 req->r_locked_dir = dir; 919 req->r_parent = dir;
920 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
917 req->r_args.mkdir.mode = cpu_to_le32(mode); 921 req->r_args.mkdir.mode = cpu_to_le32(mode);
918 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 922 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
919 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 923 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -957,7 +961,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
957 req->r_dentry = dget(dentry); 961 req->r_dentry = dget(dentry);
958 req->r_num_caps = 2; 962 req->r_num_caps = 2;
959 req->r_old_dentry = dget(old_dentry); 963 req->r_old_dentry = dget(old_dentry);
960 req->r_locked_dir = dir; 964 req->r_parent = dir;
965 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
961 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 966 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
962 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 967 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
963 /* release LINK_SHARED on source inode (mds will lock it) */ 968 /* release LINK_SHARED on source inode (mds will lock it) */
@@ -1023,7 +1028,8 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
1023 } 1028 }
1024 req->r_dentry = dget(dentry); 1029 req->r_dentry = dget(dentry);
1025 req->r_num_caps = 2; 1030 req->r_num_caps = 2;
1026 req->r_locked_dir = dir; 1031 req->r_parent = dir;
1032 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1027 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1033 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
1028 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1034 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1029 req->r_inode_drop = drop_caps_for_unlink(inode); 1035 req->r_inode_drop = drop_caps_for_unlink(inode);
@@ -1066,7 +1072,8 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
1066 req->r_num_caps = 2; 1072 req->r_num_caps = 2;
1067 req->r_old_dentry = dget(old_dentry); 1073 req->r_old_dentry = dget(old_dentry);
1068 req->r_old_dentry_dir = old_dir; 1074 req->r_old_dentry_dir = old_dir;
1069 req->r_locked_dir = new_dir; 1075 req->r_parent = new_dir;
1076 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1070 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; 1077 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
1071 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; 1078 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
1072 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1079 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
@@ -1194,7 +1201,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1194 struct inode *dir; 1201 struct inode *dir;
1195 1202
1196 if (flags & LOOKUP_RCU) { 1203 if (flags & LOOKUP_RCU) {
1197 parent = ACCESS_ONCE(dentry->d_parent); 1204 parent = READ_ONCE(dentry->d_parent);
1198 dir = d_inode_rcu(parent); 1205 dir = d_inode_rcu(parent);
1199 if (!dir) 1206 if (!dir)
1200 return -ECHILD; 1207 return -ECHILD;
@@ -1237,11 +1244,12 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1237 return -ECHILD; 1244 return -ECHILD;
1238 1245
1239 op = ceph_snap(dir) == CEPH_SNAPDIR ? 1246 op = ceph_snap(dir) == CEPH_SNAPDIR ?
1240 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_GETATTR; 1247 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
1241 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); 1248 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
1242 if (!IS_ERR(req)) { 1249 if (!IS_ERR(req)) {
1243 req->r_dentry = dget(dentry); 1250 req->r_dentry = dget(dentry);
1244 req->r_num_caps = op == CEPH_MDS_OP_GETATTR ? 1 : 2; 1251 req->r_num_caps = 2;
1252 req->r_parent = dir;
1245 1253
1246 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; 1254 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
1247 if (ceph_security_xattr_wanted(dir)) 1255 if (ceph_security_xattr_wanted(dir))
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 180bbef760f2..e8f11fa565c5 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -207,7 +207,8 @@ static int ceph_get_name(struct dentry *parent, char *name,
207 req->r_inode = d_inode(child); 207 req->r_inode = d_inode(child);
208 ihold(d_inode(child)); 208 ihold(d_inode(child));
209 req->r_ino2 = ceph_vino(d_inode(parent)); 209 req->r_ino2 = ceph_vino(d_inode(parent));
210 req->r_locked_dir = d_inode(parent); 210 req->r_parent = d_inode(parent);
211 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
211 req->r_num_caps = 2; 212 req->r_num_caps = 2;
212 err = ceph_mdsc_do_request(mdsc, NULL, req); 213 err = ceph_mdsc_do_request(mdsc, NULL, req);
213 214
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 045d30d26624..26cc95421cca 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -283,7 +283,7 @@ int ceph_open(struct inode *inode, struct file *file)
283 spin_lock(&ci->i_ceph_lock); 283 spin_lock(&ci->i_ceph_lock);
284 if (__ceph_is_any_real_caps(ci) && 284 if (__ceph_is_any_real_caps(ci) &&
285 (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { 285 (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
286 int mds_wanted = __ceph_caps_mds_wanted(ci); 286 int mds_wanted = __ceph_caps_mds_wanted(ci, true);
287 int issued = __ceph_caps_issued(ci, NULL); 287 int issued = __ceph_caps_issued(ci, NULL);
288 288
289 dout("open %p fmode %d want %s issued %s using existing\n", 289 dout("open %p fmode %d want %s issued %s using existing\n",
@@ -379,7 +379,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
379 mask |= CEPH_CAP_XATTR_SHARED; 379 mask |= CEPH_CAP_XATTR_SHARED;
380 req->r_args.open.mask = cpu_to_le32(mask); 380 req->r_args.open.mask = cpu_to_le32(mask);
381 381
382 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 382 req->r_parent = dir;
383 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
383 err = ceph_mdsc_do_request(mdsc, 384 err = ceph_mdsc_do_request(mdsc,
384 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 385 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
385 req); 386 req);
@@ -758,9 +759,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
758 goto out; 759 goto out;
759 } 760 }
760 761
761 req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | 762 req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
762 CEPH_OSD_FLAG_ONDISK |
763 CEPH_OSD_FLAG_WRITE;
764 ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); 763 ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
765 ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); 764 ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
766 765
@@ -794,89 +793,6 @@ out:
794 kfree(aio_work); 793 kfree(aio_work);
795} 794}
796 795
797/*
798 * Write commit request unsafe callback, called to tell us when a
799 * request is unsafe (that is, in flight--has been handed to the
800 * messenger to send to its target osd). It is called again when
801 * we've received a response message indicating the request is
802 * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
803 * is completed early (and unsuccessfully) due to a timeout or
804 * interrupt.
805 *
806 * This is used if we requested both an ACK and ONDISK commit reply
807 * from the OSD.
808 */
809static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
810{
811 struct ceph_inode_info *ci = ceph_inode(req->r_inode);
812
813 dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
814 unsafe ? "un" : "");
815 if (unsafe) {
816 ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
817 spin_lock(&ci->i_unsafe_lock);
818 list_add_tail(&req->r_unsafe_item,
819 &ci->i_unsafe_writes);
820 spin_unlock(&ci->i_unsafe_lock);
821
822 complete_all(&req->r_completion);
823 } else {
824 spin_lock(&ci->i_unsafe_lock);
825 list_del_init(&req->r_unsafe_item);
826 spin_unlock(&ci->i_unsafe_lock);
827 ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
828 }
829}
830
831/*
832 * Wait on any unsafe replies for the given inode. First wait on the
833 * newest request, and make that the upper bound. Then, if there are
834 * more requests, keep waiting on the oldest as long as it is still older
835 * than the original request.
836 */
837void ceph_sync_write_wait(struct inode *inode)
838{
839 struct ceph_inode_info *ci = ceph_inode(inode);
840 struct list_head *head = &ci->i_unsafe_writes;
841 struct ceph_osd_request *req;
842 u64 last_tid;
843
844 if (!S_ISREG(inode->i_mode))
845 return;
846
847 spin_lock(&ci->i_unsafe_lock);
848 if (list_empty(head))
849 goto out;
850
851 /* set upper bound as _last_ entry in chain */
852
853 req = list_last_entry(head, struct ceph_osd_request,
854 r_unsafe_item);
855 last_tid = req->r_tid;
856
857 do {
858 ceph_osdc_get_request(req);
859 spin_unlock(&ci->i_unsafe_lock);
860
861 dout("sync_write_wait on tid %llu (until %llu)\n",
862 req->r_tid, last_tid);
863 wait_for_completion(&req->r_done_completion);
864 ceph_osdc_put_request(req);
865
866 spin_lock(&ci->i_unsafe_lock);
867 /*
868 * from here on look at first entry in chain, since we
869 * only want to wait for anything older than last_tid
870 */
871 if (list_empty(head))
872 break;
873 req = list_first_entry(head, struct ceph_osd_request,
874 r_unsafe_item);
875 } while (req->r_tid < last_tid);
876out:
877 spin_unlock(&ci->i_unsafe_lock);
878}
879
880static ssize_t 796static ssize_t
881ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, 797ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
882 struct ceph_snap_context *snapc, 798 struct ceph_snap_context *snapc,
@@ -915,9 +831,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
915 if (ret2 < 0) 831 if (ret2 < 0)
916 dout("invalidate_inode_pages2_range returned %d\n", ret2); 832 dout("invalidate_inode_pages2_range returned %d\n", ret2);
917 833
918 flags = CEPH_OSD_FLAG_ORDERSNAP | 834 flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
919 CEPH_OSD_FLAG_ONDISK |
920 CEPH_OSD_FLAG_WRITE;
921 } else { 835 } else {
922 flags = CEPH_OSD_FLAG_READ; 836 flags = CEPH_OSD_FLAG_READ;
923 } 837 }
@@ -1116,10 +1030,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1116 if (ret < 0) 1030 if (ret < 0)
1117 dout("invalidate_inode_pages2_range returned %d\n", ret); 1031 dout("invalidate_inode_pages2_range returned %d\n", ret);
1118 1032
1119 flags = CEPH_OSD_FLAG_ORDERSNAP | 1033 flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
1120 CEPH_OSD_FLAG_ONDISK |
1121 CEPH_OSD_FLAG_WRITE |
1122 CEPH_OSD_FLAG_ACK;
1123 1034
1124 while ((len = iov_iter_count(from)) > 0) { 1035 while ((len = iov_iter_count(from)) > 0) {
1125 size_t left; 1036 size_t left;
@@ -1165,8 +1076,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1165 goto out; 1076 goto out;
1166 } 1077 }
1167 1078
1168 /* get a second commit callback */
1169 req->r_unsafe_callback = ceph_sync_write_unsafe;
1170 req->r_inode = inode; 1079 req->r_inode = inode;
1171 1080
1172 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, 1081 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
@@ -1616,8 +1525,7 @@ static int ceph_zero_partial_object(struct inode *inode,
1616 ceph_vino(inode), 1525 ceph_vino(inode),
1617 offset, length, 1526 offset, length,
1618 0, 1, op, 1527 0, 1, op,
1619 CEPH_OSD_FLAG_WRITE | 1528 CEPH_OSD_FLAG_WRITE,
1620 CEPH_OSD_FLAG_ONDISK,
1621 NULL, 0, 0, false); 1529 NULL, 0, 0, false);
1622 if (IS_ERR(req)) { 1530 if (IS_ERR(req)) {
1623 ret = PTR_ERR(req); 1531 ret = PTR_ERR(req);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 5e659d054b40..fd8f771f99b7 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -499,7 +499,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
499 ci->i_rdcache_gen = 0; 499 ci->i_rdcache_gen = 0;
500 ci->i_rdcache_revoking = 0; 500 ci->i_rdcache_revoking = 0;
501 501
502 INIT_LIST_HEAD(&ci->i_unsafe_writes);
503 INIT_LIST_HEAD(&ci->i_unsafe_dirops); 502 INIT_LIST_HEAD(&ci->i_unsafe_dirops);
504 INIT_LIST_HEAD(&ci->i_unsafe_iops); 503 INIT_LIST_HEAD(&ci->i_unsafe_iops);
505 spin_lock_init(&ci->i_unsafe_lock); 504 spin_lock_init(&ci->i_unsafe_lock);
@@ -583,14 +582,6 @@ int ceph_drop_inode(struct inode *inode)
583 return 1; 582 return 1;
584} 583}
585 584
586void ceph_evict_inode(struct inode *inode)
587{
588 /* wait unsafe sync writes */
589 ceph_sync_write_wait(inode);
590 truncate_inode_pages_final(&inode->i_data);
591 clear_inode(inode);
592}
593
594static inline blkcnt_t calc_inode_blocks(u64 size) 585static inline blkcnt_t calc_inode_blocks(u64 size)
595{ 586{
596 return (size + (1<<9) - 1) >> 9; 587 return (size + (1<<9) - 1) >> 9;
@@ -1016,7 +1007,9 @@ out:
1016static void update_dentry_lease(struct dentry *dentry, 1007static void update_dentry_lease(struct dentry *dentry,
1017 struct ceph_mds_reply_lease *lease, 1008 struct ceph_mds_reply_lease *lease,
1018 struct ceph_mds_session *session, 1009 struct ceph_mds_session *session,
1019 unsigned long from_time) 1010 unsigned long from_time,
1011 struct ceph_vino *tgt_vino,
1012 struct ceph_vino *dir_vino)
1020{ 1013{
1021 struct ceph_dentry_info *di = ceph_dentry(dentry); 1014 struct ceph_dentry_info *di = ceph_dentry(dentry);
1022 long unsigned duration = le32_to_cpu(lease->duration_ms); 1015 long unsigned duration = le32_to_cpu(lease->duration_ms);
@@ -1024,13 +1017,27 @@ static void update_dentry_lease(struct dentry *dentry,
1024 long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000; 1017 long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
1025 struct inode *dir; 1018 struct inode *dir;
1026 1019
1020 /*
1021 * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
1022 * we expect a negative dentry.
1023 */
1024 if (!tgt_vino && d_really_is_positive(dentry))
1025 return;
1026
1027 if (tgt_vino && (d_really_is_negative(dentry) ||
1028 !ceph_ino_compare(d_inode(dentry), tgt_vino)))
1029 return;
1030
1027 spin_lock(&dentry->d_lock); 1031 spin_lock(&dentry->d_lock);
1028 dout("update_dentry_lease %p duration %lu ms ttl %lu\n", 1032 dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
1029 dentry, duration, ttl); 1033 dentry, duration, ttl);
1030 1034
1031 /* make lease_rdcache_gen match directory */
1032 dir = d_inode(dentry->d_parent); 1035 dir = d_inode(dentry->d_parent);
1033 1036
1037 /* make sure parent matches dir_vino */
1038 if (!ceph_ino_compare(dir, dir_vino))
1039 goto out_unlock;
1040
1034 /* only track leases on regular dentries */ 1041 /* only track leases on regular dentries */
1035 if (ceph_snap(dir) != CEPH_NOSNAP) 1042 if (ceph_snap(dir) != CEPH_NOSNAP)
1036 goto out_unlock; 1043 goto out_unlock;
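update_dentry_lease() now receives the target and directory vinos straight from the MDS reply and bails out early when the dentry's inode or its parent no longer matches, guarding against the dentry having been re-spliced or turned negative while the reply was in flight. Each check reduces to comparing an (ino, snap) pair; a minimal sketch of that comparison, with an illustrative vino type standing in for struct ceph_vino:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    struct vino { uint64_t ino; uint64_t snap; };

    /* analogue of ceph_ino_compare(): does the inode match the expected vino? */
    static bool vino_matches(const struct vino *have, const struct vino *want)
    {
        return have->ino == want->ino && have->snap == want->snap;
    }

    int main(void)
    {
        struct vino dentry_inode = { 0x100, 0 }, reply_target = { 0x200, 0 };
        if (!vino_matches(&dentry_inode, &reply_target)) {
            printf("dentry re-spliced, skip the lease update\n");
            return 0;
        }
        printf("ok to update lease\n");
        return 0;
    }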
@@ -1108,61 +1115,27 @@ out:
1108 * 1115 *
1109 * Called with snap_rwsem (read). 1116 * Called with snap_rwsem (read).
1110 */ 1117 */
1111int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, 1118int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
1112 struct ceph_mds_session *session)
1113{ 1119{
1120 struct ceph_mds_session *session = req->r_session;
1114 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 1121 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1115 struct inode *in = NULL; 1122 struct inode *in = NULL;
1116 struct ceph_vino vino; 1123 struct ceph_vino tvino, dvino;
1117 struct ceph_fs_client *fsc = ceph_sb_to_client(sb); 1124 struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
1118 int err = 0; 1125 int err = 0;
1119 1126
1120 dout("fill_trace %p is_dentry %d is_target %d\n", req, 1127 dout("fill_trace %p is_dentry %d is_target %d\n", req,
1121 rinfo->head->is_dentry, rinfo->head->is_target); 1128 rinfo->head->is_dentry, rinfo->head->is_target);
1122 1129
1123#if 0
1124 /*
1125 * Debugging hook:
1126 *
1127 * If we resend completed ops to a recovering mds, we get no
1128 * trace. Since that is very rare, pretend this is the case
1129 * to ensure the 'no trace' handlers in the callers behave.
1130 *
1131 * Fill in inodes unconditionally to avoid breaking cap
1132 * invariants.
1133 */
1134 if (rinfo->head->op & CEPH_MDS_OP_WRITE) {
1135 pr_info("fill_trace faking empty trace on %lld %s\n",
1136 req->r_tid, ceph_mds_op_name(rinfo->head->op));
1137 if (rinfo->head->is_dentry) {
1138 rinfo->head->is_dentry = 0;
1139 err = fill_inode(req->r_locked_dir,
1140 &rinfo->diri, rinfo->dirfrag,
1141 session, req->r_request_started, -1);
1142 }
1143 if (rinfo->head->is_target) {
1144 rinfo->head->is_target = 0;
1145 ininfo = rinfo->targeti.in;
1146 vino.ino = le64_to_cpu(ininfo->ino);
1147 vino.snap = le64_to_cpu(ininfo->snapid);
1148 in = ceph_get_inode(sb, vino);
1149 err = fill_inode(in, &rinfo->targeti, NULL,
1150 session, req->r_request_started,
1151 req->r_fmode);
1152 iput(in);
1153 }
1154 }
1155#endif
1156
1157 if (!rinfo->head->is_target && !rinfo->head->is_dentry) { 1130 if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
1158 dout("fill_trace reply is empty!\n"); 1131 dout("fill_trace reply is empty!\n");
1159 if (rinfo->head->result == 0 && req->r_locked_dir) 1132 if (rinfo->head->result == 0 && req->r_parent)
1160 ceph_invalidate_dir_request(req); 1133 ceph_invalidate_dir_request(req);
1161 return 0; 1134 return 0;
1162 } 1135 }
1163 1136
1164 if (rinfo->head->is_dentry) { 1137 if (rinfo->head->is_dentry) {
1165 struct inode *dir = req->r_locked_dir; 1138 struct inode *dir = req->r_parent;
1166 1139
1167 if (dir) { 1140 if (dir) {
1168 err = fill_inode(dir, NULL, 1141 err = fill_inode(dir, NULL,
@@ -1188,8 +1161,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1188 dname.name = rinfo->dname; 1161 dname.name = rinfo->dname;
1189 dname.len = rinfo->dname_len; 1162 dname.len = rinfo->dname_len;
1190 dname.hash = full_name_hash(parent, dname.name, dname.len); 1163 dname.hash = full_name_hash(parent, dname.name, dname.len);
1191 vino.ino = le64_to_cpu(rinfo->targeti.in->ino); 1164 tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1192 vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); 1165 tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1193retry_lookup: 1166retry_lookup:
1194 dn = d_lookup(parent, &dname); 1167 dn = d_lookup(parent, &dname);
1195 dout("d_lookup on parent=%p name=%.*s got %p\n", 1168 dout("d_lookup on parent=%p name=%.*s got %p\n",
@@ -1206,8 +1179,8 @@ retry_lookup:
1206 } 1179 }
1207 err = 0; 1180 err = 0;
1208 } else if (d_really_is_positive(dn) && 1181 } else if (d_really_is_positive(dn) &&
1209 (ceph_ino(d_inode(dn)) != vino.ino || 1182 (ceph_ino(d_inode(dn)) != tvino.ino ||
1210 ceph_snap(d_inode(dn)) != vino.snap)) { 1183 ceph_snap(d_inode(dn)) != tvino.snap)) {
1211 dout(" dn %p points to wrong inode %p\n", 1184 dout(" dn %p points to wrong inode %p\n",
1212 dn, d_inode(dn)); 1185 dn, d_inode(dn));
1213 d_delete(dn); 1186 d_delete(dn);
@@ -1221,10 +1194,10 @@ retry_lookup:
1221 } 1194 }
1222 1195
1223 if (rinfo->head->is_target) { 1196 if (rinfo->head->is_target) {
1224 vino.ino = le64_to_cpu(rinfo->targeti.in->ino); 1197 tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1225 vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); 1198 tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1226 1199
1227 in = ceph_get_inode(sb, vino); 1200 in = ceph_get_inode(sb, tvino);
1228 if (IS_ERR(in)) { 1201 if (IS_ERR(in)) {
1229 err = PTR_ERR(in); 1202 err = PTR_ERR(in);
1230 goto done; 1203 goto done;
@@ -1233,8 +1206,8 @@ retry_lookup:
1233 1206
1234 err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL, 1207 err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
1235 session, req->r_request_started, 1208 session, req->r_request_started,
1236 (!req->r_aborted && rinfo->head->result == 0) ? 1209 (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
1237 req->r_fmode : -1, 1210 rinfo->head->result == 0) ? req->r_fmode : -1,
1238 &req->r_caps_reservation); 1211 &req->r_caps_reservation);
1239 if (err < 0) { 1212 if (err < 0) {
1240 pr_err("fill_inode badness %p %llx.%llx\n", 1213 pr_err("fill_inode badness %p %llx.%llx\n",
@@ -1247,8 +1220,9 @@ retry_lookup:
1247 * ignore null lease/binding on snapdir ENOENT, or else we 1220 * ignore null lease/binding on snapdir ENOENT, or else we
1248 * will have trouble splicing in the virtual snapdir later 1221 * will have trouble splicing in the virtual snapdir later
1249 */ 1222 */
1250 if (rinfo->head->is_dentry && !req->r_aborted && 1223 if (rinfo->head->is_dentry &&
1251 req->r_locked_dir && 1224 !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
1225 test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
1252 (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name, 1226 (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
1253 fsc->mount_options->snapdir_name, 1227 fsc->mount_options->snapdir_name,
1254 req->r_dentry->d_name.len))) { 1228 req->r_dentry->d_name.len))) {
@@ -1257,17 +1231,19 @@ retry_lookup:
1257 * mknod symlink mkdir : null -> new inode 1231 * mknod symlink mkdir : null -> new inode
1258 * unlink : linked -> null 1232 * unlink : linked -> null
1259 */ 1233 */
1260 struct inode *dir = req->r_locked_dir; 1234 struct inode *dir = req->r_parent;
1261 struct dentry *dn = req->r_dentry; 1235 struct dentry *dn = req->r_dentry;
1262 bool have_dir_cap, have_lease; 1236 bool have_dir_cap, have_lease;
1263 1237
1264 BUG_ON(!dn); 1238 BUG_ON(!dn);
1265 BUG_ON(!dir); 1239 BUG_ON(!dir);
1266 BUG_ON(d_inode(dn->d_parent) != dir); 1240 BUG_ON(d_inode(dn->d_parent) != dir);
1267 BUG_ON(ceph_ino(dir) != 1241
1268 le64_to_cpu(rinfo->diri.in->ino)); 1242 dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
1269 BUG_ON(ceph_snap(dir) != 1243 dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
1270 le64_to_cpu(rinfo->diri.in->snapid)); 1244
1245 BUG_ON(ceph_ino(dir) != dvino.ino);
1246 BUG_ON(ceph_snap(dir) != dvino.snap);
1271 1247
1272 /* do we have a lease on the whole dir? */ 1248 /* do we have a lease on the whole dir? */
1273 have_dir_cap = 1249 have_dir_cap =
@@ -1319,12 +1295,13 @@ retry_lookup:
1319 ceph_dir_clear_ordered(dir); 1295 ceph_dir_clear_ordered(dir);
1320 dout("d_delete %p\n", dn); 1296 dout("d_delete %p\n", dn);
1321 d_delete(dn); 1297 d_delete(dn);
1322 } else { 1298 } else if (have_lease) {
1323 if (have_lease && d_unhashed(dn)) 1299 if (d_unhashed(dn))
1324 d_add(dn, NULL); 1300 d_add(dn, NULL);
1325 update_dentry_lease(dn, rinfo->dlease, 1301 update_dentry_lease(dn, rinfo->dlease,
1326 session, 1302 session,
1327 req->r_request_started); 1303 req->r_request_started,
1304 NULL, &dvino);
1328 } 1305 }
1329 goto done; 1306 goto done;
1330 } 1307 }
@@ -1347,15 +1324,19 @@ retry_lookup:
1347 have_lease = false; 1324 have_lease = false;
1348 } 1325 }
1349 1326
1350 if (have_lease) 1327 if (have_lease) {
1328 tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1329 tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1351 update_dentry_lease(dn, rinfo->dlease, session, 1330 update_dentry_lease(dn, rinfo->dlease, session,
1352 req->r_request_started); 1331 req->r_request_started,
1332 &tvino, &dvino);
1333 }
1353 dout(" final dn %p\n", dn); 1334 dout(" final dn %p\n", dn);
1354 } else if (!req->r_aborted && 1335 } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
1355 (req->r_op == CEPH_MDS_OP_LOOKUPSNAP || 1336 req->r_op == CEPH_MDS_OP_MKSNAP) &&
1356 req->r_op == CEPH_MDS_OP_MKSNAP)) { 1337 !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
1357 struct dentry *dn = req->r_dentry; 1338 struct dentry *dn = req->r_dentry;
1358 struct inode *dir = req->r_locked_dir; 1339 struct inode *dir = req->r_parent;
1359 1340
1360 /* fill out a snapdir LOOKUPSNAP dentry */ 1341 /* fill out a snapdir LOOKUPSNAP dentry */
1361 BUG_ON(!dn); 1342 BUG_ON(!dn);
@@ -1370,6 +1351,26 @@ retry_lookup:
1370 goto done; 1351 goto done;
1371 } 1352 }
1372 req->r_dentry = dn; /* may have spliced */ 1353 req->r_dentry = dn; /* may have spliced */
1354 } else if (rinfo->head->is_dentry) {
1355 struct ceph_vino *ptvino = NULL;
1356
1357 if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) ||
1358 le32_to_cpu(rinfo->dlease->duration_ms)) {
1359 dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
1360 dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
1361
1362 if (rinfo->head->is_target) {
1363 tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1364 tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1365 ptvino = &tvino;
1366 }
1367
1368 update_dentry_lease(req->r_dentry, rinfo->dlease,
1369 session, req->r_request_started, ptvino,
1370 &dvino);
1371 } else {
1372 dout("%s: no dentry lease or dir cap\n", __func__);
1373 }
1373 } 1374 }
1374done: 1375done:
1375 dout("fill_trace done err=%d\n", err); 1376 dout("fill_trace done err=%d\n", err);
@@ -1478,7 +1479,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1478 u32 fpos_offset; 1479 u32 fpos_offset;
1479 struct ceph_readdir_cache_control cache_ctl = {}; 1480 struct ceph_readdir_cache_control cache_ctl = {};
1480 1481
1481 if (req->r_aborted) 1482 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
1482 return readdir_prepopulate_inodes_only(req, session); 1483 return readdir_prepopulate_inodes_only(req, session);
1483 1484
1484 if (rinfo->hash_order && req->r_path2) { 1485 if (rinfo->hash_order && req->r_path2) {
@@ -1523,14 +1524,14 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1523 /* FIXME: release caps/leases if error occurs */ 1524 /* FIXME: release caps/leases if error occurs */
1524 for (i = 0; i < rinfo->dir_nr; i++) { 1525 for (i = 0; i < rinfo->dir_nr; i++) {
1525 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; 1526 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1526 struct ceph_vino vino; 1527 struct ceph_vino tvino, dvino;
1527 1528
1528 dname.name = rde->name; 1529 dname.name = rde->name;
1529 dname.len = rde->name_len; 1530 dname.len = rde->name_len;
1530 dname.hash = full_name_hash(parent, dname.name, dname.len); 1531 dname.hash = full_name_hash(parent, dname.name, dname.len);
1531 1532
1532 vino.ino = le64_to_cpu(rde->inode.in->ino); 1533 tvino.ino = le64_to_cpu(rde->inode.in->ino);
1533 vino.snap = le64_to_cpu(rde->inode.in->snapid); 1534 tvino.snap = le64_to_cpu(rde->inode.in->snapid);
1534 1535
1535 if (rinfo->hash_order) { 1536 if (rinfo->hash_order) {
1536 u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, 1537 u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
@@ -1559,8 +1560,8 @@ retry_lookup:
1559 goto out; 1560 goto out;
1560 } 1561 }
1561 } else if (d_really_is_positive(dn) && 1562 } else if (d_really_is_positive(dn) &&
1562 (ceph_ino(d_inode(dn)) != vino.ino || 1563 (ceph_ino(d_inode(dn)) != tvino.ino ||
1563 ceph_snap(d_inode(dn)) != vino.snap)) { 1564 ceph_snap(d_inode(dn)) != tvino.snap)) {
1564 dout(" dn %p points to wrong inode %p\n", 1565 dout(" dn %p points to wrong inode %p\n",
1565 dn, d_inode(dn)); 1566 dn, d_inode(dn));
1566 d_delete(dn); 1567 d_delete(dn);
@@ -1572,7 +1573,7 @@ retry_lookup:
1572 if (d_really_is_positive(dn)) { 1573 if (d_really_is_positive(dn)) {
1573 in = d_inode(dn); 1574 in = d_inode(dn);
1574 } else { 1575 } else {
1575 in = ceph_get_inode(parent->d_sb, vino); 1576 in = ceph_get_inode(parent->d_sb, tvino);
1576 if (IS_ERR(in)) { 1577 if (IS_ERR(in)) {
1577 dout("new_inode badness\n"); 1578 dout("new_inode badness\n");
1578 d_drop(dn); 1579 d_drop(dn);
@@ -1617,8 +1618,9 @@ retry_lookup:
1617 1618
1618 ceph_dentry(dn)->offset = rde->offset; 1619 ceph_dentry(dn)->offset = rde->offset;
1619 1620
1621 dvino = ceph_vino(d_inode(parent));
1620 update_dentry_lease(dn, rde->lease, req->r_session, 1622 update_dentry_lease(dn, rde->lease, req->r_session,
1621 req->r_request_started); 1623 req->r_request_started, &tvino, &dvino);
1622 1624
1623 if (err == 0 && skipped == 0 && cache_ctl.index >= 0) { 1625 if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
1624 ret = fill_readdir_cache(d_inode(parent), dn, 1626 ret = fill_readdir_cache(d_inode(parent), dn,
@@ -1632,7 +1634,7 @@ next_item:
1632 } 1634 }
1633out: 1635out:
1634 if (err == 0 && skipped == 0) { 1636 if (err == 0 && skipped == 0) {
1635 req->r_did_prepopulate = true; 1637 set_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags);
1636 req->r_readdir_cache_idx = cache_ctl.index; 1638 req->r_readdir_cache_idx = cache_ctl.index;
1637 } 1639 }
1638 ceph_readdir_cache_release(&cache_ctl); 1640 ceph_readdir_cache_release(&cache_ctl);
@@ -1720,7 +1722,7 @@ static void ceph_invalidate_work(struct work_struct *work)
1720 1722
1721 mutex_lock(&ci->i_truncate_mutex); 1723 mutex_lock(&ci->i_truncate_mutex);
1722 1724
1723 if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 1725 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1724 pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n", 1726 pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
1725 inode, ceph_ino(inode)); 1727 inode, ceph_ino(inode));
1726 mapping_set_error(inode->i_mapping, -EIO); 1728 mapping_set_error(inode->i_mapping, -EIO);
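The ACCESS_ONCE() to READ_ONCE() conversions here and in the caps.c, dir.c and mds_client.c hunks are mechanical: both force the compiler to emit exactly one load of mount_state rather than caching or re-reading it, but READ_ONCE() also handles non-scalar types and is the current kernel idiom. The effect is roughly a load through a volatile-qualified pointer; a rough userspace model, not the kernel macro itself:

    #include <stdio.h>

    /* crude READ_ONCE() stand-in: one real load, nothing cached in a register */
    #define READ_ONCE_ULONG(x) (*(const volatile unsigned long *)&(x))

    static unsigned long mount_state;  /* imagine another thread updating this */

    int main(void)
    {
        mount_state = 3;  /* stand-in value for CEPH_MOUNT_SHUTDOWN */
        unsigned long s = READ_ONCE_ULONG(mount_state);
        printf("observed mount_state %lu\n", s);
        return 0;
    }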
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 7d752d53353a..4c9c72f26eb9 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -25,7 +25,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
25 l.stripe_count = ci->i_layout.stripe_count; 25 l.stripe_count = ci->i_layout.stripe_count;
26 l.object_size = ci->i_layout.object_size; 26 l.object_size = ci->i_layout.object_size;
27 l.data_pool = ci->i_layout.pool_id; 27 l.data_pool = ci->i_layout.pool_id;
28 l.preferred_osd = (s32)-1; 28 l.preferred_osd = -1;
29 if (copy_to_user(arg, &l, sizeof(l))) 29 if (copy_to_user(arg, &l, sizeof(l)))
30 return -EFAULT; 30 return -EFAULT;
31 } 31 }
@@ -97,7 +97,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
97 nl.data_pool = ci->i_layout.pool_id; 97 nl.data_pool = ci->i_layout.pool_id;
98 98
99 /* this is obsolete, and always -1 */ 99 /* this is obsolete, and always -1 */
100 nl.preferred_osd = le64_to_cpu(-1); 100 nl.preferred_osd = -1;
101 101
102 err = __validate_layout(mdsc, &nl); 102 err = __validate_layout(mdsc, &nl);
103 if (err) 103 if (err)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c9d2e553a6c4..c681762d76e6 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -547,8 +547,8 @@ void ceph_mdsc_release_request(struct kref *kref)
547 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 547 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
548 iput(req->r_inode); 548 iput(req->r_inode);
549 } 549 }
550 if (req->r_locked_dir) 550 if (req->r_parent)
551 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 551 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
552 iput(req->r_target_inode); 552 iput(req->r_target_inode);
553 if (req->r_dentry) 553 if (req->r_dentry)
554 dput(req->r_dentry); 554 dput(req->r_dentry);
@@ -628,6 +628,9 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
628{ 628{
629 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 629 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
630 630
631 /* Never leave an unregistered request on an unsafe list! */
632 list_del_init(&req->r_unsafe_item);
633
631 if (req->r_tid == mdsc->oldest_tid) { 634 if (req->r_tid == mdsc->oldest_tid) {
632 struct rb_node *p = rb_next(&req->r_node); 635 struct rb_node *p = rb_next(&req->r_node);
633 mdsc->oldest_tid = 0; 636 mdsc->oldest_tid = 0;
@@ -644,13 +647,15 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
644 647
645 erase_request(&mdsc->request_tree, req); 648 erase_request(&mdsc->request_tree, req);
646 649
647 if (req->r_unsafe_dir && req->r_got_unsafe) { 650 if (req->r_unsafe_dir &&
651 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
648 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 652 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
649 spin_lock(&ci->i_unsafe_lock); 653 spin_lock(&ci->i_unsafe_lock);
650 list_del_init(&req->r_unsafe_dir_item); 654 list_del_init(&req->r_unsafe_dir_item);
651 spin_unlock(&ci->i_unsafe_lock); 655 spin_unlock(&ci->i_unsafe_lock);
652 } 656 }
653 if (req->r_target_inode && req->r_got_unsafe) { 657 if (req->r_target_inode &&
658 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
654 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 659 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
655 spin_lock(&ci->i_unsafe_lock); 660 spin_lock(&ci->i_unsafe_lock);
656 list_del_init(&req->r_unsafe_target_item); 661 list_del_init(&req->r_unsafe_target_item);
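Hoisting list_del_init(&req->r_unsafe_item) into __unregister_request() works because list_del_init() is idempotent: after the first call the entry points at itself, so the now-removed open-coded deletions in cleanup_session_requests() and handle_reply() further down were redundant rather than required. Demonstrated with the same minimal list as earlier:

    #include <stdio.h>

    struct list_head { struct list_head *prev, *next; };

    static void list_init(struct list_head *h) { h->prev = h->next = h; }

    /* delete and re-initialize: safe to call repeatedly */
    static void list_del_init(struct list_head *e)
    {
        e->prev->next = e->next;
        e->next->prev = e->prev;
        list_init(e);
    }

    static void list_add_tail(struct list_head *e, struct list_head *h)
    {
        e->prev = h->prev; e->next = h;
        h->prev->next = e; h->prev = e;
    }

    int main(void)
    {
        struct list_head unsafe, req;
        list_init(&unsafe);
        list_add_tail(&req, &unsafe);
        list_del_init(&req);
        list_del_init(&req);  /* second delete is a no-op, not a corruption */
        printf("list empty: %d\n", unsafe.next == &unsafe);
        return 0;
    }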
@@ -668,6 +673,28 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
668} 673}
669 674
670/* 675/*
676 * Walk back up the dentry tree until we hit a dentry representing a
677 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
678 * when calling this) to ensure that the objects won't disappear while we're
679 * working with them. Once we hit a candidate dentry, we attempt to take a
680 * reference to it, and return that as the result.
681 */
682static struct inode *get_nonsnap_parent(struct dentry *dentry)
683{
684 struct inode *inode = NULL;
685
686 while (dentry && !IS_ROOT(dentry)) {
687 inode = d_inode_rcu(dentry);
688 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
689 break;
690 dentry = dentry->d_parent;
691 }
692 if (inode)
693 inode = igrab(inode);
694 return inode;
695}
696
697/*
671 * Choose mds to send request to next. If there is a hint set in the 698 * Choose mds to send request to next. If there is a hint set in the
672 * request (e.g., due to a prior forward hint from the mds), use that. 699 * request (e.g., due to a prior forward hint from the mds), use that.
673 * Otherwise, consult frag tree and/or caps to identify the 700 * Otherwise, consult frag tree and/or caps to identify the
@@ -675,19 +702,6 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
675 * 702 *
676 * Called under mdsc->mutex. 703 * Called under mdsc->mutex.
677 */ 704 */
678static struct dentry *get_nonsnap_parent(struct dentry *dentry)
679{
680 /*
681 * we don't need to worry about protecting the d_parent access
 682 * here because we never rename inside the snapped namespace
683 * except to resplice to another snapdir, and either the old or new
684 * result is a valid result.
685 */
686 while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
687 dentry = dentry->d_parent;
688 return dentry;
689}
690
691static int __choose_mds(struct ceph_mds_client *mdsc, 705static int __choose_mds(struct ceph_mds_client *mdsc,
692 struct ceph_mds_request *req) 706 struct ceph_mds_request *req)
693{ 707{
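get_nonsnap_parent() is rewritten to be safe without holding d_lock: under rcu_read_lock() it walks d_parent pointers via d_inode_rcu() until it finds a non-snapshot inode, then pins the result with igrab() before returning it. The old version dereferenced d_inode(dentry) unconditionally, which could oops if a dentry in the chain went negative. The walk itself, minus the RCU and igrab machinery that only exists in the kernel, looks roughly like this (types and names invented for the sketch):

    #include <stdbool.h>
    #include <stdio.h>

    struct inode { bool is_snap; const char *name; };
    struct dentry { struct dentry *parent; struct inode *inode; };

    static struct inode *get_nonsnap_parent(struct dentry *dentry)
    {
        struct inode *inode = NULL;

        /* in the kernel this loop runs under rcu_read_lock() */
        while (dentry && dentry->parent != dentry) {  /* IS_ROOT analogue */
            inode = dentry->inode;                    /* may be NULL (negative) */
            if (!inode || !inode->is_snap)
                break;
            dentry = dentry->parent;
        }
        return inode;  /* the kernel then does igrab(inode) to pin it */
    }

    int main(void)
    {
        struct inode root_i = { false, "root" }, dir_i = { false, "dir" },
                     snap_i = { true, "snap" };
        struct dentry root = { .inode = &root_i };
        root.parent = &root;
        struct dentry dir = { .parent = &root, .inode = &dir_i };
        struct dentry snap = { .parent = &dir, .inode = &snap_i };
        struct inode *in = get_nonsnap_parent(&snap);
        printf("landed on %s\n", in ? in->name : "(null)");  /* "dir" */
        return 0;
    }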
@@ -697,7 +711,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
697 int mode = req->r_direct_mode; 711 int mode = req->r_direct_mode;
698 int mds = -1; 712 int mds = -1;
699 u32 hash = req->r_direct_hash; 713 u32 hash = req->r_direct_hash;
700 bool is_hash = req->r_direct_is_hash; 714 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
701 715
702 /* 716 /*
703 * is there a specific mds we should try? ignore hint if we have 717 * is there a specific mds we should try? ignore hint if we have
@@ -717,30 +731,39 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
717 inode = NULL; 731 inode = NULL;
718 if (req->r_inode) { 732 if (req->r_inode) {
719 inode = req->r_inode; 733 inode = req->r_inode;
734 ihold(inode);
720 } else if (req->r_dentry) { 735 } else if (req->r_dentry) {
721 /* ignore race with rename; old or new d_parent is okay */ 736 /* ignore race with rename; old or new d_parent is okay */
722 struct dentry *parent = req->r_dentry->d_parent; 737 struct dentry *parent;
723 struct inode *dir = d_inode(parent); 738 struct inode *dir;
739
740 rcu_read_lock();
741 parent = req->r_dentry->d_parent;
742 dir = req->r_parent ? : d_inode_rcu(parent);
724 743
725 if (dir->i_sb != mdsc->fsc->sb) { 744 if (!dir || dir->i_sb != mdsc->fsc->sb) {
726 /* not this fs! */ 745 /* not this fs or parent went negative */
727 inode = d_inode(req->r_dentry); 746 inode = d_inode(req->r_dentry);
747 if (inode)
748 ihold(inode);
728 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 749 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
729 /* direct snapped/virtual snapdir requests 750 /* direct snapped/virtual snapdir requests
730 * based on parent dir inode */ 751 * based on parent dir inode */
731 struct dentry *dn = get_nonsnap_parent(parent); 752 inode = get_nonsnap_parent(parent);
732 inode = d_inode(dn);
733 dout("__choose_mds using nonsnap parent %p\n", inode); 753 dout("__choose_mds using nonsnap parent %p\n", inode);
734 } else { 754 } else {
735 /* dentry target */ 755 /* dentry target */
736 inode = d_inode(req->r_dentry); 756 inode = d_inode(req->r_dentry);
737 if (!inode || mode == USE_AUTH_MDS) { 757 if (!inode || mode == USE_AUTH_MDS) {
738 /* dir + name */ 758 /* dir + name */
739 inode = dir; 759 inode = igrab(dir);
740 hash = ceph_dentry_hash(dir, req->r_dentry); 760 hash = ceph_dentry_hash(dir, req->r_dentry);
741 is_hash = true; 761 is_hash = true;
762 } else {
763 ihold(inode);
742 } 764 }
743 } 765 }
766 rcu_read_unlock();
744 } 767 }
745 768
746 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 769 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
@@ -769,7 +792,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
769 (int)r, frag.ndist); 792 (int)r, frag.ndist);
770 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 793 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
771 CEPH_MDS_STATE_ACTIVE) 794 CEPH_MDS_STATE_ACTIVE)
772 return mds; 795 goto out;
773 } 796 }
774 797
775 /* since this file/dir wasn't known to be 798 /* since this file/dir wasn't known to be
@@ -784,7 +807,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
784 inode, ceph_vinop(inode), frag.frag, mds); 807 inode, ceph_vinop(inode), frag.frag, mds);
785 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 808 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
786 CEPH_MDS_STATE_ACTIVE) 809 CEPH_MDS_STATE_ACTIVE)
787 return mds; 810 goto out;
788 } 811 }
789 } 812 }
790 } 813 }
@@ -797,6 +820,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
797 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 820 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
798 if (!cap) { 821 if (!cap) {
799 spin_unlock(&ci->i_ceph_lock); 822 spin_unlock(&ci->i_ceph_lock);
823 iput(inode);
800 goto random; 824 goto random;
801 } 825 }
802 mds = cap->session->s_mds; 826 mds = cap->session->s_mds;
@@ -804,6 +828,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
804 inode, ceph_vinop(inode), mds, 828 inode, ceph_vinop(inode), mds,
805 cap == ci->i_auth_cap ? "auth " : "", cap); 829 cap == ci->i_auth_cap ? "auth " : "", cap);
806 spin_unlock(&ci->i_ceph_lock); 830 spin_unlock(&ci->i_ceph_lock);
831out:
832 iput(inode);
807 return mds; 833 return mds;
808 834
809random: 835random:
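Because __choose_mds() now inspects the dentry tree under RCU rather than a stable lock, every inode it settles on is pinned with ihold() or igrab() and released by the single iput() at the new out: label; the one branch that falls through to random: does its own iput() first, and iput(NULL) is a harmless no-op, so all exits stay balanced. The early "return mds" statements in the frag-tree loops become "goto out" for the same reason. The pairing discipline in miniature, with an illustrative refcount:

    #include <assert.h>
    #include <stdio.h>

    struct inode { int refcount; };

    static struct inode *ihold(struct inode *in)  /* pin: caller owns a ref */
    {
        if (in)
            in->refcount++;
        return in;
    }

    static void iput(struct inode *in)  /* unpin: NULL is a harmless no-op */
    {
        if (!in)
            return;
        in->refcount--;
        assert(in->refcount >= 0);
    }

    static int choose(struct inode *candidate)
    {
        struct inode *in = ihold(candidate);  /* held while inspecting */
        int mds = in ? 0 : -1;                /* stand-in for the real decision */
        iput(in);                             /* single release on every path */
        return mds;
    }

    int main(void)
    {
        struct inode i = { 1 };
        printf("mds=%d refcount back to %d\n", choose(&i), i.refcount);
        return 0;
    }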
@@ -1036,7 +1062,6 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1036 while (!list_empty(&session->s_unsafe)) { 1062 while (!list_empty(&session->s_unsafe)) {
1037 req = list_first_entry(&session->s_unsafe, 1063 req = list_first_entry(&session->s_unsafe,
1038 struct ceph_mds_request, r_unsafe_item); 1064 struct ceph_mds_request, r_unsafe_item);
1039 list_del_init(&req->r_unsafe_item);
1040 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1065 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1041 req->r_tid); 1066 req->r_tid);
1042 __unregister_request(mdsc, req); 1067 __unregister_request(mdsc, req);
@@ -1146,7 +1171,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1146 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 1171 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1147 1172
1148 if (ci->i_wrbuffer_ref > 0 && 1173 if (ci->i_wrbuffer_ref > 0 &&
1149 ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 1174 READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1150 invalidate = true; 1175 invalidate = true;
1151 1176
1152 while (!list_empty(&ci->i_cap_flush_list)) { 1177 while (!list_empty(&ci->i_cap_flush_list)) {
@@ -1775,18 +1800,23 @@ retry:
1775 return path; 1800 return path;
1776} 1801}
1777 1802
1778static int build_dentry_path(struct dentry *dentry, 1803static int build_dentry_path(struct dentry *dentry, struct inode *dir,
1779 const char **ppath, int *ppathlen, u64 *pino, 1804 const char **ppath, int *ppathlen, u64 *pino,
1780 int *pfreepath) 1805 int *pfreepath)
1781{ 1806{
1782 char *path; 1807 char *path;
1783 1808
1784 if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) { 1809 rcu_read_lock();
1785 *pino = ceph_ino(d_inode(dentry->d_parent)); 1810 if (!dir)
1811 dir = d_inode_rcu(dentry->d_parent);
1812 if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
1813 *pino = ceph_ino(dir);
1814 rcu_read_unlock();
1786 *ppath = dentry->d_name.name; 1815 *ppath = dentry->d_name.name;
1787 *ppathlen = dentry->d_name.len; 1816 *ppathlen = dentry->d_name.len;
1788 return 0; 1817 return 0;
1789 } 1818 }
1819 rcu_read_unlock();
1790 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1820 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1791 if (IS_ERR(path)) 1821 if (IS_ERR(path))
1792 return PTR_ERR(path); 1822 return PTR_ERR(path);
@@ -1822,8 +1852,8 @@ static int build_inode_path(struct inode *inode,
1822 * an explicit ino+path. 1852 * an explicit ino+path.
1823 */ 1853 */
1824static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 1854static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1825 const char *rpath, u64 rino, 1855 struct inode *rdiri, const char *rpath,
1826 const char **ppath, int *pathlen, 1856 u64 rino, const char **ppath, int *pathlen,
1827 u64 *ino, int *freepath) 1857 u64 *ino, int *freepath)
1828{ 1858{
1829 int r = 0; 1859 int r = 0;
@@ -1833,7 +1863,8 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1833 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 1863 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1834 ceph_snap(rinode)); 1864 ceph_snap(rinode));
1835 } else if (rdentry) { 1865 } else if (rdentry) {
1836 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); 1866 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
1867 freepath);
1837 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 1868 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1838 *ppath); 1869 *ppath);
1839 } else if (rpath || rino) { 1870 } else if (rpath || rino) {
@@ -1866,7 +1897,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1866 int ret; 1897 int ret;
1867 1898
1868 ret = set_request_path_attr(req->r_inode, req->r_dentry, 1899 ret = set_request_path_attr(req->r_inode, req->r_dentry,
1869 req->r_path1, req->r_ino1.ino, 1900 req->r_parent, req->r_path1, req->r_ino1.ino,
1870 &path1, &pathlen1, &ino1, &freepath1); 1901 &path1, &pathlen1, &ino1, &freepath1);
1871 if (ret < 0) { 1902 if (ret < 0) {
1872 msg = ERR_PTR(ret); 1903 msg = ERR_PTR(ret);
@@ -1874,6 +1905,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1874 } 1905 }
1875 1906
1876 ret = set_request_path_attr(NULL, req->r_old_dentry, 1907 ret = set_request_path_attr(NULL, req->r_old_dentry,
1908 req->r_old_dentry_dir,
1877 req->r_path2, req->r_ino2.ino, 1909 req->r_path2, req->r_ino2.ino,
1878 &path2, &pathlen2, &ino2, &freepath2); 1910 &path2, &pathlen2, &ino2, &freepath2);
1879 if (ret < 0) { 1911 if (ret < 0) {
@@ -1927,10 +1959,13 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1927 mds, req->r_inode_drop, req->r_inode_unless, 0); 1959 mds, req->r_inode_drop, req->r_inode_unless, 0);
1928 if (req->r_dentry_drop) 1960 if (req->r_dentry_drop)
1929 releases += ceph_encode_dentry_release(&p, req->r_dentry, 1961 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1930 mds, req->r_dentry_drop, req->r_dentry_unless); 1962 req->r_parent, mds, req->r_dentry_drop,
1963 req->r_dentry_unless);
1931 if (req->r_old_dentry_drop) 1964 if (req->r_old_dentry_drop)
1932 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 1965 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1933 mds, req->r_old_dentry_drop, req->r_old_dentry_unless); 1966 req->r_old_dentry_dir, mds,
1967 req->r_old_dentry_drop,
1968 req->r_old_dentry_unless);
1934 if (req->r_old_inode_drop) 1969 if (req->r_old_inode_drop)
1935 releases += ceph_encode_inode_release(&p, 1970 releases += ceph_encode_inode_release(&p,
1936 d_inode(req->r_old_dentry), 1971 d_inode(req->r_old_dentry),
@@ -2012,7 +2047,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
2012 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2047 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2013 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2048 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2014 2049
2015 if (req->r_got_unsafe) { 2050 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2016 void *p; 2051 void *p;
2017 /* 2052 /*
2018 * Replay. Do not regenerate message (and rebuild 2053 * Replay. Do not regenerate message (and rebuild
@@ -2061,16 +2096,16 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
2061 2096
2062 rhead = msg->front.iov_base; 2097 rhead = msg->front.iov_base;
2063 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2098 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2064 if (req->r_got_unsafe) 2099 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2065 flags |= CEPH_MDS_FLAG_REPLAY; 2100 flags |= CEPH_MDS_FLAG_REPLAY;
2066 if (req->r_locked_dir) 2101 if (req->r_parent)
2067 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2102 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2068 rhead->flags = cpu_to_le32(flags); 2103 rhead->flags = cpu_to_le32(flags);
2069 rhead->num_fwd = req->r_num_fwd; 2104 rhead->num_fwd = req->r_num_fwd;
2070 rhead->num_retry = req->r_attempts - 1; 2105 rhead->num_retry = req->r_attempts - 1;
2071 rhead->ino = 0; 2106 rhead->ino = 0;
2072 2107
2073 dout(" r_locked_dir = %p\n", req->r_locked_dir); 2108 dout(" r_parent = %p\n", req->r_parent);
2074 return 0; 2109 return 0;
2075} 2110}
2076 2111
@@ -2084,8 +2119,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
2084 int mds = -1; 2119 int mds = -1;
2085 int err = 0; 2120 int err = 0;
2086 2121
2087 if (req->r_err || req->r_got_result) { 2122 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2088 if (req->r_aborted) 2123 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2089 __unregister_request(mdsc, req); 2124 __unregister_request(mdsc, req);
2090 goto out; 2125 goto out;
2091 } 2126 }
@@ -2096,12 +2131,12 @@ static int __do_request(struct ceph_mds_client *mdsc,
2096 err = -EIO; 2131 err = -EIO;
2097 goto finish; 2132 goto finish;
2098 } 2133 }
2099 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2134 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2100 dout("do_request forced umount\n"); 2135 dout("do_request forced umount\n");
2101 err = -EIO; 2136 err = -EIO;
2102 goto finish; 2137 goto finish;
2103 } 2138 }
2104 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2139 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2105 if (mdsc->mdsmap_err) { 2140 if (mdsc->mdsmap_err) {
2106 err = mdsc->mdsmap_err; 2141 err = mdsc->mdsmap_err;
2107 dout("do_request mdsmap err %d\n", err); 2142 dout("do_request mdsmap err %d\n", err);
@@ -2215,7 +2250,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2215 while (p) { 2250 while (p) {
2216 req = rb_entry(p, struct ceph_mds_request, r_node); 2251 req = rb_entry(p, struct ceph_mds_request, r_node);
2217 p = rb_next(p); 2252 p = rb_next(p);
2218 if (req->r_got_unsafe) 2253 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2219 continue; 2254 continue;
2220 if (req->r_attempts > 0) 2255 if (req->r_attempts > 0)
2221 continue; /* only new requests */ 2256 continue; /* only new requests */
@@ -2250,11 +2285,11 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2250 2285
2251 dout("do_request on %p\n", req); 2286 dout("do_request on %p\n", req);
2252 2287
2253 /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */ 2288 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2254 if (req->r_inode) 2289 if (req->r_inode)
2255 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2290 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2256 if (req->r_locked_dir) 2291 if (req->r_parent)
2257 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 2292 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2258 if (req->r_old_dentry_dir) 2293 if (req->r_old_dentry_dir)
2259 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2294 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2260 CEPH_CAP_PIN); 2295 CEPH_CAP_PIN);
@@ -2289,7 +2324,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2289 mutex_lock(&mdsc->mutex); 2324 mutex_lock(&mdsc->mutex);
2290 2325
2291 /* only abort if we didn't race with a real reply */ 2326 /* only abort if we didn't race with a real reply */
2292 if (req->r_got_result) { 2327 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2293 err = le32_to_cpu(req->r_reply_info.head->result); 2328 err = le32_to_cpu(req->r_reply_info.head->result);
2294 } else if (err < 0) { 2329 } else if (err < 0) {
2295 dout("aborted request %lld with %d\n", req->r_tid, err); 2330 dout("aborted request %lld with %d\n", req->r_tid, err);
@@ -2301,10 +2336,10 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2301 */ 2336 */
2302 mutex_lock(&req->r_fill_mutex); 2337 mutex_lock(&req->r_fill_mutex);
2303 req->r_err = err; 2338 req->r_err = err;
2304 req->r_aborted = true; 2339 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2305 mutex_unlock(&req->r_fill_mutex); 2340 mutex_unlock(&req->r_fill_mutex);
2306 2341
2307 if (req->r_locked_dir && 2342 if (req->r_parent &&
2308 (req->r_op & CEPH_MDS_OP_WRITE)) 2343 (req->r_op & CEPH_MDS_OP_WRITE))
2309 ceph_invalidate_dir_request(req); 2344 ceph_invalidate_dir_request(req);
2310 } else { 2345 } else {
@@ -2323,7 +2358,7 @@ out:
2323 */ 2358 */
2324void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2359void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2325{ 2360{
2326 struct inode *inode = req->r_locked_dir; 2361 struct inode *inode = req->r_parent;
2327 2362
2328 dout("invalidate_dir_request %p (complete, lease(s))\n", inode); 2363 dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2329 2364
@@ -2379,14 +2414,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2379 } 2414 }
2380 2415
2381 /* dup? */ 2416 /* dup? */
2382 if ((req->r_got_unsafe && !head->safe) || 2417 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2383 (req->r_got_safe && head->safe)) { 2418 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2384 pr_warn("got a dup %s reply on %llu from mds%d\n", 2419 pr_warn("got a dup %s reply on %llu from mds%d\n",
2385 head->safe ? "safe" : "unsafe", tid, mds); 2420 head->safe ? "safe" : "unsafe", tid, mds);
2386 mutex_unlock(&mdsc->mutex); 2421 mutex_unlock(&mdsc->mutex);
2387 goto out; 2422 goto out;
2388 } 2423 }
2389 if (req->r_got_safe) { 2424 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2390 pr_warn("got unsafe after safe on %llu from mds%d\n", 2425 pr_warn("got unsafe after safe on %llu from mds%d\n",
2391 tid, mds); 2426 tid, mds);
2392 mutex_unlock(&mdsc->mutex); 2427 mutex_unlock(&mdsc->mutex);
@@ -2425,10 +2460,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2425 2460
2426 2461
2427 if (head->safe) { 2462 if (head->safe) {
2428 req->r_got_safe = true; 2463 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2429 __unregister_request(mdsc, req); 2464 __unregister_request(mdsc, req);
2430 2465
2431 if (req->r_got_unsafe) { 2466 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2432 /* 2467 /*
2433 * We already handled the unsafe response, now do the 2468 * We already handled the unsafe response, now do the
2434 * cleanup. No need to examine the response; the MDS 2469 * cleanup. No need to examine the response; the MDS
@@ -2437,7 +2472,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2437 * useful we could do with a revised return value. 2472 * useful we could do with a revised return value.
2438 */ 2473 */
2439 dout("got safe reply %llu, mds%d\n", tid, mds); 2474 dout("got safe reply %llu, mds%d\n", tid, mds);
2440 list_del_init(&req->r_unsafe_item);
2441 2475
2442 /* last unsafe request during umount? */ 2476 /* last unsafe request during umount? */
2443 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2477 if (mdsc->stopping && !__get_oldest_req(mdsc))
@@ -2446,7 +2480,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2446 goto out; 2480 goto out;
2447 } 2481 }
2448 } else { 2482 } else {
2449 req->r_got_unsafe = true; 2483 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2450 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2484 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2451 if (req->r_unsafe_dir) { 2485 if (req->r_unsafe_dir) {
2452 struct ceph_inode_info *ci = 2486 struct ceph_inode_info *ci =
@@ -2486,7 +2520,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2486 /* insert trace into our cache */ 2520 /* insert trace into our cache */
2487 mutex_lock(&req->r_fill_mutex); 2521 mutex_lock(&req->r_fill_mutex);
2488 current->journal_info = req; 2522 current->journal_info = req;
2489 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2523 err = ceph_fill_trace(mdsc->fsc->sb, req);
2490 if (err == 0) { 2524 if (err == 0) {
2491 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2525 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2492 req->r_op == CEPH_MDS_OP_LSSNAP)) 2526 req->r_op == CEPH_MDS_OP_LSSNAP))
@@ -2500,7 +2534,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2500 if (realm) 2534 if (realm)
2501 ceph_put_snap_realm(mdsc, realm); 2535 ceph_put_snap_realm(mdsc, realm);
2502 2536
2503 if (err == 0 && req->r_got_unsafe && req->r_target_inode) { 2537 if (err == 0 && req->r_target_inode &&
2538 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2504 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 2539 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2505 spin_lock(&ci->i_unsafe_lock); 2540 spin_lock(&ci->i_unsafe_lock);
2506 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); 2541 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
@@ -2508,12 +2543,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2508 } 2543 }
2509out_err: 2544out_err:
2510 mutex_lock(&mdsc->mutex); 2545 mutex_lock(&mdsc->mutex);
2511 if (!req->r_aborted) { 2546 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2512 if (err) { 2547 if (err) {
2513 req->r_err = err; 2548 req->r_err = err;
2514 } else { 2549 } else {
2515 req->r_reply = ceph_msg_get(msg); 2550 req->r_reply = ceph_msg_get(msg);
2516 req->r_got_result = true; 2551 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2517 } 2552 }
2518 } else { 2553 } else {
2519 dout("reply arrived after request %lld was aborted\n", tid); 2554 dout("reply arrived after request %lld was aborted\n", tid);
@@ -2557,7 +2592,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
2557 goto out; /* dup reply? */ 2592 goto out; /* dup reply? */
2558 } 2593 }
2559 2594
2560 if (req->r_aborted) { 2595 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2561 dout("forward tid %llu aborted, unregistering\n", tid); 2596 dout("forward tid %llu aborted, unregistering\n", tid);
2562 __unregister_request(mdsc, req); 2597 __unregister_request(mdsc, req);
2563 } else if (fwd_seq <= req->r_num_fwd) { 2598 } else if (fwd_seq <= req->r_num_fwd) {
@@ -2567,7 +2602,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
2567 /* resend. forward race not possible; mds would drop */ 2602 /* resend. forward race not possible; mds would drop */
2568 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2603 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2569 BUG_ON(req->r_err); 2604 BUG_ON(req->r_err);
2570 BUG_ON(req->r_got_result); 2605 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
2571 req->r_attempts = 0; 2606 req->r_attempts = 0;
2572 req->r_num_fwd = fwd_seq; 2607 req->r_num_fwd = fwd_seq;
2573 req->r_resend_mds = next_mds; 2608 req->r_resend_mds = next_mds;
@@ -2732,7 +2767,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2732 while (p) { 2767 while (p) {
2733 req = rb_entry(p, struct ceph_mds_request, r_node); 2768 req = rb_entry(p, struct ceph_mds_request, r_node);
2734 p = rb_next(p); 2769 p = rb_next(p);
2735 if (req->r_got_unsafe) 2770 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2736 continue; 2771 continue;
2737 if (req->r_attempts == 0) 2772 if (req->r_attempts == 0)
2738 continue; /* only old requests */ 2773 continue; /* only old requests */
@@ -3556,7 +3591,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3556{ 3591{
3557 u64 want_tid, want_flush; 3592 u64 want_tid, want_flush;
3558 3593
3559 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3594 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3560 return; 3595 return;
3561 3596
3562 dout("sync\n"); 3597 dout("sync\n");
@@ -3587,7 +3622,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3587 */ 3622 */
3588static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 3623static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
3589{ 3624{
3590 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3625 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3591 return true; 3626 return true;
3592 return atomic_read(&mdsc->num_sessions) <= skipped; 3627 return atomic_read(&mdsc->num_sessions) <= skipped;
3593} 3628}
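
The two mds_client.c hunks above swap the deprecated ACCESS_ONCE() macro for READ_ONCE(). A minimal sketch of the idiom, assuming a ceph_fs_client with a mount_state field as used in the hunks (helper name invented, not part of the patch):

	/*
	 * READ_ONCE() forces a single, non-torn load, so a field that
	 * another thread may update concurrently is read exactly once
	 * per check instead of being cached or re-read by the compiler.
	 */
	static bool mount_is_shutdown(struct ceph_fs_client *fsc)
	{
		return READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN;
	}
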
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3c6f77b7bb02..ac0475a2daa7 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -202,9 +202,18 @@ struct ceph_mds_request {
202 char *r_path1, *r_path2; 202 char *r_path1, *r_path2;
203 struct ceph_vino r_ino1, r_ino2; 203 struct ceph_vino r_ino1, r_ino2;
204 204
205 struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */ 205 struct inode *r_parent; /* parent dir inode */
206 struct inode *r_target_inode; /* resulting inode */ 206 struct inode *r_target_inode; /* resulting inode */
207 207
208#define CEPH_MDS_R_DIRECT_IS_HASH (1) /* r_direct_hash is valid */
209#define CEPH_MDS_R_ABORTED (2) /* call was aborted */
210#define CEPH_MDS_R_GOT_UNSAFE (3) /* got an unsafe reply */
211#define CEPH_MDS_R_GOT_SAFE (4) /* got a safe reply */
212#define CEPH_MDS_R_GOT_RESULT (5) /* got a result */
213#define CEPH_MDS_R_DID_PREPOPULATE (6) /* prepopulated readdir */
214#define CEPH_MDS_R_PARENT_LOCKED (7) /* is r_parent->i_rwsem wlocked? */
215 unsigned long r_req_flags;
216
208 struct mutex r_fill_mutex; 217 struct mutex r_fill_mutex;
209 218
210 union ceph_mds_request_args r_args; 219 union ceph_mds_request_args r_args;
@@ -216,7 +225,6 @@ struct ceph_mds_request {
216 /* for choosing which mds to send this request to */ 225 /* for choosing which mds to send this request to */
217 int r_direct_mode; 226 int r_direct_mode;
218 u32 r_direct_hash; /* choose dir frag based on this dentry hash */ 227 u32 r_direct_hash; /* choose dir frag based on this dentry hash */
219 bool r_direct_is_hash; /* true if r_direct_hash is valid */
220 228
221 /* data payload is used for xattr ops */ 229 /* data payload is used for xattr ops */
222 struct ceph_pagelist *r_pagelist; 230 struct ceph_pagelist *r_pagelist;
@@ -234,7 +242,6 @@ struct ceph_mds_request {
234 struct ceph_mds_reply_info_parsed r_reply_info; 242 struct ceph_mds_reply_info_parsed r_reply_info;
235 struct page *r_locked_page; 243 struct page *r_locked_page;
236 int r_err; 244 int r_err;
237 bool r_aborted;
238 245
239 unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */ 246 unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */
240 unsigned long r_started; /* start time to measure timeout against */ 247 unsigned long r_started; /* start time to measure timeout against */
@@ -262,9 +269,7 @@ struct ceph_mds_request {
262 ceph_mds_request_callback_t r_callback; 269 ceph_mds_request_callback_t r_callback;
263 ceph_mds_request_wait_callback_t r_wait_for_completion; 270 ceph_mds_request_wait_callback_t r_wait_for_completion;
264 struct list_head r_unsafe_item; /* per-session unsafe list item */ 271 struct list_head r_unsafe_item; /* per-session unsafe list item */
265 bool r_got_unsafe, r_got_safe, r_got_result;
266 272
267 bool r_did_prepopulate;
268 long long r_dir_release_cnt; 273 long long r_dir_release_cnt;
269 long long r_dir_ordered_cnt; 274 long long r_dir_ordered_cnt;
270 int r_readdir_cache_idx; 275 int r_readdir_cache_idx;
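
The header change above folds the per-request bools (r_got_unsafe, r_got_safe, r_got_result, r_aborted, r_did_prepopulate, r_direct_is_hash) into numbered bits of a single unsigned long, r_req_flags, driven by the kernel's atomic bitops. A minimal sketch of the resulting idiom (struct name invented for illustration):

	/* One atomic word replaces several independent bools. */
	struct example_req {
		unsigned long r_req_flags;	/* CEPH_MDS_R_* bit numbers */
	};

	static void mark_got_unsafe(struct example_req *req)
	{
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);	/* atomic RMW */
	}

	static bool got_unsafe(struct example_req *req)
	{
		return test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
	}

Besides saving space, set_bit()/test_bit() give each flag atomic update semantics, which the independent bools did not.
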
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 6bd20d707bfd..0ec8d0114e57 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -757,7 +757,6 @@ static const struct super_operations ceph_super_ops = {
757 .destroy_inode = ceph_destroy_inode, 757 .destroy_inode = ceph_destroy_inode,
758 .write_inode = ceph_write_inode, 758 .write_inode = ceph_write_inode,
759 .drop_inode = ceph_drop_inode, 759 .drop_inode = ceph_drop_inode,
760 .evict_inode = ceph_evict_inode,
761 .sync_fs = ceph_sync_fs, 760 .sync_fs = ceph_sync_fs,
762 .put_super = ceph_put_super, 761 .put_super = ceph_put_super,
763 .show_options = ceph_show_options, 762 .show_options = ceph_show_options,
@@ -952,6 +951,14 @@ static int ceph_register_bdi(struct super_block *sb,
952 fsc->backing_dev_info.ra_pages = 951 fsc->backing_dev_info.ra_pages =
953 VM_MAX_READAHEAD * 1024 / PAGE_SIZE; 952 VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
954 953
954 if (fsc->mount_options->rsize > fsc->mount_options->rasize &&
955 fsc->mount_options->rsize >= PAGE_SIZE)
956 fsc->backing_dev_info.io_pages =
957 (fsc->mount_options->rsize + PAGE_SIZE - 1)
958 >> PAGE_SHIFT;
959 else if (fsc->mount_options->rsize == 0)
960 fsc->backing_dev_info.io_pages = ULONG_MAX;
961
955 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld", 962 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
956 atomic_long_inc_return(&bdi_seq)); 963 atomic_long_inc_return(&bdi_seq));
957 if (!err) 964 if (!err)
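
The new super.c hunk sizes backing_dev_info.io_pages from the rsize mount option so that large reads are not split at the readahead window; it only applies when rsize exceeds rasize and is at least one page, and rsize == 0 means unlimited. A sketch of the rounding rule (helper name invented):

	static unsigned long rsize_to_io_pages(unsigned long rsize)
	{
		if (rsize == 0)
			return ULONG_MAX;			/* 0 = no limit */
		return (rsize + PAGE_SIZE - 1) >> PAGE_SHIFT;	/* round up to pages */
	}
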
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 3373b61faefd..e9410bcf4113 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -45,8 +45,8 @@
45#define ceph_test_mount_opt(fsc, opt) \ 45#define ceph_test_mount_opt(fsc, opt) \
46 (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) 46 (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
47 47
48#define CEPH_RSIZE_DEFAULT 0 /* max read size */ 48#define CEPH_RSIZE_DEFAULT (64*1024*1024) /* max read size */
49#define CEPH_RASIZE_DEFAULT (8192*1024) /* readahead */ 49#define CEPH_RASIZE_DEFAULT (8192*1024) /* max readahead */
50#define CEPH_MAX_READDIR_DEFAULT 1024 50#define CEPH_MAX_READDIR_DEFAULT 1024
51#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) 51#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
52#define CEPH_SNAPDIRNAME_DEFAULT ".snap" 52#define CEPH_SNAPDIRNAME_DEFAULT ".snap"
@@ -343,7 +343,6 @@ struct ceph_inode_info {
343 u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */ 343 u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */
344 u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */ 344 u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
345 345
346 struct list_head i_unsafe_writes; /* uncommitted sync writes */
347 struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */ 346 struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */
348 struct list_head i_unsafe_iops; /* uncommitted mds inode ops */ 347 struct list_head i_unsafe_iops; /* uncommitted mds inode ops */
349 spinlock_t i_unsafe_lock; 348 spinlock_t i_unsafe_lock;
@@ -602,7 +601,7 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
602} 601}
603 602
604/* what the mds thinks we want */ 603/* what the mds thinks we want */
605extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci); 604extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
606 605
607extern void ceph_caps_init(struct ceph_mds_client *mdsc); 606extern void ceph_caps_init(struct ceph_mds_client *mdsc);
608extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); 607extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
@@ -753,7 +752,6 @@ extern const struct inode_operations ceph_file_iops;
753extern struct inode *ceph_alloc_inode(struct super_block *sb); 752extern struct inode *ceph_alloc_inode(struct super_block *sb);
754extern void ceph_destroy_inode(struct inode *inode); 753extern void ceph_destroy_inode(struct inode *inode);
755extern int ceph_drop_inode(struct inode *inode); 754extern int ceph_drop_inode(struct inode *inode);
756extern void ceph_evict_inode(struct inode *inode);
757 755
758extern struct inode *ceph_get_inode(struct super_block *sb, 756extern struct inode *ceph_get_inode(struct super_block *sb,
759 struct ceph_vino vino); 757 struct ceph_vino vino);
@@ -764,8 +762,7 @@ extern void ceph_fill_file_time(struct inode *inode, int issued,
764 u64 time_warp_seq, struct timespec *ctime, 762 u64 time_warp_seq, struct timespec *ctime,
765 struct timespec *mtime, struct timespec *atime); 763 struct timespec *mtime, struct timespec *atime);
766extern int ceph_fill_trace(struct super_block *sb, 764extern int ceph_fill_trace(struct super_block *sb,
767 struct ceph_mds_request *req, 765 struct ceph_mds_request *req);
768 struct ceph_mds_session *session);
769extern int ceph_readdir_prepopulate(struct ceph_mds_request *req, 766extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
770 struct ceph_mds_session *session); 767 struct ceph_mds_session *session);
771 768
@@ -904,6 +901,7 @@ extern void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc);
904extern int ceph_encode_inode_release(void **p, struct inode *inode, 901extern int ceph_encode_inode_release(void **p, struct inode *inode,
905 int mds, int drop, int unless, int force); 902 int mds, int drop, int unless, int force);
906extern int ceph_encode_dentry_release(void **p, struct dentry *dn, 903extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
904 struct inode *dir,
907 int mds, int drop, int unless); 905 int mds, int drop, int unless);
908 906
909extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 907extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
@@ -933,7 +931,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
933extern int ceph_release(struct inode *inode, struct file *filp); 931extern int ceph_release(struct inode *inode, struct file *filp);
934extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, 932extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
935 char *data, size_t len); 933 char *data, size_t len);
936extern void ceph_sync_write_wait(struct inode *inode); 934
937/* dir.c */ 935/* dir.c */
938extern const struct file_operations ceph_dir_fops; 936extern const struct file_operations ceph_dir_fops;
939extern const struct file_operations ceph_snapdir_fops; 937extern const struct file_operations ceph_snapdir_fops;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 03a6653d329a..2ea0c282f3dc 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -22,7 +22,6 @@ struct ceph_osd_client;
22 * completion callback for async writepages 22 * completion callback for async writepages
23 */ 23 */
24typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); 24typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
25typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool);
26 25
27#define CEPH_HOMELESS_OSD -1 26#define CEPH_HOMELESS_OSD -1
28 27
@@ -170,15 +169,12 @@ struct ceph_osd_request {
170 unsigned int r_num_ops; 169 unsigned int r_num_ops;
171 170
172 int r_result; 171 int r_result;
173 bool r_got_reply;
174 172
175 struct ceph_osd_client *r_osdc; 173 struct ceph_osd_client *r_osdc;
176 struct kref r_kref; 174 struct kref r_kref;
177 bool r_mempool; 175 bool r_mempool;
178 struct completion r_completion; 176 struct completion r_completion; /* private to osd_client.c */
179 struct completion r_done_completion; /* fsync waiter */
180 ceph_osdc_callback_t r_callback; 177 ceph_osdc_callback_t r_callback;
181 ceph_osdc_unsafe_callback_t r_unsafe_callback;
182 struct list_head r_unsafe_item; 178 struct list_head r_unsafe_item;
183 179
184 struct inode *r_inode; /* for use by callbacks */ 180 struct inode *r_inode; /* for use by callbacks */
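
With the ack vs commit distinction gone, a request completes exactly once, so r_done_completion and the unsafe callback are dropped and r_completion alone serves both ceph_osdc_wait_request() and the sync path. A sketch of the simplified wait, assuming the post-patch semantics (helper name invented):

	static int example_wait(struct ceph_osd_request *req)
	{
		/* Completed once, on commit; no separate ack phase. */
		wait_for_completion(&req->r_completion);
		return req->r_result;
	}
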
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 9a9041784dcf..938656f70807 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -57,7 +57,7 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
57 case CEPH_POOL_TYPE_EC: 57 case CEPH_POOL_TYPE_EC:
58 return false; 58 return false;
59 default: 59 default:
60 BUG_ON(1); 60 BUG();
61 } 61 }
62} 62}
63 63
@@ -82,13 +82,6 @@ void ceph_oloc_copy(struct ceph_object_locator *dest,
82void ceph_oloc_destroy(struct ceph_object_locator *oloc); 82void ceph_oloc_destroy(struct ceph_object_locator *oloc);
83 83
84/* 84/*
85 * Maximum supported by kernel client object name length
86 *
87 * (probably outdated: must be >= RBD_MAX_MD_NAME_LEN -- currently 100)
88 */
89#define CEPH_MAX_OID_NAME_LEN 100
90
91/*
92 * 51-char inline_name is long enough for all cephfs and all but one 85 * 51-char inline_name is long enough for all cephfs and all but one
93 * rbd requests: <imgname> in "<imgname>.rbd"/"rbd_id.<imgname>" can be 86 * rbd requests: <imgname> in "<imgname>.rbd"/"rbd_id.<imgname>" can be
94 * arbitrarily long (~PAGE_SIZE). It's done once during rbd map; all 87 * arbitrarily long (~PAGE_SIZE). It's done once during rbd map; all
@@ -173,8 +166,8 @@ struct ceph_osdmap {
173 * the list of osds that store+replicate them. */ 166 * the list of osds that store+replicate them. */
174 struct crush_map *crush; 167 struct crush_map *crush;
175 168
176 struct mutex crush_scratch_mutex; 169 struct mutex crush_workspace_mutex;
177 int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3]; 170 void *crush_workspace;
178}; 171};
179 172
180static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd) 173static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 5c0da61cb763..5d0018782d50 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -50,7 +50,7 @@ struct ceph_timespec {
50#define CEPH_PG_LAYOUT_LINEAR 2 50#define CEPH_PG_LAYOUT_LINEAR 2
51#define CEPH_PG_LAYOUT_HYBRID 3 51#define CEPH_PG_LAYOUT_HYBRID 3
52 52
53#define CEPH_PG_MAX_SIZE 16 /* max # osds in a single pg */ 53#define CEPH_PG_MAX_SIZE 32 /* max # osds in a single pg */
54 54
55/* 55/*
56 * placement group. 56 * placement group.
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index be8f12b8f195..fbecbd089d75 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -135,13 +135,6 @@ struct crush_bucket {
135 __u32 size; /* num items */ 135 __u32 size; /* num items */
136 __s32 *items; 136 __s32 *items;
137 137
138 /*
139 * cached random permutation: used for uniform bucket and for
140 * the linear search fallback for the other bucket types.
141 */
142 __u32 perm_x; /* @x for which *perm is defined */
143 __u32 perm_n; /* num elements of *perm that are permuted/defined */
144 __u32 *perm;
145}; 138};
146 139
147struct crush_bucket_uniform { 140struct crush_bucket_uniform {
@@ -211,6 +204,21 @@ struct crush_map {
211 * device fails. */ 204 * device fails. */
212 __u8 chooseleaf_stable; 205 __u8 chooseleaf_stable;
213 206
207 /*
208 * This value is calculated after decode or construction by
209 * the builder. It is exposed here (rather than having a
210 * 'build CRUSH working space' function) so that callers can
211 * reserve a static buffer, allocate space on the stack, or
212 * otherwise avoid calling into the heap allocator if they
213 * want to. The size of the working space depends on the map,
214 * while the size of the scratch vector passed to the mapper
215 * depends on the size of the desired result set.
216 *
217 * Nothing stops the caller from allocating both in one swell
218 * foop and passing in two pointers, though.
219 */
220 size_t working_size;
221
214#ifndef __KERNEL__ 222#ifndef __KERNEL__
215 /* 223 /*
216 * version 0 (original) of straw_calc has various flaws. version 1 224 * version 0 (original) of straw_calc has various flaws. version 1
@@ -248,4 +256,23 @@ static inline int crush_calc_tree_node(int i)
248 return ((i+1) << 1)-1; 256 return ((i+1) << 1)-1;
249} 257}
250 258
259/*
260 * These data structures are private to the CRUSH implementation. They
261 * are exposed in this header file because the builder needs their
262 * definitions to calculate the total working size.
263 *
264 * Moving this out of the crush map allows us to treat the CRUSH map as
265 * immutable within the mapper and removes the requirement for a CRUSH
266 * map lock.
267 */
268struct crush_work_bucket {
269 __u32 perm_x; /* @x for which *perm is defined */
270 __u32 perm_n; /* num elements of *perm that are permuted/defined */
271 __u32 *perm; /* Permutation of the bucket's items */
272};
273
274struct crush_work {
275 struct crush_work_bucket **work; /* Per-bucket working store */
276};
277
251#endif 278#endif
diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h
index 5dfd5b1125d2..c95e19e1ff11 100644
--- a/include/linux/crush/mapper.h
+++ b/include/linux/crush/mapper.h
@@ -15,6 +15,20 @@ extern int crush_do_rule(const struct crush_map *map,
15 int ruleno, 15 int ruleno,
16 int x, int *result, int result_max, 16 int x, int *result, int result_max,
17 const __u32 *weights, int weight_max, 17 const __u32 *weights, int weight_max,
18 int *scratch); 18 void *cwin);
19
20/*
21 * Returns the exact amount of workspace that will need to be used
22 * for a given combination of crush_map and result_max. The caller can
23 * then allocate this much on its own, either on the stack, in a
24 * per-thread long-lived buffer, or however it likes.
25 */
26static inline size_t crush_work_size(const struct crush_map *map,
27 int result_max)
28{
29 return map->working_size + result_max * 3 * sizeof(__u32);
30}
31
32void crush_init_workspace(const struct crush_map *map, void *v);
19 33
20#endif 34#endif
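
crush_do_rule() now takes a CRUSH workspace rather than a bare scratch vector; crush_work_size() tells the caller how much to allocate and crush_init_workspace() lays it out. A hypothetical caller (function name invented) allocating per call, though a long-lived per-thread buffer works equally well so long as the map is unchanged:

	int example_map_pg(const struct crush_map *map, int ruleno, int x,
			   int *result, int result_max,
			   const __u32 *weight, int weight_max)
	{
		size_t size = crush_work_size(map, result_max);
		void *work = kmalloc(size, GFP_NOIO);
		int len;

		if (!work)
			return -ENOMEM;

		crush_init_workspace(map, work);	/* must precede crush_do_rule() */
		len = crush_do_rule(map, ruleno, x, result, result_max,
				    weight, weight_max, work);
		kfree(work);
		return len;	/* number of results placed */
	}
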
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index 50f040fdb2a9..b9233b990399 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -69,8 +69,8 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
69 dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n", 69 dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
70 __func__, lock_name, type, cookie, tag, desc, flags); 70 __func__, lock_name, type, cookie, tag, desc, flags);
71 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock", 71 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
72 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 72 CEPH_OSD_FLAG_WRITE, lock_op_page,
73 lock_op_page, lock_op_buf_size, NULL, NULL); 73 lock_op_buf_size, NULL, NULL);
74 74
75 dout("%s: status %d\n", __func__, ret); 75 dout("%s: status %d\n", __func__, ret);
76 __free_page(lock_op_page); 76 __free_page(lock_op_page);
@@ -117,8 +117,8 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
117 117
118 dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie); 118 dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
119 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock", 119 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
120 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 120 CEPH_OSD_FLAG_WRITE, unlock_op_page,
121 unlock_op_page, unlock_op_buf_size, NULL, NULL); 121 unlock_op_buf_size, NULL, NULL);
122 122
123 dout("%s: status %d\n", __func__, ret); 123 dout("%s: status %d\n", __func__, ret);
124 __free_page(unlock_op_page); 124 __free_page(unlock_op_page);
@@ -170,8 +170,8 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
170 dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name, 170 dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
171 cookie, ENTITY_NAME(*locker)); 171 cookie, ENTITY_NAME(*locker));
172 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock", 172 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
173 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 173 CEPH_OSD_FLAG_WRITE, break_op_page,
174 break_op_page, break_op_buf_size, NULL, NULL); 174 break_op_buf_size, NULL, NULL);
175 175
176 dout("%s: status %d\n", __func__, ret); 176 dout("%s: status %d\n", __func__, ret);
177 __free_page(break_op_page); 177 __free_page(break_op_page);
@@ -278,7 +278,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
278 int get_info_op_buf_size; 278 int get_info_op_buf_size;
279 int name_len = strlen(lock_name); 279 int name_len = strlen(lock_name);
280 struct page *get_info_op_page, *reply_page; 280 struct page *get_info_op_page, *reply_page;
281 size_t reply_len; 281 size_t reply_len = PAGE_SIZE;
282 void *p, *end; 282 void *p, *end;
283 int ret; 283 int ret;
284 284
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 80d7c3a97cb8..5bf94c04f645 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -45,7 +45,6 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
45 45
46void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) 46void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b)
47{ 47{
48 kfree(b->h.perm);
49 kfree(b->h.items); 48 kfree(b->h.items);
50 kfree(b); 49 kfree(b);
51} 50}
@@ -54,14 +53,12 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b)
54{ 53{
55 kfree(b->item_weights); 54 kfree(b->item_weights);
56 kfree(b->sum_weights); 55 kfree(b->sum_weights);
57 kfree(b->h.perm);
58 kfree(b->h.items); 56 kfree(b->h.items);
59 kfree(b); 57 kfree(b);
60} 58}
61 59
62void crush_destroy_bucket_tree(struct crush_bucket_tree *b) 60void crush_destroy_bucket_tree(struct crush_bucket_tree *b)
63{ 61{
64 kfree(b->h.perm);
65 kfree(b->h.items); 62 kfree(b->h.items);
66 kfree(b->node_weights); 63 kfree(b->node_weights);
67 kfree(b); 64 kfree(b);
@@ -71,7 +68,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
71{ 68{
72 kfree(b->straws); 69 kfree(b->straws);
73 kfree(b->item_weights); 70 kfree(b->item_weights);
74 kfree(b->h.perm);
75 kfree(b->h.items); 71 kfree(b->h.items);
76 kfree(b); 72 kfree(b);
77} 73}
@@ -79,7 +75,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
79void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b) 75void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b)
80{ 76{
81 kfree(b->item_weights); 77 kfree(b->item_weights);
82 kfree(b->h.perm);
83 kfree(b->h.items); 78 kfree(b->h.items);
84 kfree(b); 79 kfree(b);
85} 80}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 130ab407c5ec..b5cd8c21bfdf 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -54,7 +54,6 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size
54 return -1; 54 return -1;
55} 55}
56 56
57
58/* 57/*
59 * bucket choose methods 58 * bucket choose methods
60 * 59 *
@@ -72,59 +71,60 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size
72 * Since this is expensive, we optimize for the r=0 case, which 71 * Since this is expensive, we optimize for the r=0 case, which
73 * captures the vast majority of calls. 72 * captures the vast majority of calls.
74 */ 73 */
75static int bucket_perm_choose(struct crush_bucket *bucket, 74static int bucket_perm_choose(const struct crush_bucket *bucket,
75 struct crush_work_bucket *work,
76 int x, int r) 76 int x, int r)
77{ 77{
78 unsigned int pr = r % bucket->size; 78 unsigned int pr = r % bucket->size;
79 unsigned int i, s; 79 unsigned int i, s;
80 80
81 /* start a new permutation if @x has changed */ 81 /* start a new permutation if @x has changed */
82 if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) { 82 if (work->perm_x != (__u32)x || work->perm_n == 0) {
83 dprintk("bucket %d new x=%d\n", bucket->id, x); 83 dprintk("bucket %d new x=%d\n", bucket->id, x);
84 bucket->perm_x = x; 84 work->perm_x = x;
85 85
86 /* optimize common r=0 case */ 86 /* optimize common r=0 case */
87 if (pr == 0) { 87 if (pr == 0) {
88 s = crush_hash32_3(bucket->hash, x, bucket->id, 0) % 88 s = crush_hash32_3(bucket->hash, x, bucket->id, 0) %
89 bucket->size; 89 bucket->size;
90 bucket->perm[0] = s; 90 work->perm[0] = s;
91 bucket->perm_n = 0xffff; /* magic value, see below */ 91 work->perm_n = 0xffff; /* magic value, see below */
92 goto out; 92 goto out;
93 } 93 }
94 94
95 for (i = 0; i < bucket->size; i++) 95 for (i = 0; i < bucket->size; i++)
96 bucket->perm[i] = i; 96 work->perm[i] = i;
97 bucket->perm_n = 0; 97 work->perm_n = 0;
98 } else if (bucket->perm_n == 0xffff) { 98 } else if (work->perm_n == 0xffff) {
99 /* clean up after the r=0 case above */ 99 /* clean up after the r=0 case above */
100 for (i = 1; i < bucket->size; i++) 100 for (i = 1; i < bucket->size; i++)
101 bucket->perm[i] = i; 101 work->perm[i] = i;
102 bucket->perm[bucket->perm[0]] = 0; 102 work->perm[work->perm[0]] = 0;
103 bucket->perm_n = 1; 103 work->perm_n = 1;
104 } 104 }
105 105
106 /* calculate permutation up to pr */ 106 /* calculate permutation up to pr */
107 for (i = 0; i < bucket->perm_n; i++) 107 for (i = 0; i < work->perm_n; i++)
108 dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]); 108 dprintk(" perm_choose have %d: %d\n", i, work->perm[i]);
109 while (bucket->perm_n <= pr) { 109 while (work->perm_n <= pr) {
110 unsigned int p = bucket->perm_n; 110 unsigned int p = work->perm_n;
111 /* no point in swapping the final entry */ 111 /* no point in swapping the final entry */
112 if (p < bucket->size - 1) { 112 if (p < bucket->size - 1) {
113 i = crush_hash32_3(bucket->hash, x, bucket->id, p) % 113 i = crush_hash32_3(bucket->hash, x, bucket->id, p) %
114 (bucket->size - p); 114 (bucket->size - p);
115 if (i) { 115 if (i) {
116 unsigned int t = bucket->perm[p + i]; 116 unsigned int t = work->perm[p + i];
117 bucket->perm[p + i] = bucket->perm[p]; 117 work->perm[p + i] = work->perm[p];
118 bucket->perm[p] = t; 118 work->perm[p] = t;
119 } 119 }
120 dprintk(" perm_choose swap %d with %d\n", p, p+i); 120 dprintk(" perm_choose swap %d with %d\n", p, p+i);
121 } 121 }
122 bucket->perm_n++; 122 work->perm_n++;
123 } 123 }
124 for (i = 0; i < bucket->size; i++) 124 for (i = 0; i < bucket->size; i++)
125 dprintk(" perm_choose %d: %d\n", i, bucket->perm[i]); 125 dprintk(" perm_choose %d: %d\n", i, work->perm[i]);
126 126
127 s = bucket->perm[pr]; 127 s = work->perm[pr];
128out: 128out:
129 dprintk(" perm_choose %d sz=%d x=%d r=%d (%d) s=%d\n", bucket->id, 129 dprintk(" perm_choose %d sz=%d x=%d r=%d (%d) s=%d\n", bucket->id,
130 bucket->size, x, r, pr, s); 130 bucket->size, x, r, pr, s);
@@ -132,14 +132,14 @@ out:
132} 132}
133 133
134/* uniform */ 134/* uniform */
135static int bucket_uniform_choose(struct crush_bucket_uniform *bucket, 135static int bucket_uniform_choose(const struct crush_bucket_uniform *bucket,
136 int x, int r) 136 struct crush_work_bucket *work, int x, int r)
137{ 137{
138 return bucket_perm_choose(&bucket->h, x, r); 138 return bucket_perm_choose(&bucket->h, work, x, r);
139} 139}
140 140
141/* list */ 141/* list */
142static int bucket_list_choose(struct crush_bucket_list *bucket, 142static int bucket_list_choose(const struct crush_bucket_list *bucket,
143 int x, int r) 143 int x, int r)
144{ 144{
145 int i; 145 int i;
@@ -155,8 +155,9 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
155 w *= bucket->sum_weights[i]; 155 w *= bucket->sum_weights[i];
156 w = w >> 16; 156 w = w >> 16;
157 /*dprintk(" scaled %llx\n", w);*/ 157 /*dprintk(" scaled %llx\n", w);*/
158 if (w < bucket->item_weights[i]) 158 if (w < bucket->item_weights[i]) {
159 return bucket->h.items[i]; 159 return bucket->h.items[i];
160 }
160 } 161 }
161 162
162 dprintk("bad list sums for bucket %d\n", bucket->h.id); 163 dprintk("bad list sums for bucket %d\n", bucket->h.id);
@@ -192,7 +193,7 @@ static int terminal(int x)
192 return x & 1; 193 return x & 1;
193} 194}
194 195
195static int bucket_tree_choose(struct crush_bucket_tree *bucket, 196static int bucket_tree_choose(const struct crush_bucket_tree *bucket,
196 int x, int r) 197 int x, int r)
197{ 198{
198 int n; 199 int n;
@@ -224,7 +225,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
224 225
225/* straw */ 226/* straw */
226 227
227static int bucket_straw_choose(struct crush_bucket_straw *bucket, 228static int bucket_straw_choose(const struct crush_bucket_straw *bucket,
228 int x, int r) 229 int x, int r)
229{ 230{
230 __u32 i; 231 __u32 i;
@@ -301,7 +302,7 @@ static __u64 crush_ln(unsigned int xin)
301 * 302 *
302 */ 303 */
303 304
304static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket, 305static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
305 int x, int r) 306 int x, int r)
306{ 307{
307 unsigned int i, high = 0; 308 unsigned int i, high = 0;
@@ -344,37 +345,42 @@ static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket,
344 high_draw = draw; 345 high_draw = draw;
345 } 346 }
346 } 347 }
348
347 return bucket->h.items[high]; 349 return bucket->h.items[high];
348} 350}
349 351
350 352
351static int crush_bucket_choose(struct crush_bucket *in, int x, int r) 353static int crush_bucket_choose(const struct crush_bucket *in,
354 struct crush_work_bucket *work,
355 int x, int r)
352{ 356{
353 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); 357 dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
354 BUG_ON(in->size == 0); 358 BUG_ON(in->size == 0);
355 switch (in->alg) { 359 switch (in->alg) {
356 case CRUSH_BUCKET_UNIFORM: 360 case CRUSH_BUCKET_UNIFORM:
357 return bucket_uniform_choose((struct crush_bucket_uniform *)in, 361 return bucket_uniform_choose(
358 x, r); 362 (const struct crush_bucket_uniform *)in,
363 work, x, r);
359 case CRUSH_BUCKET_LIST: 364 case CRUSH_BUCKET_LIST:
360 return bucket_list_choose((struct crush_bucket_list *)in, 365 return bucket_list_choose((const struct crush_bucket_list *)in,
361 x, r); 366 x, r);
362 case CRUSH_BUCKET_TREE: 367 case CRUSH_BUCKET_TREE:
363 return bucket_tree_choose((struct crush_bucket_tree *)in, 368 return bucket_tree_choose((const struct crush_bucket_tree *)in,
364 x, r); 369 x, r);
365 case CRUSH_BUCKET_STRAW: 370 case CRUSH_BUCKET_STRAW:
366 return bucket_straw_choose((struct crush_bucket_straw *)in, 371 return bucket_straw_choose(
367 x, r); 372 (const struct crush_bucket_straw *)in,
373 x, r);
368 case CRUSH_BUCKET_STRAW2: 374 case CRUSH_BUCKET_STRAW2:
369 return bucket_straw2_choose((struct crush_bucket_straw2 *)in, 375 return bucket_straw2_choose(
370 x, r); 376 (const struct crush_bucket_straw2 *)in,
377 x, r);
371 default: 378 default:
372 dprintk("unknown bucket %d alg %d\n", in->id, in->alg); 379 dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
373 return in->items[0]; 380 return in->items[0];
374 } 381 }
375} 382}
376 383
377
378/* 384/*
379 * true if device is marked "out" (failed, fully offloaded) 385 * true if device is marked "out" (failed, fully offloaded)
380 * of the cluster 386 * of the cluster
@@ -416,7 +422,8 @@ static int is_out(const struct crush_map *map,
416 * @parent_r: r value passed from the parent 422 * @parent_r: r value passed from the parent
417 */ 423 */
418static int crush_choose_firstn(const struct crush_map *map, 424static int crush_choose_firstn(const struct crush_map *map,
419 struct crush_bucket *bucket, 425 struct crush_work *work,
426 const struct crush_bucket *bucket,
420 const __u32 *weight, int weight_max, 427 const __u32 *weight, int weight_max,
421 int x, int numrep, int type, 428 int x, int numrep, int type,
422 int *out, int outpos, 429 int *out, int outpos,
@@ -434,7 +441,7 @@ static int crush_choose_firstn(const struct crush_map *map,
434 int rep; 441 int rep;
435 unsigned int ftotal, flocal; 442 unsigned int ftotal, flocal;
436 int retry_descent, retry_bucket, skip_rep; 443 int retry_descent, retry_bucket, skip_rep;
437 struct crush_bucket *in = bucket; 444 const struct crush_bucket *in = bucket;
438 int r; 445 int r;
439 int i; 446 int i;
440 int item = 0; 447 int item = 0;
@@ -473,9 +480,13 @@ static int crush_choose_firstn(const struct crush_map *map,
473 if (local_fallback_retries > 0 && 480 if (local_fallback_retries > 0 &&
474 flocal >= (in->size>>1) && 481 flocal >= (in->size>>1) &&
475 flocal > local_fallback_retries) 482 flocal > local_fallback_retries)
476 item = bucket_perm_choose(in, x, r); 483 item = bucket_perm_choose(
484 in, work->work[-1-in->id],
485 x, r);
477 else 486 else
478 item = crush_bucket_choose(in, x, r); 487 item = crush_bucket_choose(
488 in, work->work[-1-in->id],
489 x, r);
479 if (item >= map->max_devices) { 490 if (item >= map->max_devices) {
480 dprintk(" bad item %d\n", item); 491 dprintk(" bad item %d\n", item);
481 skip_rep = 1; 492 skip_rep = 1;
@@ -518,19 +529,21 @@ static int crush_choose_firstn(const struct crush_map *map,
518 sub_r = r >> (vary_r-1); 529 sub_r = r >> (vary_r-1);
519 else 530 else
520 sub_r = 0; 531 sub_r = 0;
521 if (crush_choose_firstn(map, 532 if (crush_choose_firstn(
522 map->buckets[-1-item], 533 map,
523 weight, weight_max, 534 work,
524 x, stable ? 1 : outpos+1, 0, 535 map->buckets[-1-item],
525 out2, outpos, count, 536 weight, weight_max,
526 recurse_tries, 0, 537 x, stable ? 1 : outpos+1, 0,
527 local_retries, 538 out2, outpos, count,
528 local_fallback_retries, 539 recurse_tries, 0,
529 0, 540 local_retries,
530 vary_r, 541 local_fallback_retries,
531 stable, 542 0,
532 NULL, 543 vary_r,
533 sub_r) <= outpos) 544 stable,
545 NULL,
546 sub_r) <= outpos)
534 /* didn't get leaf */ 547 /* didn't get leaf */
535 reject = 1; 548 reject = 1;
536 } else { 549 } else {
@@ -539,14 +552,12 @@ static int crush_choose_firstn(const struct crush_map *map,
539 } 552 }
540 } 553 }
541 554
542 if (!reject) { 555 if (!reject && !collide) {
543 /* out? */ 556 /* out? */
544 if (itemtype == 0) 557 if (itemtype == 0)
545 reject = is_out(map, weight, 558 reject = is_out(map, weight,
546 weight_max, 559 weight_max,
547 item, x); 560 item, x);
548 else
549 reject = 0;
550 } 561 }
551 562
552reject: 563reject:
@@ -600,7 +611,8 @@ reject:
600 * 611 *
601 */ 612 */
602static void crush_choose_indep(const struct crush_map *map, 613static void crush_choose_indep(const struct crush_map *map,
603 struct crush_bucket *bucket, 614 struct crush_work *work,
615 const struct crush_bucket *bucket,
604 const __u32 *weight, int weight_max, 616 const __u32 *weight, int weight_max,
605 int x, int left, int numrep, int type, 617 int x, int left, int numrep, int type,
606 int *out, int outpos, 618 int *out, int outpos,
@@ -610,7 +622,7 @@ static void crush_choose_indep(const struct crush_map *map,
610 int *out2, 622 int *out2,
611 int parent_r) 623 int parent_r)
612{ 624{
613 struct crush_bucket *in = bucket; 625 const struct crush_bucket *in = bucket;
614 int endpos = outpos + left; 626 int endpos = outpos + left;
615 int rep; 627 int rep;
616 unsigned int ftotal; 628 unsigned int ftotal;
@@ -678,7 +690,9 @@ static void crush_choose_indep(const struct crush_map *map,
678 break; 690 break;
679 } 691 }
680 692
681 item = crush_bucket_choose(in, x, r); 693 item = crush_bucket_choose(
694 in, work->work[-1-in->id],
695 x, r);
682 if (item >= map->max_devices) { 696 if (item >= map->max_devices) {
683 dprintk(" bad item %d\n", item); 697 dprintk(" bad item %d\n", item);
684 out[rep] = CRUSH_ITEM_NONE; 698 out[rep] = CRUSH_ITEM_NONE;
@@ -724,13 +738,15 @@ static void crush_choose_indep(const struct crush_map *map,
724 738
725 if (recurse_to_leaf) { 739 if (recurse_to_leaf) {
726 if (item < 0) { 740 if (item < 0) {
727 crush_choose_indep(map, 741 crush_choose_indep(
728 map->buckets[-1-item], 742 map,
729 weight, weight_max, 743 work,
730 x, 1, numrep, 0, 744 map->buckets[-1-item],
731 out2, rep, 745 weight, weight_max,
732 recurse_tries, 0, 746 x, 1, numrep, 0,
733 0, NULL, r); 747 out2, rep,
748 recurse_tries, 0,
749 0, NULL, r);
734 if (out2[rep] == CRUSH_ITEM_NONE) { 750 if (out2[rep] == CRUSH_ITEM_NONE) {
735 /* placed nothing; no leaf */ 751 /* placed nothing; no leaf */
736 break; 752 break;
@@ -781,6 +797,53 @@ static void crush_choose_indep(const struct crush_map *map,
781#endif 797#endif
782} 798}
783 799
800
801/*
802 * This takes a chunk of memory and sets it up to be a shiny new
803 * working area for a CRUSH placement computation. It must be called
804 * on any newly allocated memory before passing it in to
805 * crush_do_rule. It may be used repeatedly after that, so long as the
806 * map has not changed. If the map /has/ changed, you must make sure
807 * the working size is no smaller than what was allocated and re-run
808 * crush_init_workspace.
809 *
810 * If you do retain the working space between calls to crush, make it
811 * thread-local.
812 */
813void crush_init_workspace(const struct crush_map *map, void *v)
814{
815 struct crush_work *w = v;
816 __s32 b;
817
818 /*
819 * We work by moving through the available space and setting
820 * values and pointers as we go.
821 *
822 * It's a bit like Forth's use of the 'allot' word since we
823 * set the pointer first and then reserve the space for it to
824 * point to by incrementing the pointer.
825 */
826 v += sizeof(struct crush_work *);
827 w->work = v;
828 v += map->max_buckets * sizeof(struct crush_work_bucket *);
829 for (b = 0; b < map->max_buckets; ++b) {
830 if (!map->buckets[b])
831 continue;
832
833 w->work[b] = v;
834 switch (map->buckets[b]->alg) {
835 default:
836 v += sizeof(struct crush_work_bucket);
837 break;
838 }
839 w->work[b]->perm_x = 0;
840 w->work[b]->perm_n = 0;
841 w->work[b]->perm = v;
842 v += map->buckets[b]->size * sizeof(__u32);
843 }
844 BUG_ON(v - (void *)w != map->working_size);
845}
846
784/** 847/**
785 * crush_do_rule - calculate a mapping with the given input and rule 848 * crush_do_rule - calculate a mapping with the given input and rule
786 * @map: the crush_map 849 * @map: the crush_map
@@ -790,24 +853,25 @@ static void crush_choose_indep(const struct crush_map *map,
790 * @result_max: maximum result size 853 * @result_max: maximum result size
791 * @weight: weight vector (for map leaves) 854 * @weight: weight vector (for map leaves)
792 * @weight_max: size of weight vector 855 * @weight_max: size of weight vector
793 * @scratch: scratch vector for private use; must be >= 3 * result_max 856 * @cwin: pointer to at least crush_work_size() bytes of memory
794 */ 857 */
795int crush_do_rule(const struct crush_map *map, 858int crush_do_rule(const struct crush_map *map,
796 int ruleno, int x, int *result, int result_max, 859 int ruleno, int x, int *result, int result_max,
797 const __u32 *weight, int weight_max, 860 const __u32 *weight, int weight_max,
798 int *scratch) 861 void *cwin)
799{ 862{
800 int result_len; 863 int result_len;
801 int *a = scratch; 864 struct crush_work *cw = cwin;
802 int *b = scratch + result_max; 865 int *a = cwin + map->working_size;
803 int *c = scratch + result_max*2; 866 int *b = a + result_max;
867 int *c = b + result_max;
868 int *w = a;
869 int *o = b;
804 int recurse_to_leaf; 870 int recurse_to_leaf;
805 int *w;
806 int wsize = 0; 871 int wsize = 0;
807 int *o;
808 int osize; 872 int osize;
809 int *tmp; 873 int *tmp;
810 struct crush_rule *rule; 874 const struct crush_rule *rule;
811 __u32 step; 875 __u32 step;
812 int i, j; 876 int i, j;
813 int numrep; 877 int numrep;
@@ -835,12 +899,10 @@ int crush_do_rule(const struct crush_map *map,
835 899
836 rule = map->rules[ruleno]; 900 rule = map->rules[ruleno];
837 result_len = 0; 901 result_len = 0;
838 w = a;
839 o = b;
840 902
841 for (step = 0; step < rule->len; step++) { 903 for (step = 0; step < rule->len; step++) {
842 int firstn = 0; 904 int firstn = 0;
843 struct crush_rule_step *curstep = &rule->steps[step]; 905 const struct crush_rule_step *curstep = &rule->steps[step];
844 906
845 switch (curstep->op) { 907 switch (curstep->op) {
846 case CRUSH_RULE_TAKE: 908 case CRUSH_RULE_TAKE:
@@ -936,6 +998,7 @@ int crush_do_rule(const struct crush_map *map,
936 recurse_tries = choose_tries; 998 recurse_tries = choose_tries;
937 osize += crush_choose_firstn( 999 osize += crush_choose_firstn(
938 map, 1000 map,
1001 cw,
939 map->buckets[bno], 1002 map->buckets[bno],
940 weight, weight_max, 1003 weight, weight_max,
941 x, numrep, 1004 x, numrep,
@@ -956,6 +1019,7 @@ int crush_do_rule(const struct crush_map *map,
956 numrep : (result_max-osize)); 1019 numrep : (result_max-osize));
957 crush_choose_indep( 1020 crush_choose_indep(
958 map, 1021 map,
1022 cw,
959 map->buckets[bno], 1023 map->buckets[bno],
960 weight, weight_max, 1024 weight, weight_max,
961 x, out_size, numrep, 1025 x, out_size, numrep,
@@ -997,5 +1061,6 @@ int crush_do_rule(const struct crush_map *map,
997 break; 1061 break;
998 } 1062 }
999 } 1063 }
1064
1000 return result_len; 1065 return result_len;
1001} 1066}
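
crush_init_workspace() above fills the flat buffer with a pointer-bump ("allot") scheme: store a pointer, then advance the cursor past the space it now points to. Note that the work->work array is indexed by -1 - bucket_id, since bucket IDs are negative. A generic sketch of the pattern (helper invented; void * arithmetic is the GCC extension the kernel code itself relies on):

	static void *alloc_from(void **cursor, size_t bytes)
	{
		void *p = *cursor;

		*cursor += bytes;	/* reserve what was just handed out */
		return p;
	}

	/* e.g.: void *v = buf; struct crush_work *w = alloc_from(&v, sizeof(*w)); */
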
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index 292e33bd916e..85747b7f91a9 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -3,6 +3,7 @@
3 3
4#include <linux/err.h> 4#include <linux/err.h>
5#include <linux/scatterlist.h> 5#include <linux/scatterlist.h>
6#include <linux/sched.h>
6#include <linux/slab.h> 7#include <linux/slab.h>
7#include <crypto/aes.h> 8#include <crypto/aes.h>
8#include <crypto/skcipher.h> 9#include <crypto/skcipher.h>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f3378ba1a828..b65bbf9f45eb 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -460,7 +460,6 @@ static void request_init(struct ceph_osd_request *req)
460 460
461 kref_init(&req->r_kref); 461 kref_init(&req->r_kref);
462 init_completion(&req->r_completion); 462 init_completion(&req->r_completion);
463 init_completion(&req->r_done_completion);
464 RB_CLEAR_NODE(&req->r_node); 463 RB_CLEAR_NODE(&req->r_node);
465 RB_CLEAR_NODE(&req->r_mc_node); 464 RB_CLEAR_NODE(&req->r_mc_node);
466 INIT_LIST_HEAD(&req->r_unsafe_item); 465 INIT_LIST_HEAD(&req->r_unsafe_item);
@@ -672,7 +671,8 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
672 BUG_ON(length > previous); 671 BUG_ON(length > previous);
673 672
674 op->extent.length = length; 673 op->extent.length = length;
675 op->indata_len -= previous - length; 674 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
675 op->indata_len -= previous - length;
676} 676}
677EXPORT_SYMBOL(osd_req_op_extent_update); 677EXPORT_SYMBOL(osd_req_op_extent_update);
678 678
@@ -1636,7 +1636,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
1636 bool need_send = false; 1636 bool need_send = false;
1637 bool promoted = false; 1637 bool promoted = false;
1638 1638
1639 WARN_ON(req->r_tid || req->r_got_reply); 1639 WARN_ON(req->r_tid);
1640 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 1640 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
1641 1641
1642again: 1642again:
@@ -1704,17 +1704,10 @@ promote:
1704 1704
1705static void account_request(struct ceph_osd_request *req) 1705static void account_request(struct ceph_osd_request *req)
1706{ 1706{
1707 unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; 1707 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
1708 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
1708 1709
1709 if (req->r_flags & CEPH_OSD_FLAG_READ) { 1710 req->r_flags |= CEPH_OSD_FLAG_ONDISK;
1710 WARN_ON(req->r_flags & mask);
1711 req->r_flags |= CEPH_OSD_FLAG_ACK;
1712 } else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
1713 WARN_ON(!(req->r_flags & mask));
1714 else
1715 WARN_ON(1);
1716
1717 WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
1718 atomic_inc(&req->r_osdc->num_requests); 1711 atomic_inc(&req->r_osdc->num_requests);
1719} 1712}
1720 1713
@@ -1749,15 +1742,15 @@ static void finish_request(struct ceph_osd_request *req)
1749 1742
1750static void __complete_request(struct ceph_osd_request *req) 1743static void __complete_request(struct ceph_osd_request *req)
1751{ 1744{
1752 if (req->r_callback) 1745 if (req->r_callback) {
1746 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
1747 req->r_tid, req->r_callback, req->r_result);
1753 req->r_callback(req); 1748 req->r_callback(req);
1754 else 1749 }
1755 complete_all(&req->r_completion);
1756} 1750}
1757 1751
1758/* 1752/*
1759 * Note that this is open-coded in handle_reply(), which has to deal 1753 * This is open-coded in handle_reply().
1760 * with ack vs commit, dup acks, etc.
1761 */ 1754 */
1762static void complete_request(struct ceph_osd_request *req, int err) 1755static void complete_request(struct ceph_osd_request *req, int err)
1763{ 1756{
@@ -1766,7 +1759,7 @@ static void complete_request(struct ceph_osd_request *req, int err)
1766 req->r_result = err; 1759 req->r_result = err;
1767 finish_request(req); 1760 finish_request(req);
1768 __complete_request(req); 1761 __complete_request(req);
1769 complete_all(&req->r_done_completion); 1762 complete_all(&req->r_completion);
1770 ceph_osdc_put_request(req); 1763 ceph_osdc_put_request(req);
1771} 1764}
1772 1765
@@ -1792,7 +1785,7 @@ static void cancel_request(struct ceph_osd_request *req)
1792 1785
1793 cancel_map_check(req); 1786 cancel_map_check(req);
1794 finish_request(req); 1787 finish_request(req);
1795 complete_all(&req->r_done_completion); 1788 complete_all(&req->r_completion);
1796 ceph_osdc_put_request(req); 1789 ceph_osdc_put_request(req);
1797} 1790}
1798 1791
@@ -2169,7 +2162,6 @@ static void linger_commit_cb(struct ceph_osd_request *req)
2169 mutex_lock(&lreq->lock); 2162 mutex_lock(&lreq->lock);
2170 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, 2163 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2171 lreq->linger_id, req->r_result); 2164 lreq->linger_id, req->r_result);
2172 WARN_ON(!__linger_registered(lreq));
2173 linger_reg_commit_complete(lreq, req->r_result); 2165 linger_reg_commit_complete(lreq, req->r_result);
2174 lreq->committed = true; 2166 lreq->committed = true;
2175 2167
@@ -2785,31 +2777,8 @@ e_inval:
2785} 2777}
2786 2778
2787/* 2779/*
2788 * We are done with @req if 2780 * Handle MOSDOpReply. Set ->r_result and call the callback if it is
2789 * - @m is a safe reply, or 2781 * specified.
2790 * - @m is an unsafe reply and we didn't want a safe one
2791 */
2792static bool done_request(const struct ceph_osd_request *req,
2793 const struct MOSDOpReply *m)
2794{
2795 return (m->result < 0 ||
2796 (m->flags & CEPH_OSD_FLAG_ONDISK) ||
2797 !(req->r_flags & CEPH_OSD_FLAG_ONDISK));
2798}
2799
2800/*
2801 * handle osd op reply. either call the callback if it is specified,
2802 * or do the completion to wake up the waiting thread.
2803 *
2804 * ->r_unsafe_callback is set? yes no
2805 *
2806 * first reply is OK (needed r_cb/r_completion, r_cb/r_completion,
2807 * any or needed/got safe) r_done_completion r_done_completion
2808 *
2809 * first reply is unsafe r_unsafe_cb(true) (nothing)
2810 *
2811 * when we get the safe reply r_unsafe_cb(false), r_cb/r_completion,
2812 * r_done_completion r_done_completion
2813 */ 2782 */
2814static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 2783static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
2815{ 2784{
@@ -2818,7 +2787,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
2818 struct MOSDOpReply m; 2787 struct MOSDOpReply m;
2819 u64 tid = le64_to_cpu(msg->hdr.tid); 2788 u64 tid = le64_to_cpu(msg->hdr.tid);
2820 u32 data_len = 0; 2789 u32 data_len = 0;
2821 bool already_acked;
2822 int ret; 2790 int ret;
2823 int i; 2791 int i;
2824 2792
@@ -2897,50 +2865,22 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
2897 le32_to_cpu(msg->hdr.data_len), req->r_tid); 2865 le32_to_cpu(msg->hdr.data_len), req->r_tid);
2898 goto fail_request; 2866 goto fail_request;
2899 } 2867 }
2900 dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__, 2868 dout("%s req %p tid %llu result %d data_len %u\n", __func__,
2901 req, req->r_tid, req->r_got_reply, m.result, data_len); 2869 req, req->r_tid, m.result, data_len);
2902
2903 already_acked = req->r_got_reply;
2904 if (!already_acked) {
2905 req->r_result = m.result ?: data_len;
2906 req->r_replay_version = m.replay_version; /* struct */
2907 req->r_got_reply = true;
2908 } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
2909 dout("req %p tid %llu dup ack\n", req, req->r_tid);
2910 goto out_unlock_session;
2911 }
2912
2913 if (done_request(req, &m)) {
2914 finish_request(req);
2915 if (req->r_linger) {
2916 WARN_ON(req->r_unsafe_callback);
2917 dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
2918 __complete_request(req);
2919 }
2920 }
2921 2870
2871 /*
2872 * Since we only ever request ONDISK, we should only ever get
2873 * one (type of) reply back.
2874 */
2875 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
2876 req->r_result = m.result ?: data_len;
2877 finish_request(req);
2922 mutex_unlock(&osd->lock); 2878 mutex_unlock(&osd->lock);
2923 up_read(&osdc->lock); 2879 up_read(&osdc->lock);
2924 2880
2925 if (done_request(req, &m)) { 2881 __complete_request(req);
2926 if (already_acked && req->r_unsafe_callback) { 2882 complete_all(&req->r_completion);
2927 dout("req %p tid %llu safe-cb\n", req, req->r_tid); 2883 ceph_osdc_put_request(req);
2928 req->r_unsafe_callback(req, false);
2929 } else if (!req->r_linger) {
2930 dout("req %p tid %llu cb\n", req, req->r_tid);
2931 __complete_request(req);
2932 }
2933 complete_all(&req->r_done_completion);
2934 ceph_osdc_put_request(req);
2935 } else {
2936 if (req->r_unsafe_callback) {
2937 dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
2938 req->r_unsafe_callback(req, true);
2939 } else {
2940 WARN_ON(1);
2941 }
2942 }
2943
2944 return; 2884 return;
2945 2885
2946fail_request: 2886fail_request:
@@ -3540,7 +3480,7 @@ again:
3540 up_read(&osdc->lock); 3480 up_read(&osdc->lock);
3541 dout("%s waiting on req %p tid %llu last_tid %llu\n", 3481 dout("%s waiting on req %p tid %llu last_tid %llu\n",
3542 __func__, req, req->r_tid, last_tid); 3482 __func__, req, req->r_tid, last_tid);
3543 wait_for_completion(&req->r_done_completion); 3483 wait_for_completion(&req->r_completion);
3544 ceph_osdc_put_request(req); 3484 ceph_osdc_put_request(req);
3545 goto again; 3485 goto again;
3546 } 3486 }
@@ -3599,7 +3539,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
3599 3539
3600 ceph_oid_copy(&lreq->t.base_oid, oid); 3540 ceph_oid_copy(&lreq->t.base_oid, oid);
3601 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 3541 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
3602 lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 3542 lreq->t.flags = CEPH_OSD_FLAG_WRITE;
3603 lreq->mtime = CURRENT_TIME; 3543 lreq->mtime = CURRENT_TIME;
3604 3544
3605 lreq->reg_req = alloc_linger_request(lreq); 3545 lreq->reg_req = alloc_linger_request(lreq);
@@ -3657,7 +3597,7 @@ int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
 
 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
-	req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
 	req->r_mtime = CURRENT_TIME;
 	osd_req_op_watch_init(req, 0, lreq->linger_id,
 			      CEPH_OSD_WATCH_OP_UNWATCH);
@@ -4022,7 +3962,7 @@ EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
  * Execute an OSD class method on an object.
  *
  * @flags: CEPH_OSD_FLAG_*
- * @resp_len: out param for reply length
+ * @resp_len: in/out param for reply length
  */
 int ceph_osdc_call(struct ceph_osd_client *osdc,
 		   struct ceph_object_id *oid,
@@ -4035,6 +3975,9 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 	struct ceph_osd_request *req;
 	int ret;
 
+	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
+		return -E2BIG;
+
 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 	if (!req)
 		return -ENOMEM;
@@ -4053,7 +3996,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 					     0, false, false);
 	if (resp_page)
 		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
-						   PAGE_SIZE, 0, false, false);
+						   *resp_len, 0, false, false);
 
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
@@ -4220,8 +4163,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 	int page_align = off & ~PAGE_MASK;
 
 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
-				    CEPH_OSD_OP_WRITE,
-				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
 				    snapc, truncate_seq, truncate_size,
 				    true);
 	if (IS_ERR(req))
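
With @resp_len now an in/out parameter, a ceph_osdc_call() caller passes the reply buffer size in (at most PAGE_SIZE, per the new -E2BIG check) and reads the actual reply length back on success. A minimal caller sketch under those assumptions; the helper name and the class/method strings are hypothetical, only the ceph_osdc_call() usage itself follows this diff:

	/* hypothetical caller, not part of this series */
	static int example_cls_call(struct ceph_osd_client *osdc,
				    struct ceph_object_id *oid,
				    struct ceph_object_locator *oloc)
	{
		struct page *reply_page;
		size_t reply_len = PAGE_SIZE;	/* in: reply buffer size */
		int ret;

		reply_page = alloc_page(GFP_NOIO);
		if (!reply_page)
			return -ENOMEM;

		ret = ceph_osdc_call(osdc, oid, oloc, "rbd", "get_size",
				     CEPH_OSD_FLAG_READ, NULL, 0,
				     reply_page, &reply_len);
		if (ret >= 0)
			dout("reply_len %zu\n", reply_len);	/* out: bytes returned */

		__free_page(reply_page);
		return ret;
	}
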
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index d2436880b305..6824c0ec8373 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -153,6 +153,32 @@ bad:
 	return -EINVAL;
 }
 
+static void crush_finalize(struct crush_map *c)
+{
+	__s32 b;
+
+	/* Space for the array of pointers to per-bucket workspace */
+	c->working_size = sizeof(struct crush_work) +
+	    c->max_buckets * sizeof(struct crush_work_bucket *);
+
+	for (b = 0; b < c->max_buckets; b++) {
+		if (!c->buckets[b])
+			continue;
+
+		switch (c->buckets[b]->alg) {
+		default:
+			/*
+			 * The base case, permutation variables and
+			 * the pointer to the permutation array.
+			 */
+			c->working_size += sizeof(struct crush_work_bucket);
+			break;
+		}
+		/* Every bucket has a permutation array. */
+		c->working_size += c->buckets[b]->size * sizeof(__u32);
+	}
+}
+
 static struct crush_map *crush_decode(void *pbyval, void *end)
 {
 	struct crush_map *c;
@@ -246,10 +272,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
 		if (b->items == NULL)
 			goto badmem;
-		b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS);
-		if (b->perm == NULL)
-			goto badmem;
-		b->perm_n = 0;
 
 		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
 		for (j = 0; j < b->size; j++)
@@ -368,6 +390,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	dout("crush decode tunable chooseleaf_stable = %d\n",
 	     c->chooseleaf_stable);
 
+	crush_finalize(c);
+
 done:
 	dout("crush_decode success\n");
 	return c;
@@ -719,7 +743,7 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->pool_max = -1;
 	map->pg_temp = RB_ROOT;
 	map->primary_temp = RB_ROOT;
-	mutex_init(&map->crush_scratch_mutex);
+	mutex_init(&map->crush_workspace_mutex);
 
 	return map;
 }
@@ -753,6 +777,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kfree(map->osd_weight);
 	kfree(map->osd_addr);
 	kfree(map->osd_primary_affinity);
+	kfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -808,6 +833,31 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 	return 0;
 }
 
+static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
+{
+	void *workspace;
+	size_t work_size;
+
+	if (IS_ERR(crush))
+		return PTR_ERR(crush);
+
+	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+	workspace = kmalloc(work_size, GFP_NOIO);
+	if (!workspace) {
+		crush_destroy(crush);
+		return -ENOMEM;
+	}
+	crush_init_workspace(crush, workspace);
+
+	if (map->crush)
+		crush_destroy(map->crush);
+	kfree(map->crush_workspace);
+	map->crush = crush;
+	map->crush_workspace = workspace;
+	return 0;
+}
+
 #define OSDMAP_WRAPPER_COMPAT_VER	7
 #define OSDMAP_CLIENT_DATA_COMPAT_VER	1
 
@@ -1214,13 +1264,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
 
 	/* crush */
 	ceph_decode_32_safe(p, end, len, e_inval);
-	map->crush = crush_decode(*p, min(*p + len, end));
-	if (IS_ERR(map->crush)) {
-		err = PTR_ERR(map->crush);
-		map->crush = NULL;
+	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
+	if (err)
 		goto bad;
-	}
-	*p += len;
 
 	/* ignore the rest */
 	*p = end;
@@ -1375,7 +1421,6 @@ e_inval:
 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 					     struct ceph_osdmap *map)
 {
-	struct crush_map *newcrush = NULL;
 	struct ceph_fsid fsid;
 	u32 epoch = 0;
 	struct ceph_timespec modified;
@@ -1414,12 +1459,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	/* new crush? */
 	ceph_decode_32_safe(p, end, len, e_inval);
 	if (len > 0) {
-		newcrush = crush_decode(*p, min(*p+len, end));
-		if (IS_ERR(newcrush)) {
-			err = PTR_ERR(newcrush);
-			newcrush = NULL;
+		err = osdmap_set_crush(map,
+				       crush_decode(*p, min(*p + len, end)));
+		if (err)
 			goto bad;
-		}
 		*p += len;
 	}
 
@@ -1439,12 +1482,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 
 	map->epoch++;
 	map->modified = modified;
-	if (newcrush) {
-		if (map->crush)
-			crush_destroy(map->crush);
-		map->crush = newcrush;
-		newcrush = NULL;
-	}
 
 	/* new_pools */
 	err = decode_new_pools(p, end, map);
@@ -1505,8 +1542,6 @@ bad:
 	print_hex_dump(KERN_DEBUG, "osdmap: ",
 		       DUMP_PREFIX_OFFSET, 16, 1,
 		       start, end - start, true);
-	if (newcrush)
-		crush_destroy(newcrush);
 	return ERR_PTR(err);
 }
 
@@ -1942,10 +1977,10 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
 
-	mutex_lock(&map->crush_scratch_mutex);
+	mutex_lock(&map->crush_workspace_mutex);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_scratch_ary);
-	mutex_unlock(&map->crush_scratch_mutex);
+			  weight, weight_max, map->crush_workspace);
+	mutex_unlock(&map->crush_workspace_mutex);
 
 	return r;
 }
@@ -1978,8 +2013,14 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
 		return;
 	}
 
-	len = do_crush(osdmap, ruleno, pps, raw->osds,
-		       min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
+	if (pi->size > ARRAY_SIZE(raw->osds)) {
+		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
+				   pi->id, pi->crush_ruleset, pi->type, pi->size,
+				   ARRAY_SIZE(raw->osds));
+		return;
+	}
+
+	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
 		       osdmap->osd_weight, osdmap->max_osd);
 	if (len < 0) {
 		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
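
Taken together, crush_finalize() precomputes how much per-map working memory CRUSH needs and osdmap_set_crush() allocates it once, replacing the per-bucket b->perm allocations and the fixed scratch array. A back-of-the-envelope sketch of the sizing arithmetic, with assumed (not actual) struct sizes for a 64-bit build and a hypothetical three-bucket map:

	/* illustration only; the 8- and 24-byte struct sizes are assumptions */
	#include <stdio.h>

	int main(void)
	{
		unsigned long sizeof_work = 8;		/* assumed sizeof(struct crush_work) */
		unsigned long sizeof_work_bucket = 24;	/* assumed sizeof(struct crush_work_bucket) */
		unsigned long bucket_sizes[] = { 4, 8, 16 };	/* hypothetical buckets */
		unsigned long size, b;

		/* crush_finalize(): header plus one workspace pointer per bucket */
		size = sizeof_work + 3 * sizeof(void *);
		/* per-bucket state plus a __u32 permutation array per bucket */
		for (b = 0; b < 3; b++)
			size += sizeof_work_bucket + bucket_sizes[b] * sizeof(unsigned int);

		printf("working_size = %lu bytes\n", size);	/* 8 + 24 + 72 + 112 = 216 */
		return 0;
	}

osdmap_set_crush() then sizes the real allocation with crush_work_size(crush, CEPH_PG_MAX_SIZE), i.e. this working_size plus scratch space sized by CEPH_PG_MAX_SIZE, and do_crush() serializes all users of the single workspace behind crush_workspace_mutex, so CRUSH computations remain serialized.
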
diff --git a/net/ceph/snapshot.c b/net/ceph/snapshot.c
index 154683f5f14c..705414e78ae0 100644
--- a/net/ceph/snapshot.c
+++ b/net/ceph/snapshot.c
@@ -18,8 +18,6 @@
  * 02110-1301, USA.
  */
 
-#include <stddef.h>
-
 #include <linux/types.h>
 #include <linux/export.h>
 #include <linux/ceph/libceph.h>