aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-01-28 14:02:23 -0500
committerLinus Torvalds <torvalds@linux-foundation.org>2014-01-28 14:02:23 -0500
commitd891ea23d5203e5c47439b2a174f86a00b356a6c (patch)
tree3876cefcced9df5519f437cd8eb275cb979b93f6
parent08d21b5f93eb92a781daea71b6fcb3a340909141 (diff)
parent125d725c923527a85876c031028c7f55c28b74b3 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil: "This is a big batch. From Ilya we have: - rbd support for more than ~250 mapped devices (now uses same scheme that SCSI does for device major/minor numbering) - crush updates for new mapping behaviors (will be needed for coming erasure coding support, among other things) - preliminary support for tiered storage pools There is also a big series fixing a pile of cephfs bugs with clustered MDSs from Yan Zheng, ACL support for cephfs from Guangliang Zhao, ceph fscache improvements from Li Wang, improved behavior when we get ENOSPC from Josh Durgin, some readv/writev improvements from Majianpeng, and the usual mix of small cleanups" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (76 commits) ceph: cast PAGE_SIZE to size_t in ceph_sync_write() ceph: fix dout() compile warnings in ceph_filemap_fault() libceph: support CEPH_FEATURE_OSD_CACHEPOOL feature libceph: follow redirect replies from osds libceph: rename ceph_osd_request::r_{oloc,oid} to r_base_{oloc,oid} libceph: follow {read,write}_tier fields on osd request submission libceph: add ceph_pg_pool_by_id() libceph: CEPH_OSD_FLAG_* enum update libceph: replace ceph_calc_ceph_pg() with ceph_oloc_oid_to_pg() libceph: introduce and start using oid abstraction libceph: rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN libceph: move ceph_file_layout helpers to ceph_fs.h libceph: start using oloc abstraction libceph: dout() is missing a newline libceph: add ceph_kv{malloc,free}() and switch to them libceph: support CEPH_FEATURE_EXPORT_PEER ceph: add imported caps when handling cap export message ceph: add open export target session helper ceph: remove exported caps when handling cap import message ceph: handle session flush message ...
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd26
-rw-r--r--MAINTAINERS2
-rw-r--r--drivers/block/rbd.c303
-rw-r--r--fs/ceph/Kconfig13
-rw-r--r--fs/ceph/Makefile1
-rw-r--r--fs/ceph/acl.c332
-rw-r--r--fs/ceph/addr.c93
-rw-r--r--fs/ceph/cache.h13
-rw-r--r--fs/ceph/caps.c338
-rw-r--r--fs/ceph/dir.c16
-rw-r--r--fs/ceph/file.c437
-rw-r--r--fs/ceph/inode.c33
-rw-r--r--fs/ceph/ioctl.c8
-rw-r--r--fs/ceph/mds_client.c132
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/strings.c2
-rw-r--r--fs/ceph/super.c9
-rw-r--r--fs/ceph/super.h45
-rw-r--r--fs/ceph/xattr.c60
-rw-r--r--include/linux/ceph/buffer.h1
-rw-r--r--include/linux/ceph/ceph_features.h101
-rw-r--r--include/linux/ceph/ceph_fs.h36
-rw-r--r--include/linux/ceph/libceph.h19
-rw-r--r--include/linux/ceph/messenger.h13
-rw-r--r--include/linux/ceph/osd_client.h19
-rw-r--r--include/linux/ceph/osdmap.h66
-rw-r--r--include/linux/ceph/rados.h4
-rw-r--r--include/linux/crush/crush.h20
-rw-r--r--include/linux/crush/mapper.h3
-rw-r--r--net/ceph/buffer.c22
-rw-r--r--net/ceph/ceph_common.c24
-rw-r--r--net/ceph/crush/crush.c7
-rw-r--r--net/ceph/crush/mapper.c336
-rw-r--r--net/ceph/debugfs.c3
-rw-r--r--net/ceph/messenger.c32
-rw-r--r--net/ceph/mon_client.c8
-rw-r--r--net/ceph/osd_client.c283
-rw-r--r--net/ceph/osdmap.c78
38 files changed, 2261 insertions, 679 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 0a306476424e..501adc2a9ec7 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -18,6 +18,28 @@ Removal of a device:
18 18
19 $ echo <dev-id> > /sys/bus/rbd/remove 19 $ echo <dev-id> > /sys/bus/rbd/remove
20 20
21What: /sys/bus/rbd/add_single_major
22Date: December 2013
23KernelVersion: 3.14
24Contact: Sage Weil <sage@inktank.com>
25Description: Available only if rbd module is inserted with single_major
26 parameter set to true.
27 Usage is the same as for /sys/bus/rbd/add. If present,
28 should be used instead of the latter: any attempts to use
29 /sys/bus/rbd/add if /sys/bus/rbd/add_single_major is
30 available will fail for backwards compatibility reasons.
31
32What: /sys/bus/rbd/remove_single_major
33Date: December 2013
34KernelVersion: 3.14
35Contact: Sage Weil <sage@inktank.com>
36Description: Available only if rbd module is inserted with single_major
37 parameter set to true.
38 Usage is the same as for /sys/bus/rbd/remove. If present,
39 should be used instead of the latter: any attempts to use
40 /sys/bus/rbd/remove if /sys/bus/rbd/remove_single_major is
41 available will fail for backwards compatibility reasons.
42
21Entries under /sys/bus/rbd/devices/<dev-id>/ 43Entries under /sys/bus/rbd/devices/<dev-id>/
22-------------------------------------------- 44--------------------------------------------
23 45
@@ -33,6 +55,10 @@ major
33 55
34 The block device major number. 56 The block device major number.
35 57
58minor
59
60 The block device minor number. (December 2013, since 3.14.)
61
36name 62name
37 63
38 The name of the rbd image. 64 The name of the rbd image.
diff --git a/MAINTAINERS b/MAINTAINERS
index 2507f38b208f..9bf651c57806 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -7075,7 +7075,7 @@ F: drivers/media/parport/*-qcam*
7075RADOS BLOCK DEVICE (RBD) 7075RADOS BLOCK DEVICE (RBD)
7076M: Yehuda Sadeh <yehuda@inktank.com> 7076M: Yehuda Sadeh <yehuda@inktank.com>
7077M: Sage Weil <sage@inktank.com> 7077M: Sage Weil <sage@inktank.com>
7078M: Alex Elder <elder@inktank.com> 7078M: Alex Elder <elder@kernel.org>
7079M: ceph-devel@vger.kernel.org 7079M: ceph-devel@vger.kernel.org
7080W: http://ceph.com/ 7080W: http://ceph.com/
7081T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git 7081T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index cb1db2979d3d..16cab6635163 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -41,6 +41,7 @@
41#include <linux/fs.h> 41#include <linux/fs.h>
42#include <linux/blkdev.h> 42#include <linux/blkdev.h>
43#include <linux/slab.h> 43#include <linux/slab.h>
44#include <linux/idr.h>
44 45
45#include "rbd_types.h" 46#include "rbd_types.h"
46 47
@@ -89,9 +90,9 @@ static int atomic_dec_return_safe(atomic_t *v)
89} 90}
90 91
91#define RBD_DRV_NAME "rbd" 92#define RBD_DRV_NAME "rbd"
92#define RBD_DRV_NAME_LONG "rbd (rados block device)"
93 93
94#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 94#define RBD_MINORS_PER_MAJOR 256
95#define RBD_SINGLE_MAJOR_PART_SHIFT 4
95 96
96#define RBD_SNAP_DEV_NAME_PREFIX "snap_" 97#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
97#define RBD_MAX_SNAP_NAME_LEN \ 98#define RBD_MAX_SNAP_NAME_LEN \
@@ -323,6 +324,7 @@ struct rbd_device {
323 int dev_id; /* blkdev unique id */ 324 int dev_id; /* blkdev unique id */
324 325
325 int major; /* blkdev assigned major */ 326 int major; /* blkdev assigned major */
327 int minor;
326 struct gendisk *disk; /* blkdev's gendisk and rq */ 328 struct gendisk *disk; /* blkdev's gendisk and rq */
327 329
328 u32 image_format; /* Either 1 or 2 */ 330 u32 image_format; /* Either 1 or 2 */
@@ -386,6 +388,17 @@ static struct kmem_cache *rbd_img_request_cache;
386static struct kmem_cache *rbd_obj_request_cache; 388static struct kmem_cache *rbd_obj_request_cache;
387static struct kmem_cache *rbd_segment_name_cache; 389static struct kmem_cache *rbd_segment_name_cache;
388 390
391static int rbd_major;
392static DEFINE_IDA(rbd_dev_id_ida);
393
394/*
395 * Default to false for now, as single-major requires >= 0.75 version of
396 * userspace rbd utility.
397 */
398static bool single_major = false;
399module_param(single_major, bool, S_IRUGO);
400MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
401
389static int rbd_img_request_submit(struct rbd_img_request *img_request); 402static int rbd_img_request_submit(struct rbd_img_request *img_request);
390 403
391static void rbd_dev_device_release(struct device *dev); 404static void rbd_dev_device_release(struct device *dev);
@@ -394,18 +407,52 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
394 size_t count); 407 size_t count);
395static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 408static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
396 size_t count); 409 size_t count);
410static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
411 size_t count);
412static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
413 size_t count);
397static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping); 414static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
398static void rbd_spec_put(struct rbd_spec *spec); 415static void rbd_spec_put(struct rbd_spec *spec);
399 416
417static int rbd_dev_id_to_minor(int dev_id)
418{
419 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
420}
421
422static int minor_to_rbd_dev_id(int minor)
423{
424 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
425}
426
400static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); 427static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
401static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); 428static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
429static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
430static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
402 431
403static struct attribute *rbd_bus_attrs[] = { 432static struct attribute *rbd_bus_attrs[] = {
404 &bus_attr_add.attr, 433 &bus_attr_add.attr,
405 &bus_attr_remove.attr, 434 &bus_attr_remove.attr,
435 &bus_attr_add_single_major.attr,
436 &bus_attr_remove_single_major.attr,
406 NULL, 437 NULL,
407}; 438};
408ATTRIBUTE_GROUPS(rbd_bus); 439
440static umode_t rbd_bus_is_visible(struct kobject *kobj,
441 struct attribute *attr, int index)
442{
443 if (!single_major &&
444 (attr == &bus_attr_add_single_major.attr ||
445 attr == &bus_attr_remove_single_major.attr))
446 return 0;
447
448 return attr->mode;
449}
450
451static const struct attribute_group rbd_bus_group = {
452 .attrs = rbd_bus_attrs,
453 .is_visible = rbd_bus_is_visible,
454};
455__ATTRIBUTE_GROUPS(rbd_bus);
409 456
410static struct bus_type rbd_bus_type = { 457static struct bus_type rbd_bus_type = {
411 .name = "rbd", 458 .name = "rbd",
@@ -1041,9 +1088,9 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1041 name_format = "%s.%012llx"; 1088 name_format = "%s.%012llx";
1042 if (rbd_dev->image_format == 2) 1089 if (rbd_dev->image_format == 2)
1043 name_format = "%s.%016llx"; 1090 name_format = "%s.%016llx";
1044 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format, 1091 ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1045 rbd_dev->header.object_prefix, segment); 1092 rbd_dev->header.object_prefix, segment);
1046 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 1093 if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1047 pr_err("error formatting segment name for #%llu (%d)\n", 1094 pr_err("error formatting segment name for #%llu (%d)\n",
1048 segment, ret); 1095 segment, ret);
1049 kfree(name); 1096 kfree(name);
@@ -1761,11 +1808,8 @@ static struct ceph_osd_request *rbd_osd_req_create(
1761 osd_req->r_callback = rbd_osd_req_callback; 1808 osd_req->r_callback = rbd_osd_req_callback;
1762 osd_req->r_priv = obj_request; 1809 osd_req->r_priv = obj_request;
1763 1810
1764 osd_req->r_oid_len = strlen(obj_request->object_name); 1811 osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1765 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1812 ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1766 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1767
1768 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1769 1813
1770 return osd_req; 1814 return osd_req;
1771} 1815}
@@ -1802,11 +1846,8 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1802 osd_req->r_callback = rbd_osd_req_callback; 1846 osd_req->r_callback = rbd_osd_req_callback;
1803 osd_req->r_priv = obj_request; 1847 osd_req->r_priv = obj_request;
1804 1848
1805 osd_req->r_oid_len = strlen(obj_request->object_name); 1849 osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1806 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1850 ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1807 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1808
1809 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1810 1851
1811 return osd_req; 1852 return osd_req;
1812} 1853}
@@ -2866,7 +2907,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2866 * Request sync osd watch/unwatch. The value of "start" determines 2907 * Request sync osd watch/unwatch. The value of "start" determines
2867 * whether a watch request is being initiated or torn down. 2908 * whether a watch request is being initiated or torn down.
2868 */ 2909 */
2869static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) 2910static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2870{ 2911{
2871 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2912 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2872 struct rbd_obj_request *obj_request; 2913 struct rbd_obj_request *obj_request;
@@ -2941,6 +2982,22 @@ out_cancel:
2941 return ret; 2982 return ret;
2942} 2983}
2943 2984
2985static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
2986{
2987 return __rbd_dev_header_watch_sync(rbd_dev, true);
2988}
2989
2990static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
2991{
2992 int ret;
2993
2994 ret = __rbd_dev_header_watch_sync(rbd_dev, false);
2995 if (ret) {
2996 rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
2997 ret);
2998 }
2999}
3000
2944/* 3001/*
2945 * Synchronous osd object method call. Returns the number of bytes 3002 * Synchronous osd object method call. Returns the number of bytes
2946 * returned in the outbound buffer, or a negative error code. 3003 * returned in the outbound buffer, or a negative error code.
@@ -3388,14 +3445,18 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
3388 u64 segment_size; 3445 u64 segment_size;
3389 3446
3390 /* create gendisk info */ 3447 /* create gendisk info */
3391 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 3448 disk = alloc_disk(single_major ?
3449 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3450 RBD_MINORS_PER_MAJOR);
3392 if (!disk) 3451 if (!disk)
3393 return -ENOMEM; 3452 return -ENOMEM;
3394 3453
3395 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 3454 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3396 rbd_dev->dev_id); 3455 rbd_dev->dev_id);
3397 disk->major = rbd_dev->major; 3456 disk->major = rbd_dev->major;
3398 disk->first_minor = 0; 3457 disk->first_minor = rbd_dev->minor;
3458 if (single_major)
3459 disk->flags |= GENHD_FL_EXT_DEVT;
3399 disk->fops = &rbd_bd_ops; 3460 disk->fops = &rbd_bd_ops;
3400 disk->private_data = rbd_dev; 3461 disk->private_data = rbd_dev;
3401 3462
@@ -3467,7 +3528,14 @@ static ssize_t rbd_major_show(struct device *dev,
3467 return sprintf(buf, "%d\n", rbd_dev->major); 3528 return sprintf(buf, "%d\n", rbd_dev->major);
3468 3529
3469 return sprintf(buf, "(none)\n"); 3530 return sprintf(buf, "(none)\n");
3531}
3470 3532
3533static ssize_t rbd_minor_show(struct device *dev,
3534 struct device_attribute *attr, char *buf)
3535{
3536 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3537
3538 return sprintf(buf, "%d\n", rbd_dev->minor);
3471} 3539}
3472 3540
3473static ssize_t rbd_client_id_show(struct device *dev, 3541static ssize_t rbd_client_id_show(struct device *dev,
@@ -3589,6 +3657,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
3589static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 3657static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3590static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 3658static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3591static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 3659static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3660static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3592static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 3661static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3593static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 3662static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3594static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 3663static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
@@ -3602,6 +3671,7 @@ static struct attribute *rbd_attrs[] = {
3602 &dev_attr_size.attr, 3671 &dev_attr_size.attr,
3603 &dev_attr_features.attr, 3672 &dev_attr_features.attr,
3604 &dev_attr_major.attr, 3673 &dev_attr_major.attr,
3674 &dev_attr_minor.attr,
3605 &dev_attr_client_id.attr, 3675 &dev_attr_client_id.attr,
3606 &dev_attr_pool.attr, 3676 &dev_attr_pool.attr,
3607 &dev_attr_pool_id.attr, 3677 &dev_attr_pool_id.attr,
@@ -4372,21 +4442,29 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4372 device_unregister(&rbd_dev->dev); 4442 device_unregister(&rbd_dev->dev);
4373} 4443}
4374 4444
4375static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4376
4377/* 4445/*
4378 * Get a unique rbd identifier for the given new rbd_dev, and add 4446 * Get a unique rbd identifier for the given new rbd_dev, and add
4379 * the rbd_dev to the global list. The minimum rbd id is 1. 4447 * the rbd_dev to the global list.
4380 */ 4448 */
4381static void rbd_dev_id_get(struct rbd_device *rbd_dev) 4449static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4382{ 4450{
4383 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); 4451 int new_dev_id;
4452
4453 new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4454 0, minor_to_rbd_dev_id(1 << MINORBITS),
4455 GFP_KERNEL);
4456 if (new_dev_id < 0)
4457 return new_dev_id;
4458
4459 rbd_dev->dev_id = new_dev_id;
4384 4460
4385 spin_lock(&rbd_dev_list_lock); 4461 spin_lock(&rbd_dev_list_lock);
4386 list_add_tail(&rbd_dev->node, &rbd_dev_list); 4462 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4387 spin_unlock(&rbd_dev_list_lock); 4463 spin_unlock(&rbd_dev_list_lock);
4388 dout("rbd_dev %p given dev id %llu\n", rbd_dev, 4464
4389 (unsigned long long) rbd_dev->dev_id); 4465 dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4466
4467 return 0;
4390} 4468}
4391 4469
4392/* 4470/*
@@ -4395,49 +4473,13 @@ static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4395 */ 4473 */
4396static void rbd_dev_id_put(struct rbd_device *rbd_dev) 4474static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4397{ 4475{
4398 struct list_head *tmp;
4399 int rbd_id = rbd_dev->dev_id;
4400 int max_id;
4401
4402 rbd_assert(rbd_id > 0);
4403
4404 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4405 (unsigned long long) rbd_dev->dev_id);
4406 spin_lock(&rbd_dev_list_lock); 4476 spin_lock(&rbd_dev_list_lock);
4407 list_del_init(&rbd_dev->node); 4477 list_del_init(&rbd_dev->node);
4408
4409 /*
4410 * If the id being "put" is not the current maximum, there
4411 * is nothing special we need to do.
4412 */
4413 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4414 spin_unlock(&rbd_dev_list_lock);
4415 return;
4416 }
4417
4418 /*
4419 * We need to update the current maximum id. Search the
4420 * list to find out what it is. We're more likely to find
4421 * the maximum at the end, so search the list backward.
4422 */
4423 max_id = 0;
4424 list_for_each_prev(tmp, &rbd_dev_list) {
4425 struct rbd_device *rbd_dev;
4426
4427 rbd_dev = list_entry(tmp, struct rbd_device, node);
4428 if (rbd_dev->dev_id > max_id)
4429 max_id = rbd_dev->dev_id;
4430 }
4431 spin_unlock(&rbd_dev_list_lock); 4478 spin_unlock(&rbd_dev_list_lock);
4432 4479
4433 /* 4480 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4434 * The max id could have been updated by rbd_dev_id_get(), in 4481
4435 * which case it now accurately reflects the new maximum. 4482 dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4436 * Be careful not to overwrite the maximum value in that
4437 * case.
4438 */
4439 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4440 dout(" max dev id has been reset\n");
4441} 4483}
4442 4484
4443/* 4485/*
@@ -4860,20 +4902,29 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4860{ 4902{
4861 int ret; 4903 int ret;
4862 4904
4863 /* generate unique id: find highest unique id, add one */ 4905 /* Get an id and fill in device name. */
4864 rbd_dev_id_get(rbd_dev); 4906
4907 ret = rbd_dev_id_get(rbd_dev);
4908 if (ret)
4909 return ret;
4865 4910
4866 /* Fill in the device name, now that we have its id. */
4867 BUILD_BUG_ON(DEV_NAME_LEN 4911 BUILD_BUG_ON(DEV_NAME_LEN
4868 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 4912 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4869 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); 4913 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4870 4914
4871 /* Get our block major device number. */ 4915 /* Record our major and minor device numbers. */
4872 4916
4873 ret = register_blkdev(0, rbd_dev->name); 4917 if (!single_major) {
4874 if (ret < 0) 4918 ret = register_blkdev(0, rbd_dev->name);
4875 goto err_out_id; 4919 if (ret < 0)
4876 rbd_dev->major = ret; 4920 goto err_out_id;
4921
4922 rbd_dev->major = ret;
4923 rbd_dev->minor = 0;
4924 } else {
4925 rbd_dev->major = rbd_major;
4926 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
4927 }
4877 4928
4878 /* Set up the blkdev mapping. */ 4929 /* Set up the blkdev mapping. */
4879 4930
@@ -4905,7 +4956,8 @@ err_out_mapping:
4905err_out_disk: 4956err_out_disk:
4906 rbd_free_disk(rbd_dev); 4957 rbd_free_disk(rbd_dev);
4907err_out_blkdev: 4958err_out_blkdev:
4908 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4959 if (!single_major)
4960 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4909err_out_id: 4961err_out_id:
4910 rbd_dev_id_put(rbd_dev); 4962 rbd_dev_id_put(rbd_dev);
4911 rbd_dev_mapping_clear(rbd_dev); 4963 rbd_dev_mapping_clear(rbd_dev);
@@ -4961,7 +5013,6 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4961static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) 5013static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4962{ 5014{
4963 int ret; 5015 int ret;
4964 int tmp;
4965 5016
4966 /* 5017 /*
4967 * Get the id from the image id object. Unless there's an 5018 * Get the id from the image id object. Unless there's an
@@ -4980,7 +5031,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4980 goto err_out_format; 5031 goto err_out_format;
4981 5032
4982 if (mapping) { 5033 if (mapping) {
4983 ret = rbd_dev_header_watch_sync(rbd_dev, true); 5034 ret = rbd_dev_header_watch_sync(rbd_dev);
4984 if (ret) 5035 if (ret)
4985 goto out_header_name; 5036 goto out_header_name;
4986 } 5037 }
@@ -5007,12 +5058,8 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5007err_out_probe: 5058err_out_probe:
5008 rbd_dev_unprobe(rbd_dev); 5059 rbd_dev_unprobe(rbd_dev);
5009err_out_watch: 5060err_out_watch:
5010 if (mapping) { 5061 if (mapping)
5011 tmp = rbd_dev_header_watch_sync(rbd_dev, false); 5062 rbd_dev_header_unwatch_sync(rbd_dev);
5012 if (tmp)
5013 rbd_warn(rbd_dev, "unable to tear down "
5014 "watch request (%d)\n", tmp);
5015 }
5016out_header_name: 5063out_header_name:
5017 kfree(rbd_dev->header_name); 5064 kfree(rbd_dev->header_name);
5018 rbd_dev->header_name = NULL; 5065 rbd_dev->header_name = NULL;
@@ -5026,9 +5073,9 @@ err_out_format:
5026 return ret; 5073 return ret;
5027} 5074}
5028 5075
5029static ssize_t rbd_add(struct bus_type *bus, 5076static ssize_t do_rbd_add(struct bus_type *bus,
5030 const char *buf, 5077 const char *buf,
5031 size_t count) 5078 size_t count)
5032{ 5079{
5033 struct rbd_device *rbd_dev = NULL; 5080 struct rbd_device *rbd_dev = NULL;
5034 struct ceph_options *ceph_opts = NULL; 5081 struct ceph_options *ceph_opts = NULL;
@@ -5090,6 +5137,12 @@ static ssize_t rbd_add(struct bus_type *bus,
5090 5137
5091 rc = rbd_dev_device_setup(rbd_dev); 5138 rc = rbd_dev_device_setup(rbd_dev);
5092 if (rc) { 5139 if (rc) {
5140 /*
5141 * rbd_dev_header_unwatch_sync() can't be moved into
5142 * rbd_dev_image_release() without refactoring, see
5143 * commit 1f3ef78861ac.
5144 */
5145 rbd_dev_header_unwatch_sync(rbd_dev);
5093 rbd_dev_image_release(rbd_dev); 5146 rbd_dev_image_release(rbd_dev);
5094 goto err_out_module; 5147 goto err_out_module;
5095 } 5148 }
@@ -5110,6 +5163,23 @@ err_out_module:
5110 return (ssize_t)rc; 5163 return (ssize_t)rc;
5111} 5164}
5112 5165
5166static ssize_t rbd_add(struct bus_type *bus,
5167 const char *buf,
5168 size_t count)
5169{
5170 if (single_major)
5171 return -EINVAL;
5172
5173 return do_rbd_add(bus, buf, count);
5174}
5175
5176static ssize_t rbd_add_single_major(struct bus_type *bus,
5177 const char *buf,
5178 size_t count)
5179{
5180 return do_rbd_add(bus, buf, count);
5181}
5182
5113static void rbd_dev_device_release(struct device *dev) 5183static void rbd_dev_device_release(struct device *dev)
5114{ 5184{
5115 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 5185 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
@@ -5117,8 +5187,8 @@ static void rbd_dev_device_release(struct device *dev)
5117 rbd_free_disk(rbd_dev); 5187 rbd_free_disk(rbd_dev);
5118 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5188 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5119 rbd_dev_mapping_clear(rbd_dev); 5189 rbd_dev_mapping_clear(rbd_dev);
5120 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5190 if (!single_major)
5121 rbd_dev->major = 0; 5191 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5122 rbd_dev_id_put(rbd_dev); 5192 rbd_dev_id_put(rbd_dev);
5123 rbd_dev_mapping_clear(rbd_dev); 5193 rbd_dev_mapping_clear(rbd_dev);
5124} 5194}
@@ -5149,9 +5219,9 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5149 } 5219 }
5150} 5220}
5151 5221
5152static ssize_t rbd_remove(struct bus_type *bus, 5222static ssize_t do_rbd_remove(struct bus_type *bus,
5153 const char *buf, 5223 const char *buf,
5154 size_t count) 5224 size_t count)
5155{ 5225{
5156 struct rbd_device *rbd_dev = NULL; 5226 struct rbd_device *rbd_dev = NULL;
5157 struct list_head *tmp; 5227 struct list_head *tmp;
@@ -5191,16 +5261,14 @@ static ssize_t rbd_remove(struct bus_type *bus,
5191 if (ret < 0 || already) 5261 if (ret < 0 || already)
5192 return ret; 5262 return ret;
5193 5263
5194 ret = rbd_dev_header_watch_sync(rbd_dev, false); 5264 rbd_dev_header_unwatch_sync(rbd_dev);
5195 if (ret)
5196 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5197
5198 /* 5265 /*
5199 * flush remaining watch callbacks - these must be complete 5266 * flush remaining watch callbacks - these must be complete
5200 * before the osd_client is shutdown 5267 * before the osd_client is shutdown
5201 */ 5268 */
5202 dout("%s: flushing notifies", __func__); 5269 dout("%s: flushing notifies", __func__);
5203 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); 5270 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5271
5204 /* 5272 /*
5205 * Don't free anything from rbd_dev->disk until after all 5273 * Don't free anything from rbd_dev->disk until after all
5206 * notifies are completely processed. Otherwise 5274 * notifies are completely processed. Otherwise
@@ -5214,6 +5282,23 @@ static ssize_t rbd_remove(struct bus_type *bus,
5214 return count; 5282 return count;
5215} 5283}
5216 5284
5285static ssize_t rbd_remove(struct bus_type *bus,
5286 const char *buf,
5287 size_t count)
5288{
5289 if (single_major)
5290 return -EINVAL;
5291
5292 return do_rbd_remove(bus, buf, count);
5293}
5294
5295static ssize_t rbd_remove_single_major(struct bus_type *bus,
5296 const char *buf,
5297 size_t count)
5298{
5299 return do_rbd_remove(bus, buf, count);
5300}
5301
5217/* 5302/*
5218 * create control files in sysfs 5303 * create control files in sysfs
5219 * /sys/bus/rbd/... 5304 * /sys/bus/rbd/...
@@ -5259,7 +5344,7 @@ static int rbd_slab_init(void)
5259 5344
5260 rbd_assert(!rbd_segment_name_cache); 5345 rbd_assert(!rbd_segment_name_cache);
5261 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", 5346 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5262 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL); 5347 CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5263 if (rbd_segment_name_cache) 5348 if (rbd_segment_name_cache)
5264 return 0; 5349 return 0;
5265out_err: 5350out_err:
@@ -5295,24 +5380,45 @@ static int __init rbd_init(void)
5295 5380
5296 if (!libceph_compatible(NULL)) { 5381 if (!libceph_compatible(NULL)) {
5297 rbd_warn(NULL, "libceph incompatibility (quitting)"); 5382 rbd_warn(NULL, "libceph incompatibility (quitting)");
5298
5299 return -EINVAL; 5383 return -EINVAL;
5300 } 5384 }
5385
5301 rc = rbd_slab_init(); 5386 rc = rbd_slab_init();
5302 if (rc) 5387 if (rc)
5303 return rc; 5388 return rc;
5389
5390 if (single_major) {
5391 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5392 if (rbd_major < 0) {
5393 rc = rbd_major;
5394 goto err_out_slab;
5395 }
5396 }
5397
5304 rc = rbd_sysfs_init(); 5398 rc = rbd_sysfs_init();
5305 if (rc) 5399 if (rc)
5306 rbd_slab_exit(); 5400 goto err_out_blkdev;
5401
5402 if (single_major)
5403 pr_info("loaded (major %d)\n", rbd_major);
5307 else 5404 else
5308 pr_info("loaded " RBD_DRV_NAME_LONG "\n"); 5405 pr_info("loaded\n");
5406
5407 return 0;
5309 5408
5409err_out_blkdev:
5410 if (single_major)
5411 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5412err_out_slab:
5413 rbd_slab_exit();
5310 return rc; 5414 return rc;
5311} 5415}
5312 5416
5313static void __exit rbd_exit(void) 5417static void __exit rbd_exit(void)
5314{ 5418{
5315 rbd_sysfs_cleanup(); 5419 rbd_sysfs_cleanup();
5420 if (single_major)
5421 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5316 rbd_slab_exit(); 5422 rbd_slab_exit();
5317} 5423}
5318 5424
@@ -5322,9 +5428,8 @@ module_exit(rbd_exit);
5322MODULE_AUTHOR("Alex Elder <elder@inktank.com>"); 5428MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5323MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 5429MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5324MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 5430MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5325MODULE_DESCRIPTION("rados block device");
5326
5327/* following authorship retained from original osdblk.c */ 5431/* following authorship retained from original osdblk.c */
5328MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 5432MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5329 5433
5434MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5330MODULE_LICENSE("GPL"); 5435MODULE_LICENSE("GPL");
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index ac9a2ef5bb9b..264e9bf83ff3 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -25,3 +25,16 @@ config CEPH_FSCACHE
25 caching support for Ceph clients using FS-Cache 25 caching support for Ceph clients using FS-Cache
26 26
27endif 27endif
28
29config CEPH_FS_POSIX_ACL
30 bool "Ceph POSIX Access Control Lists"
31 depends on CEPH_FS
32 select FS_POSIX_ACL
33 help
34 POSIX Access Control Lists (ACLs) support permissions for users and
35 groups beyond the owner/group/world scheme.
36
37 To learn more about Access Control Lists, visit the POSIX ACLs for
38 Linux website <http://acl.bestbits.at/>.
39
40 If you don't know what Access Control Lists are, say N
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 32e30106a2f0..85a4230b9bff 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -10,3 +10,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
10 debugfs.o 10 debugfs.o
11 11
12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o 12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
13ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
new file mode 100644
index 000000000000..64fddbc1d17b
--- /dev/null
+++ b/fs/ceph/acl.c
@@ -0,0 +1,332 @@
1/*
2 * linux/fs/ceph/acl.c
3 *
4 * Copyright (C) 2013 Guangliang Zhao, <lucienchao@gmail.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License v2 as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public
16 * License along with this program; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
19 */
20
21#include <linux/ceph/ceph_debug.h>
22#include <linux/fs.h>
23#include <linux/string.h>
24#include <linux/xattr.h>
25#include <linux/posix_acl_xattr.h>
26#include <linux/posix_acl.h>
27#include <linux/sched.h>
28#include <linux/slab.h>
29
30#include "super.h"
31
32static inline void ceph_set_cached_acl(struct inode *inode,
33 int type, struct posix_acl *acl)
34{
35 struct ceph_inode_info *ci = ceph_inode(inode);
36
37 spin_lock(&ci->i_ceph_lock);
38 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
39 set_cached_acl(inode, type, acl);
40 spin_unlock(&ci->i_ceph_lock);
41}
42
43static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
44 int type)
45{
46 struct ceph_inode_info *ci = ceph_inode(inode);
47 struct posix_acl *acl = ACL_NOT_CACHED;
48
49 spin_lock(&ci->i_ceph_lock);
50 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
51 acl = get_cached_acl(inode, type);
52 spin_unlock(&ci->i_ceph_lock);
53
54 return acl;
55}
56
/*
 * Invalidate every cached ACL on @inode.  Called when a fresh xattr
 * blob arrives from the MDS (see the cap-grant handling elsewhere in
 * this series), so permission checks never consult stale ACLs.
 */
void ceph_forget_all_cached_acls(struct inode *inode)
{
	forget_all_cached_acls(inode);
}
61
62struct posix_acl *ceph_get_acl(struct inode *inode, int type)
63{
64 int size;
65 const char *name;
66 char *value = NULL;
67 struct posix_acl *acl;
68
69 if (!IS_POSIXACL(inode))
70 return NULL;
71
72 acl = ceph_get_cached_acl(inode, type);
73 if (acl != ACL_NOT_CACHED)
74 return acl;
75
76 switch (type) {
77 case ACL_TYPE_ACCESS:
78 name = POSIX_ACL_XATTR_ACCESS;
79 break;
80 case ACL_TYPE_DEFAULT:
81 name = POSIX_ACL_XATTR_DEFAULT;
82 break;
83 default:
84 BUG();
85 }
86
87 size = __ceph_getxattr(inode, name, "", 0);
88 if (size > 0) {
89 value = kzalloc(size, GFP_NOFS);
90 if (!value)
91 return ERR_PTR(-ENOMEM);
92 size = __ceph_getxattr(inode, name, value, size);
93 }
94
95 if (size > 0)
96 acl = posix_acl_from_xattr(&init_user_ns, value, size);
97 else if (size == -ERANGE || size == -ENODATA || size == 0)
98 acl = NULL;
99 else
100 acl = ERR_PTR(-EIO);
101
102 kfree(value);
103
104 if (!IS_ERR(acl))
105 ceph_set_cached_acl(inode, type, acl);
106
107 return acl;
108}
109
/*
 * Write a POSIX ACL of @type for @inode to the MDS as an xattr and, on
 * success, refresh the local ACL cache.
 *
 * For ACL_TYPE_ACCESS, an ACL fully representable by the mode bits is
 * dropped (acl = NULL) and only the mode is updated; ACL_TYPE_DEFAULT
 * is only valid on directories.
 *
 * Returns 0 on success or a negative errno.
 */
static int ceph_set_acl(struct dentry *dentry, struct inode *inode,
			struct posix_acl *acl, int type)
{
	int ret = 0, size = 0;
	const char *name = NULL;
	char *value = NULL;
	struct iattr newattrs;
	umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;

	if (acl) {
		ret = posix_acl_valid(acl);
		if (ret < 0)
			goto out;
	}

	switch (type) {
	case ACL_TYPE_ACCESS:
		name = POSIX_ACL_XATTR_ACCESS;
		if (acl) {
			/* ret == 0 means the ACL collapses into the mode
			 * bits, so no xattr needs to be stored at all */
			ret = posix_acl_equiv_mode(acl, &new_mode);
			if (ret < 0)
				goto out;
			if (ret == 0)
				acl = NULL;
		}
		break;
	case ACL_TYPE_DEFAULT:
		if (!S_ISDIR(inode->i_mode)) {
			/* setting a default ACL on a non-dir is an error;
			 * clearing a (nonexistent) one is a no-op */
			ret = acl ? -EINVAL : 0;
			goto out;
		}
		name = POSIX_ACL_XATTR_DEFAULT;
		break;
	default:
		ret = -EINVAL;
		goto out;
	}

	if (acl) {
		size = posix_acl_xattr_size(acl->a_count);
		value = kmalloc(size, GFP_NOFS);
		if (!value) {
			ret = -ENOMEM;
			goto out;
		}

		ret = posix_acl_to_xattr(&init_user_ns, acl, value, size);
		if (ret < 0)
			goto out_free;
	}

	/* push the mode change first so mode and xattr stay coherent */
	if (new_mode != old_mode) {
		newattrs.ia_mode = new_mode;
		newattrs.ia_valid = ATTR_MODE;
		ret = ceph_setattr(dentry, &newattrs);
		if (ret)
			goto out_free;
	}

	if (value)
		ret = __ceph_setxattr(dentry, name, value, size, 0);
	else
		ret = __ceph_removexattr(dentry, name);

	if (ret) {
		/* xattr update failed: roll back the mode change
		 * (best effort — the rollback's result is ignored) */
		if (new_mode != old_mode) {
			newattrs.ia_mode = old_mode;
			newattrs.ia_valid = ATTR_MODE;
			ceph_setattr(dentry, &newattrs);
		}
		goto out_free;
	}

	ceph_set_cached_acl(inode, type, acl);

out_free:
	kfree(value);
out:
	return ret;
}
190
/*
 * Initialize ACLs on a newly created inode, inheriting the default ACL
 * of the parent directory @dir (if any).  When there is no inherited
 * ACL, the process umask is applied to the new inode's mode instead.
 *
 * Returns 0 on success or a negative errno.
 */
int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
{
	struct posix_acl *acl = NULL;
	int ret = 0;

	if (!S_ISLNK(inode->i_mode)) {
		if (IS_POSIXACL(dir)) {
			acl = ceph_get_acl(dir, ACL_TYPE_DEFAULT);
			if (IS_ERR(acl)) {
				ret = PTR_ERR(acl);
				goto out;
			}
		}

		/* no inherited ACL: fall back to the umask */
		if (!acl)
			inode->i_mode &= ~current_umask();
	}

	if (IS_POSIXACL(dir) && acl) {
		/* a new directory also inherits the default ACL itself */
		if (S_ISDIR(inode->i_mode)) {
			ret = ceph_set_acl(dentry, inode, acl,
					   ACL_TYPE_DEFAULT);
			if (ret)
				goto out_release;
		}
		/*
		 * NOTE(review): posix_acl_create() replaces @acl in place
		 * with the effective access ACL, dropping the old reference
		 * (including on failure — hence "goto out", not
		 * out_release, below).  >0 means an extended ACL is needed,
		 * 0 means the mode bits suffice.
		 */
		ret = posix_acl_create(&acl, GFP_NOFS, &inode->i_mode);
		if (ret < 0)
			goto out;
		else if (ret > 0)
			ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS);
		else
			cache_no_acl(inode);
	} else {
		cache_no_acl(inode);
	}

out_release:
	posix_acl_release(acl);
out:
	return ret;
}
232
/*
 * Rebuild the access ACL after a chmod so its entries stay consistent
 * with the new mode bits, then write it back via ceph_set_acl().
 *
 * Returns 0 on success (including the "no ACL present" case) or a
 * negative errno.
 */
int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
{
	struct posix_acl *acl;
	int ret = 0;

	/* symlinks never carry ACLs */
	if (S_ISLNK(inode->i_mode)) {
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (!IS_POSIXACL(inode))
		goto out;

	acl = ceph_get_acl(inode, ACL_TYPE_ACCESS);
	/* PTR_ERR(NULL) == 0, so "no ACL" deliberately exits with ret 0 */
	if (IS_ERR_OR_NULL(acl)) {
		ret = PTR_ERR(acl);
		goto out;
	}

	/* NOTE(review): posix_acl_chmod() swaps @acl for an updated copy,
	 * releasing the reference we got above */
	ret = posix_acl_chmod(&acl, GFP_KERNEL, inode->i_mode);
	if (ret)
		goto out;
	ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS);
	posix_acl_release(acl);
out:
	return ret;
}
260
261static int ceph_xattr_acl_get(struct dentry *dentry, const char *name,
262 void *value, size_t size, int type)
263{
264 struct posix_acl *acl;
265 int ret = 0;
266
267 if (!IS_POSIXACL(dentry->d_inode))
268 return -EOPNOTSUPP;
269
270 acl = ceph_get_acl(dentry->d_inode, type);
271 if (IS_ERR(acl))
272 return PTR_ERR(acl);
273 if (acl == NULL)
274 return -ENODATA;
275
276 ret = posix_acl_to_xattr(&init_user_ns, acl, value, size);
277 posix_acl_release(acl);
278
279 return ret;
280}
281
282static int ceph_xattr_acl_set(struct dentry *dentry, const char *name,
283 const void *value, size_t size, int flags, int type)
284{
285 int ret = 0;
286 struct posix_acl *acl = NULL;
287
288 if (!inode_owner_or_capable(dentry->d_inode)) {
289 ret = -EPERM;
290 goto out;
291 }
292
293 if (!IS_POSIXACL(dentry->d_inode)) {
294 ret = -EOPNOTSUPP;
295 goto out;
296 }
297
298 if (value) {
299 acl = posix_acl_from_xattr(&init_user_ns, value, size);
300 if (IS_ERR(acl)) {
301 ret = PTR_ERR(acl);
302 goto out;
303 }
304
305 if (acl) {
306 ret = posix_acl_valid(acl);
307 if (ret)
308 goto out_release;
309 }
310 }
311
312 ret = ceph_set_acl(dentry, dentry->d_inode, acl, type);
313
314out_release:
315 posix_acl_release(acl);
316out:
317 return ret;
318}
319
/*
 * Handler wiring the default-ACL xattr namespace to the get/set
 * helpers above; @flags carries the ACL type for the shared callbacks.
 */
const struct xattr_handler ceph_xattr_acl_default_handler = {
	.prefix = POSIX_ACL_XATTR_DEFAULT,
	.flags = ACL_TYPE_DEFAULT,
	.get = ceph_xattr_acl_get,
	.set = ceph_xattr_acl_set,
};
326
/*
 * Handler wiring the access-ACL xattr namespace to the get/set
 * helpers above; @flags carries the ACL type for the shared callbacks.
 */
const struct xattr_handler ceph_xattr_acl_access_handler = {
	.prefix = POSIX_ACL_XATTR_ACCESS,
	.flags = ACL_TYPE_ACCESS,
	.get = ceph_xattr_acl_get,
	.set = ceph_xattr_acl_set,
};
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ec3ba43b9faa..b53278c9fd97 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -209,6 +209,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
209 err = 0; 209 err = 0;
210 if (err < 0) { 210 if (err < 0) {
211 SetPageError(page); 211 SetPageError(page);
212 ceph_fscache_readpage_cancel(inode, page);
212 goto out; 213 goto out;
213 } else { 214 } else {
214 if (err < PAGE_CACHE_SIZE) { 215 if (err < PAGE_CACHE_SIZE) {
@@ -256,6 +257,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
256 for (i = 0; i < num_pages; i++) { 257 for (i = 0; i < num_pages; i++) {
257 struct page *page = osd_data->pages[i]; 258 struct page *page = osd_data->pages[i];
258 259
260 if (rc < 0)
261 goto unlock;
259 if (bytes < (int)PAGE_CACHE_SIZE) { 262 if (bytes < (int)PAGE_CACHE_SIZE) {
260 /* zero (remainder of) page */ 263 /* zero (remainder of) page */
261 int s = bytes < 0 ? 0 : bytes; 264 int s = bytes < 0 ? 0 : bytes;
@@ -266,6 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
266 flush_dcache_page(page); 269 flush_dcache_page(page);
267 SetPageUptodate(page); 270 SetPageUptodate(page);
268 ceph_readpage_to_fscache(inode, page); 271 ceph_readpage_to_fscache(inode, page);
272unlock:
269 unlock_page(page); 273 unlock_page(page);
270 page_cache_release(page); 274 page_cache_release(page);
271 bytes -= PAGE_CACHE_SIZE; 275 bytes -= PAGE_CACHE_SIZE;
@@ -1207,6 +1211,41 @@ const struct address_space_operations ceph_aops = {
1207/* 1211/*
1208 * vm ops 1212 * vm ops
1209 */ 1213 */
1214static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{
1216 struct inode *inode = file_inode(vma->vm_file);
1217 struct ceph_inode_info *ci = ceph_inode(inode);
1218 struct ceph_file_info *fi = vma->vm_file->private_data;
1219 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1220 int want, got, ret;
1221
1222 dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1223 inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE);
1224 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1225 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1226 else
1227 want = CEPH_CAP_FILE_CACHE;
1228 while (1) {
1229 got = 0;
1230 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
1231 if (ret == 0)
1232 break;
1233 if (ret != -ERESTARTSYS) {
1234 WARN_ON(1);
1235 return VM_FAULT_SIGBUS;
1236 }
1237 }
1238 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1239 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1240
1241 ret = filemap_fault(vma, vmf);
1242
1243 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1244 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1245 ceph_put_cap_refs(ci, got);
1246
1247 return ret;
1248}
1210 1249
1211/* 1250/*
1212 * Reuse write_begin here for simplicity. 1251 * Reuse write_begin here for simplicity.
@@ -1214,23 +1253,41 @@ const struct address_space_operations ceph_aops = {
1214static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) 1253static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{ 1254{
1216 struct inode *inode = file_inode(vma->vm_file); 1255 struct inode *inode = file_inode(vma->vm_file);
1217 struct page *page = vmf->page; 1256 struct ceph_inode_info *ci = ceph_inode(inode);
1257 struct ceph_file_info *fi = vma->vm_file->private_data;
1218 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 1258 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1259 struct page *page = vmf->page;
1219 loff_t off = page_offset(page); 1260 loff_t off = page_offset(page);
1220 loff_t size, len; 1261 loff_t size = i_size_read(inode);
1221 int ret; 1262 size_t len;
1222 1263 int want, got, ret;
1223 /* Update time before taking page lock */
1224 file_update_time(vma->vm_file);
1225 1264
1226 size = i_size_read(inode);
1227 if (off + PAGE_CACHE_SIZE <= size) 1265 if (off + PAGE_CACHE_SIZE <= size)
1228 len = PAGE_CACHE_SIZE; 1266 len = PAGE_CACHE_SIZE;
1229 else 1267 else
1230 len = size & ~PAGE_CACHE_MASK; 1268 len = size & ~PAGE_CACHE_MASK;
1231 1269
1232 dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, 1270 dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
1233 off, len, page, page->index); 1271 inode, ceph_vinop(inode), off, len, size);
1272 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1273 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1274 else
1275 want = CEPH_CAP_FILE_BUFFER;
1276 while (1) {
1277 got = 0;
1278 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
1279 if (ret == 0)
1280 break;
1281 if (ret != -ERESTARTSYS) {
1282 WARN_ON(1);
1283 return VM_FAULT_SIGBUS;
1284 }
1285 }
1286 dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
1287 inode, off, len, ceph_cap_string(got));
1288
1289 /* Update time before taking page lock */
1290 file_update_time(vma->vm_file);
1234 1291
1235 lock_page(page); 1292 lock_page(page);
1236 1293
@@ -1252,14 +1309,26 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1252 ret = VM_FAULT_SIGBUS; 1309 ret = VM_FAULT_SIGBUS;
1253 } 1310 }
1254out: 1311out:
1255 dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); 1312 if (ret != VM_FAULT_LOCKED) {
1256 if (ret != VM_FAULT_LOCKED)
1257 unlock_page(page); 1313 unlock_page(page);
1314 } else {
1315 int dirty;
1316 spin_lock(&ci->i_ceph_lock);
1317 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1318 spin_unlock(&ci->i_ceph_lock);
1319 if (dirty)
1320 __mark_inode_dirty(inode, dirty);
1321 }
1322
1323 dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
1324 inode, off, len, ceph_cap_string(got), ret);
1325 ceph_put_cap_refs(ci, got);
1326
1258 return ret; 1327 return ret;
1259} 1328}
1260 1329
1261static struct vm_operations_struct ceph_vmops = { 1330static struct vm_operations_struct ceph_vmops = {
1262 .fault = filemap_fault, 1331 .fault = ceph_filemap_fault,
1263 .page_mkwrite = ceph_page_mkwrite, 1332 .page_mkwrite = ceph_page_mkwrite,
1264 .remap_pages = generic_file_remap_pages, 1333 .remap_pages = generic_file_remap_pages,
1265}; 1334};
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index ba949408a336..da95f61b7a09 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -67,6 +67,14 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
67 return fscache_maybe_release_page(ci->fscache, page, gfp); 67 return fscache_maybe_release_page(ci->fscache, page, gfp);
68} 68}
69 69
70static inline void ceph_fscache_readpage_cancel(struct inode *inode,
71 struct page *page)
72{
73 struct ceph_inode_info *ci = ceph_inode(inode);
74 if (fscache_cookie_valid(ci->fscache) && PageFsCache(page))
75 __fscache_uncache_page(ci->fscache, page);
76}
77
70static inline void ceph_fscache_readpages_cancel(struct inode *inode, 78static inline void ceph_fscache_readpages_cancel(struct inode *inode,
71 struct list_head *pages) 79 struct list_head *pages)
72{ 80{
@@ -145,6 +153,11 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
145 return 1; 153 return 1;
146} 154}
147 155
156static inline void ceph_fscache_readpage_cancel(struct inode *inode,
157 struct page *page)
158{
159}
160
148static inline void ceph_fscache_readpages_cancel(struct inode *inode, 161static inline void ceph_fscache_readpages_cancel(struct inode *inode,
149 struct list_head *pages) 162 struct list_head *pages)
150{ 163{
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3c0a4bd74996..17543383545c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -555,21 +555,34 @@ retry:
555 cap->ci = ci; 555 cap->ci = ci;
556 __insert_cap_node(ci, cap); 556 __insert_cap_node(ci, cap);
557 557
558 /* clear out old exporting info? (i.e. on cap import) */
559 if (ci->i_cap_exporting_mds == mds) {
560 ci->i_cap_exporting_issued = 0;
561 ci->i_cap_exporting_mseq = 0;
562 ci->i_cap_exporting_mds = -1;
563 }
564
565 /* add to session cap list */ 558 /* add to session cap list */
566 cap->session = session; 559 cap->session = session;
567 spin_lock(&session->s_cap_lock); 560 spin_lock(&session->s_cap_lock);
568 list_add_tail(&cap->session_caps, &session->s_caps); 561 list_add_tail(&cap->session_caps, &session->s_caps);
569 session->s_nr_caps++; 562 session->s_nr_caps++;
570 spin_unlock(&session->s_cap_lock); 563 spin_unlock(&session->s_cap_lock);
571 } else if (new_cap) 564 } else {
572 ceph_put_cap(mdsc, new_cap); 565 if (new_cap)
566 ceph_put_cap(mdsc, new_cap);
567
568 /*
569 * auth mds of the inode changed. we received the cap export
570 * message, but still haven't received the cap import message.
571 * handle_cap_export() updated the new auth MDS' cap.
572 *
573 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
574 * a message that was send before the cap import message. So
575 * don't remove caps.
576 */
577 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
578 WARN_ON(cap != ci->i_auth_cap);
579 WARN_ON(cap->cap_id != cap_id);
580 seq = cap->seq;
581 mseq = cap->mseq;
582 issued |= cap->issued;
583 flags |= CEPH_CAP_FLAG_AUTH;
584 }
585 }
573 586
574 if (!ci->i_snap_realm) { 587 if (!ci->i_snap_realm) {
575 /* 588 /*
@@ -611,15 +624,9 @@ retry:
611 if (ci->i_auth_cap == NULL || 624 if (ci->i_auth_cap == NULL ||
612 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) 625 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
613 ci->i_auth_cap = cap; 626 ci->i_auth_cap = cap;
614 } else if (ci->i_auth_cap == cap) { 627 ci->i_cap_exporting_issued = 0;
615 ci->i_auth_cap = NULL; 628 } else {
616 spin_lock(&mdsc->cap_dirty_lock); 629 WARN_ON(ci->i_auth_cap == cap);
617 if (!list_empty(&ci->i_dirty_item)) {
618 dout(" moving %p to cap_dirty_migrating\n", inode);
619 list_move(&ci->i_dirty_item,
620 &mdsc->cap_dirty_migrating);
621 }
622 spin_unlock(&mdsc->cap_dirty_lock);
623 } 630 }
624 631
625 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", 632 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
@@ -628,7 +635,7 @@ retry:
628 cap->cap_id = cap_id; 635 cap->cap_id = cap_id;
629 cap->issued = issued; 636 cap->issued = issued;
630 cap->implemented |= issued; 637 cap->implemented |= issued;
631 if (mseq > cap->mseq) 638 if (ceph_seq_cmp(mseq, cap->mseq) > 0)
632 cap->mds_wanted = wanted; 639 cap->mds_wanted = wanted;
633 else 640 else
634 cap->mds_wanted |= wanted; 641 cap->mds_wanted |= wanted;
@@ -816,7 +823,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
816 823
817 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 824 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
818 cap = rb_entry(p, struct ceph_cap, ci_node); 825 cap = rb_entry(p, struct ceph_cap, ci_node);
819 if (cap != ocap && __cap_is_valid(cap) && 826 if (cap != ocap &&
820 (cap->implemented & ~cap->issued & mask)) 827 (cap->implemented & ~cap->issued & mask))
821 return 1; 828 return 1;
822 } 829 }
@@ -888,7 +895,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
888 */ 895 */
889static int __ceph_is_any_caps(struct ceph_inode_info *ci) 896static int __ceph_is_any_caps(struct ceph_inode_info *ci)
890{ 897{
891 return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; 898 return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
899}
900
901int ceph_is_any_caps(struct inode *inode)
902{
903 struct ceph_inode_info *ci = ceph_inode(inode);
904 int ret;
905
906 spin_lock(&ci->i_ceph_lock);
907 ret = __ceph_is_any_caps(ci);
908 spin_unlock(&ci->i_ceph_lock);
909
910 return ret;
892} 911}
893 912
894/* 913/*
@@ -1383,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1383 ci->i_snap_realm->cached_context); 1402 ci->i_snap_realm->cached_context);
1384 dout(" inode %p now dirty snapc %p auth cap %p\n", 1403 dout(" inode %p now dirty snapc %p auth cap %p\n",
1385 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); 1404 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
1405 WARN_ON(!ci->i_auth_cap);
1386 BUG_ON(!list_empty(&ci->i_dirty_item)); 1406 BUG_ON(!list_empty(&ci->i_dirty_item));
1387 spin_lock(&mdsc->cap_dirty_lock); 1407 spin_lock(&mdsc->cap_dirty_lock);
1388 if (ci->i_auth_cap) 1408 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1389 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1390 else
1391 list_add(&ci->i_dirty_item,
1392 &mdsc->cap_dirty_migrating);
1393 spin_unlock(&mdsc->cap_dirty_lock); 1409 spin_unlock(&mdsc->cap_dirty_lock);
1394 if (ci->i_flushing_caps == 0) { 1410 if (ci->i_flushing_caps == 0) {
1395 ihold(inode); 1411 ihold(inode);
@@ -1735,13 +1751,12 @@ ack:
1735/* 1751/*
1736 * Try to flush dirty caps back to the auth mds. 1752 * Try to flush dirty caps back to the auth mds.
1737 */ 1753 */
1738static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, 1754static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
1739 unsigned *flush_tid)
1740{ 1755{
1741 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1756 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1742 struct ceph_inode_info *ci = ceph_inode(inode); 1757 struct ceph_inode_info *ci = ceph_inode(inode);
1743 int unlock_session = session ? 0 : 1;
1744 int flushing = 0; 1758 int flushing = 0;
1759 struct ceph_mds_session *session = NULL;
1745 1760
1746retry: 1761retry:
1747 spin_lock(&ci->i_ceph_lock); 1762 spin_lock(&ci->i_ceph_lock);
@@ -1755,13 +1770,14 @@ retry:
1755 int want = __ceph_caps_wanted(ci); 1770 int want = __ceph_caps_wanted(ci);
1756 int delayed; 1771 int delayed;
1757 1772
1758 if (!session) { 1773 if (!session || session != cap->session) {
1759 spin_unlock(&ci->i_ceph_lock); 1774 spin_unlock(&ci->i_ceph_lock);
1775 if (session)
1776 mutex_unlock(&session->s_mutex);
1760 session = cap->session; 1777 session = cap->session;
1761 mutex_lock(&session->s_mutex); 1778 mutex_lock(&session->s_mutex);
1762 goto retry; 1779 goto retry;
1763 } 1780 }
1764 BUG_ON(session != cap->session);
1765 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) 1781 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
1766 goto out; 1782 goto out;
1767 1783
@@ -1780,7 +1796,7 @@ retry:
1780out: 1796out:
1781 spin_unlock(&ci->i_ceph_lock); 1797 spin_unlock(&ci->i_ceph_lock);
1782out_unlocked: 1798out_unlocked:
1783 if (session && unlock_session) 1799 if (session)
1784 mutex_unlock(&session->s_mutex); 1800 mutex_unlock(&session->s_mutex);
1785 return flushing; 1801 return flushing;
1786} 1802}
@@ -1865,7 +1881,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
1865 return ret; 1881 return ret;
1866 mutex_lock(&inode->i_mutex); 1882 mutex_lock(&inode->i_mutex);
1867 1883
1868 dirty = try_flush_caps(inode, NULL, &flush_tid); 1884 dirty = try_flush_caps(inode, &flush_tid);
1869 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 1885 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
1870 1886
1871 /* 1887 /*
@@ -1900,7 +1916,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
1900 1916
1901 dout("write_inode %p wait=%d\n", inode, wait); 1917 dout("write_inode %p wait=%d\n", inode, wait);
1902 if (wait) { 1918 if (wait) {
1903 dirty = try_flush_caps(inode, NULL, &flush_tid); 1919 dirty = try_flush_caps(inode, &flush_tid);
1904 if (dirty) 1920 if (dirty)
1905 err = wait_event_interruptible(ci->i_cap_wq, 1921 err = wait_event_interruptible(ci->i_cap_wq,
1906 caps_are_flushed(inode, flush_tid)); 1922 caps_are_flushed(inode, flush_tid));
@@ -2350,11 +2366,11 @@ static void invalidate_aliases(struct inode *inode)
2350 d_prune_aliases(inode); 2366 d_prune_aliases(inode);
2351 /* 2367 /*
2352 * For non-directory inode, d_find_alias() only returns 2368 * For non-directory inode, d_find_alias() only returns
2353 * connected dentry. After calling d_invalidate(), the 2369 * hashed dentry. After calling d_invalidate(), the
2354 * dentry become disconnected. 2370 * dentry becomes unhashed.
2355 * 2371 *
2356 * For directory inode, d_find_alias() can return 2372 * For directory inode, d_find_alias() can return
2357 * disconnected dentry. But directory inode should have 2373 * unhashed dentry. But directory inode should have
2358 * one alias at most. 2374 * one alias at most.
2359 */ 2375 */
2360 while ((dn = d_find_alias(inode))) { 2376 while ((dn = d_find_alias(inode))) {
@@ -2408,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2408 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 2424 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
2409 inode->i_size); 2425 inode->i_size);
2410 2426
2427
2428 /*
2429 * auth mds of the inode changed. we received the cap export message,
2430 * but still haven't received the cap import message. handle_cap_export
2431 * updated the new auth MDS' cap.
2432 *
2433 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
2434 * that was sent before the cap import message. So don't remove caps.
2435 */
2436 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
2437 WARN_ON(cap != ci->i_auth_cap);
2438 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
2439 seq = cap->seq;
2440 newcaps |= cap->issued;
2441 }
2442
2411 /* 2443 /*
2412 * If CACHE is being revoked, and we have no dirty buffers, 2444 * If CACHE is being revoked, and we have no dirty buffers,
2413 * try to invalidate (once). (If there are dirty buffers, we 2445 * try to invalidate (once). (If there are dirty buffers, we
@@ -2434,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2434 issued |= implemented | __ceph_caps_dirty(ci); 2466 issued |= implemented | __ceph_caps_dirty(ci);
2435 2467
2436 cap->cap_gen = session->s_cap_gen; 2468 cap->cap_gen = session->s_cap_gen;
2469 cap->seq = seq;
2437 2470
2438 __check_cap_issue(ci, cap, newcaps); 2471 __check_cap_issue(ci, cap, newcaps);
2439 2472
@@ -2464,6 +2497,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2464 ceph_buffer_put(ci->i_xattrs.blob); 2497 ceph_buffer_put(ci->i_xattrs.blob);
2465 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 2498 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
2466 ci->i_xattrs.version = version; 2499 ci->i_xattrs.version = version;
2500 ceph_forget_all_cached_acls(inode);
2467 } 2501 }
2468 } 2502 }
2469 2503
@@ -2483,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2483 le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, 2517 le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
2484 &atime); 2518 &atime);
2485 2519
2520
2521 /* file layout may have changed */
2522 ci->i_layout = grant->layout;
2523
2486 /* max size increase? */ 2524 /* max size increase? */
2487 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { 2525 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
2488 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); 2526 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
@@ -2511,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2511 check_caps = 1; 2549 check_caps = 1;
2512 } 2550 }
2513 2551
2514 cap->seq = seq;
2515
2516 /* file layout may have changed */
2517 ci->i_layout = grant->layout;
2518
2519 /* revocation, grant, or no-op? */ 2552 /* revocation, grant, or no-op? */
2520 if (cap->issued & ~newcaps) { 2553 if (cap->issued & ~newcaps) {
2521 int revoking = cap->issued & ~newcaps; 2554 int revoking = cap->issued & ~newcaps;
@@ -2741,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode,
2741 * caller holds s_mutex 2774 * caller holds s_mutex
2742 */ 2775 */
2743static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 2776static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2744 struct ceph_mds_session *session, 2777 struct ceph_mds_cap_peer *ph,
2745 int *open_target_sessions) 2778 struct ceph_mds_session *session)
2746{ 2779{
2747 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 2780 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
2781 struct ceph_mds_session *tsession = NULL;
2782 struct ceph_cap *cap, *tcap;
2748 struct ceph_inode_info *ci = ceph_inode(inode); 2783 struct ceph_inode_info *ci = ceph_inode(inode);
2749 int mds = session->s_mds; 2784 u64 t_cap_id;
2750 unsigned mseq = le32_to_cpu(ex->migrate_seq); 2785 unsigned mseq = le32_to_cpu(ex->migrate_seq);
2751 struct ceph_cap *cap = NULL, *t; 2786 unsigned t_seq, t_mseq;
2752 struct rb_node *p; 2787 int target, issued;
2753 int remember = 1; 2788 int mds = session->s_mds;
2754 2789
2755 dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", 2790 if (ph) {
2756 inode, ci, mds, mseq); 2791 t_cap_id = le64_to_cpu(ph->cap_id);
2792 t_seq = le32_to_cpu(ph->seq);
2793 t_mseq = le32_to_cpu(ph->mseq);
2794 target = le32_to_cpu(ph->mds);
2795 } else {
2796 t_cap_id = t_seq = t_mseq = 0;
2797 target = -1;
2798 }
2757 2799
2800 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
2801 inode, ci, mds, mseq, target);
2802retry:
2758 spin_lock(&ci->i_ceph_lock); 2803 spin_lock(&ci->i_ceph_lock);
2804 cap = __get_cap_for_mds(ci, mds);
2805 if (!cap)
2806 goto out_unlock;
2759 2807
2760 /* make sure we haven't seen a higher mseq */ 2808 if (target < 0) {
2761 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 2809 __ceph_remove_cap(cap, false);
2762 t = rb_entry(p, struct ceph_cap, ci_node); 2810 goto out_unlock;
2763 if (ceph_seq_cmp(t->mseq, mseq) > 0) {
2764 dout(" higher mseq on cap from mds%d\n",
2765 t->session->s_mds);
2766 remember = 0;
2767 }
2768 if (t->session->s_mds == mds)
2769 cap = t;
2770 } 2811 }
2771 2812
2772 if (cap) { 2813 /*
2773 if (remember) { 2814 * now we know we haven't received the cap import message yet
2774 /* make note */ 2815 * because the exported cap still exist.
2775 ci->i_cap_exporting_mds = mds; 2816 */
2776 ci->i_cap_exporting_mseq = mseq;
2777 ci->i_cap_exporting_issued = cap->issued;
2778
2779 /*
2780 * make sure we have open sessions with all possible
2781 * export targets, so that we get the matching IMPORT
2782 */
2783 *open_target_sessions = 1;
2784 2817
2785 /* 2818 issued = cap->issued;
2786 * we can't flush dirty caps that we've seen the 2819 WARN_ON(issued != cap->implemented);
2787 * EXPORT but no IMPORT for 2820
2788 */ 2821 tcap = __get_cap_for_mds(ci, target);
2789 spin_lock(&mdsc->cap_dirty_lock); 2822 if (tcap) {
2790 if (!list_empty(&ci->i_dirty_item)) { 2823 /* already have caps from the target */
2791 dout(" moving %p to cap_dirty_migrating\n", 2824 if (tcap->cap_id != t_cap_id ||
2792 inode); 2825 ceph_seq_cmp(tcap->seq, t_seq) < 0) {
2793 list_move(&ci->i_dirty_item, 2826 dout(" updating import cap %p mds%d\n", tcap, target);
2794 &mdsc->cap_dirty_migrating); 2827 tcap->cap_id = t_cap_id;
2828 tcap->seq = t_seq - 1;
2829 tcap->issue_seq = t_seq - 1;
2830 tcap->mseq = t_mseq;
2831 tcap->issued |= issued;
2832 tcap->implemented |= issued;
2833 if (cap == ci->i_auth_cap)
2834 ci->i_auth_cap = tcap;
2835 if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
2836 spin_lock(&mdsc->cap_dirty_lock);
2837 list_move_tail(&ci->i_flushing_item,
2838 &tcap->session->s_cap_flushing);
2839 spin_unlock(&mdsc->cap_dirty_lock);
2795 } 2840 }
2796 spin_unlock(&mdsc->cap_dirty_lock);
2797 } 2841 }
2798 __ceph_remove_cap(cap, false); 2842 __ceph_remove_cap(cap, false);
2843 goto out_unlock;
2799 } 2844 }
2800 /* else, we already released it */
2801 2845
2846 if (tsession) {
2847 int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
2848 spin_unlock(&ci->i_ceph_lock);
2849 /* add placeholder for the export tagert */
2850 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
2851 t_seq - 1, t_mseq, (u64)-1, flag, NULL);
2852 goto retry;
2853 }
2854
2855 spin_unlock(&ci->i_ceph_lock);
2856 mutex_unlock(&session->s_mutex);
2857
2858 /* open target session */
2859 tsession = ceph_mdsc_open_export_target_session(mdsc, target);
2860 if (!IS_ERR(tsession)) {
2861 if (mds > target) {
2862 mutex_lock(&session->s_mutex);
2863 mutex_lock_nested(&tsession->s_mutex,
2864 SINGLE_DEPTH_NESTING);
2865 } else {
2866 mutex_lock(&tsession->s_mutex);
2867 mutex_lock_nested(&session->s_mutex,
2868 SINGLE_DEPTH_NESTING);
2869 }
2870 ceph_add_cap_releases(mdsc, tsession);
2871 } else {
2872 WARN_ON(1);
2873 tsession = NULL;
2874 target = -1;
2875 }
2876 goto retry;
2877
2878out_unlock:
2802 spin_unlock(&ci->i_ceph_lock); 2879 spin_unlock(&ci->i_ceph_lock);
2880 mutex_unlock(&session->s_mutex);
2881 if (tsession) {
2882 mutex_unlock(&tsession->s_mutex);
2883 ceph_put_mds_session(tsession);
2884 }
2803} 2885}
2804 2886
2805/* 2887/*
@@ -2810,10 +2892,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2810 */ 2892 */
2811static void handle_cap_import(struct ceph_mds_client *mdsc, 2893static void handle_cap_import(struct ceph_mds_client *mdsc,
2812 struct inode *inode, struct ceph_mds_caps *im, 2894 struct inode *inode, struct ceph_mds_caps *im,
2895 struct ceph_mds_cap_peer *ph,
2813 struct ceph_mds_session *session, 2896 struct ceph_mds_session *session,
2814 void *snaptrace, int snaptrace_len) 2897 void *snaptrace, int snaptrace_len)
2815{ 2898{
2816 struct ceph_inode_info *ci = ceph_inode(inode); 2899 struct ceph_inode_info *ci = ceph_inode(inode);
2900 struct ceph_cap *cap;
2817 int mds = session->s_mds; 2901 int mds = session->s_mds;
2818 unsigned issued = le32_to_cpu(im->caps); 2902 unsigned issued = le32_to_cpu(im->caps);
2819 unsigned wanted = le32_to_cpu(im->wanted); 2903 unsigned wanted = le32_to_cpu(im->wanted);
@@ -2821,28 +2905,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2821 unsigned mseq = le32_to_cpu(im->migrate_seq); 2905 unsigned mseq = le32_to_cpu(im->migrate_seq);
2822 u64 realmino = le64_to_cpu(im->realm); 2906 u64 realmino = le64_to_cpu(im->realm);
2823 u64 cap_id = le64_to_cpu(im->cap_id); 2907 u64 cap_id = le64_to_cpu(im->cap_id);
2908 u64 p_cap_id;
2909 int peer;
2824 2910
2825 if (ci->i_cap_exporting_mds >= 0 && 2911 if (ph) {
2826 ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { 2912 p_cap_id = le64_to_cpu(ph->cap_id);
2827 dout("handle_cap_import inode %p ci %p mds%d mseq %d" 2913 peer = le32_to_cpu(ph->mds);
2828 " - cleared exporting from mds%d\n", 2914 } else {
2829 inode, ci, mds, mseq, 2915 p_cap_id = 0;
2830 ci->i_cap_exporting_mds); 2916 peer = -1;
2831 ci->i_cap_exporting_issued = 0; 2917 }
2832 ci->i_cap_exporting_mseq = 0;
2833 ci->i_cap_exporting_mds = -1;
2834 2918
2835 spin_lock(&mdsc->cap_dirty_lock); 2919 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
2836 if (!list_empty(&ci->i_dirty_item)) { 2920 inode, ci, mds, mseq, peer);
2837 dout(" moving %p back to cap_dirty\n", inode); 2921
2838 list_move(&ci->i_dirty_item, &mdsc->cap_dirty); 2922 spin_lock(&ci->i_ceph_lock);
2923 cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
2924 if (cap && cap->cap_id == p_cap_id) {
2925 dout(" remove export cap %p mds%d flags %d\n",
2926 cap, peer, ph->flags);
2927 if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
2928 (cap->seq != le32_to_cpu(ph->seq) ||
2929 cap->mseq != le32_to_cpu(ph->mseq))) {
2930 pr_err("handle_cap_import: mismatched seq/mseq: "
2931 "ino (%llx.%llx) mds%d seq %d mseq %d "
2932 "importer mds%d has peer seq %d mseq %d\n",
2933 ceph_vinop(inode), peer, cap->seq,
2934 cap->mseq, mds, le32_to_cpu(ph->seq),
2935 le32_to_cpu(ph->mseq));
2839 } 2936 }
2840 spin_unlock(&mdsc->cap_dirty_lock); 2937 ci->i_cap_exporting_issued = cap->issued;
2841 } else { 2938 __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
2842 dout("handle_cap_import inode %p ci %p mds%d mseq %d\n",
2843 inode, ci, mds, mseq);
2844 } 2939 }
2845 2940
2941 /* make sure we re-request max_size, if necessary */
2942 ci->i_wanted_max_size = 0;
2943 ci->i_requested_max_size = 0;
2944 spin_unlock(&ci->i_ceph_lock);
2945
2846 down_write(&mdsc->snap_rwsem); 2946 down_write(&mdsc->snap_rwsem);
2847 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, 2947 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
2848 false); 2948 false);
@@ -2853,11 +2953,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2853 kick_flushing_inode_caps(mdsc, session, inode); 2953 kick_flushing_inode_caps(mdsc, session, inode);
2854 up_read(&mdsc->snap_rwsem); 2954 up_read(&mdsc->snap_rwsem);
2855 2955
2856 /* make sure we re-request max_size, if necessary */
2857 spin_lock(&ci->i_ceph_lock);
2858 ci->i_wanted_max_size = 0; /* reset */
2859 ci->i_requested_max_size = 0;
2860 spin_unlock(&ci->i_ceph_lock);
2861} 2956}
2862 2957
2863/* 2958/*
@@ -2875,6 +2970,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2875 struct ceph_inode_info *ci; 2970 struct ceph_inode_info *ci;
2876 struct ceph_cap *cap; 2971 struct ceph_cap *cap;
2877 struct ceph_mds_caps *h; 2972 struct ceph_mds_caps *h;
2973 struct ceph_mds_cap_peer *peer = NULL;
2878 int mds = session->s_mds; 2974 int mds = session->s_mds;
2879 int op; 2975 int op;
2880 u32 seq, mseq; 2976 u32 seq, mseq;
@@ -2885,12 +2981,13 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2885 void *snaptrace; 2981 void *snaptrace;
2886 size_t snaptrace_len; 2982 size_t snaptrace_len;
2887 void *flock; 2983 void *flock;
2984 void *end;
2888 u32 flock_len; 2985 u32 flock_len;
2889 int open_target_sessions = 0;
2890 2986
2891 dout("handle_caps from mds%d\n", mds); 2987 dout("handle_caps from mds%d\n", mds);
2892 2988
2893 /* decode */ 2989 /* decode */
2990 end = msg->front.iov_base + msg->front.iov_len;
2894 tid = le64_to_cpu(msg->hdr.tid); 2991 tid = le64_to_cpu(msg->hdr.tid);
2895 if (msg->front.iov_len < sizeof(*h)) 2992 if (msg->front.iov_len < sizeof(*h))
2896 goto bad; 2993 goto bad;
@@ -2908,17 +3005,28 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2908 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3005 snaptrace_len = le32_to_cpu(h->snap_trace_len);
2909 3006
2910 if (le16_to_cpu(msg->hdr.version) >= 2) { 3007 if (le16_to_cpu(msg->hdr.version) >= 2) {
2911 void *p, *end; 3008 void *p = snaptrace + snaptrace_len;
2912
2913 p = snaptrace + snaptrace_len;
2914 end = msg->front.iov_base + msg->front.iov_len;
2915 ceph_decode_32_safe(&p, end, flock_len, bad); 3009 ceph_decode_32_safe(&p, end, flock_len, bad);
3010 if (p + flock_len > end)
3011 goto bad;
2916 flock = p; 3012 flock = p;
2917 } else { 3013 } else {
2918 flock = NULL; 3014 flock = NULL;
2919 flock_len = 0; 3015 flock_len = 0;
2920 } 3016 }
2921 3017
3018 if (le16_to_cpu(msg->hdr.version) >= 3) {
3019 if (op == CEPH_CAP_OP_IMPORT) {
3020 void *p = flock + flock_len;
3021 if (p + sizeof(*peer) > end)
3022 goto bad;
3023 peer = p;
3024 } else if (op == CEPH_CAP_OP_EXPORT) {
3025 /* recorded in unused fields */
3026 peer = (void *)&h->size;
3027 }
3028 }
3029
2922 mutex_lock(&session->s_mutex); 3030 mutex_lock(&session->s_mutex);
2923 session->s_seq++; 3031 session->s_seq++;
2924 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3032 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2951,11 +3059,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2951 goto done; 3059 goto done;
2952 3060
2953 case CEPH_CAP_OP_EXPORT: 3061 case CEPH_CAP_OP_EXPORT:
2954 handle_cap_export(inode, h, session, &open_target_sessions); 3062 handle_cap_export(inode, h, peer, session);
2955 goto done; 3063 goto done_unlocked;
2956 3064
2957 case CEPH_CAP_OP_IMPORT: 3065 case CEPH_CAP_OP_IMPORT:
2958 handle_cap_import(mdsc, inode, h, session, 3066 handle_cap_import(mdsc, inode, h, peer, session,
2959 snaptrace, snaptrace_len); 3067 snaptrace, snaptrace_len);
2960 } 3068 }
2961 3069
@@ -3007,8 +3115,6 @@ done:
3007done_unlocked: 3115done_unlocked:
3008 if (inode) 3116 if (inode)
3009 iput(inode); 3117 iput(inode);
3010 if (open_target_sessions)
3011 ceph_mdsc_open_export_target_sessions(mdsc, session);
3012 return; 3118 return;
3013 3119
3014bad: 3120bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 2a0bcaeb189a..619616d585b0 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -693,6 +693,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
693 if (!err && !req->r_reply_info.head->is_dentry) 693 if (!err && !req->r_reply_info.head->is_dentry)
694 err = ceph_handle_notrace_create(dir, dentry); 694 err = ceph_handle_notrace_create(dir, dentry);
695 ceph_mdsc_put_request(req); 695 ceph_mdsc_put_request(req);
696
697 if (!err)
698 err = ceph_init_acl(dentry, dentry->d_inode, dir);
699
696 if (err) 700 if (err)
697 d_drop(dentry); 701 d_drop(dentry);
698 return err; 702 return err;
@@ -1037,14 +1041,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1037 valid = 1; 1041 valid = 1;
1038 } else if (dentry_lease_is_valid(dentry) || 1042 } else if (dentry_lease_is_valid(dentry) ||
1039 dir_lease_is_valid(dir, dentry)) { 1043 dir_lease_is_valid(dir, dentry)) {
1040 valid = 1; 1044 if (dentry->d_inode)
1045 valid = ceph_is_any_caps(dentry->d_inode);
1046 else
1047 valid = 1;
1041 } 1048 }
1042 1049
1043 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1050 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1044 if (valid) 1051 if (valid) {
1045 ceph_dentry_lru_touch(dentry); 1052 ceph_dentry_lru_touch(dentry);
1046 else 1053 } else {
1054 ceph_dir_clear_complete(dir);
1047 d_drop(dentry); 1055 d_drop(dentry);
1056 }
1048 iput(dir); 1057 iput(dir);
1049 return valid; 1058 return valid;
1050} 1059}
@@ -1293,6 +1302,7 @@ const struct inode_operations ceph_dir_iops = {
1293 .getxattr = ceph_getxattr, 1302 .getxattr = ceph_getxattr,
1294 .listxattr = ceph_listxattr, 1303 .listxattr = ceph_listxattr,
1295 .removexattr = ceph_removexattr, 1304 .removexattr = ceph_removexattr,
1305 .get_acl = ceph_get_acl,
1296 .mknod = ceph_mknod, 1306 .mknod = ceph_mknod,
1297 .symlink = ceph_symlink, 1307 .symlink = ceph_symlink,
1298 .mkdir = ceph_mkdir, 1308 .mkdir = ceph_mkdir,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3de89829e2a1..dfd2ce3419f8 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -408,51 +408,92 @@ more:
408 * 408 *
409 * If the read spans object boundary, just do multiple reads. 409 * If the read spans object boundary, just do multiple reads.
410 */ 410 */
411static ssize_t ceph_sync_read(struct file *file, char __user *data, 411static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
412 unsigned len, loff_t *poff, int *checkeof) 412 int *checkeof)
413{ 413{
414 struct file *file = iocb->ki_filp;
414 struct inode *inode = file_inode(file); 415 struct inode *inode = file_inode(file);
415 struct page **pages; 416 struct page **pages;
416 u64 off = *poff; 417 u64 off = iocb->ki_pos;
417 int num_pages, ret; 418 int num_pages, ret;
419 size_t len = i->count;
418 420
419 dout("sync_read on file %p %llu~%u %s\n", file, off, len, 421 dout("sync_read on file %p %llu~%u %s\n", file, off,
422 (unsigned)len,
420 (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 423 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
421
422 if (file->f_flags & O_DIRECT) {
423 num_pages = calc_pages_for((unsigned long)data, len);
424 pages = ceph_get_direct_page_vector(data, num_pages, true);
425 } else {
426 num_pages = calc_pages_for(off, len);
427 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
428 }
429 if (IS_ERR(pages))
430 return PTR_ERR(pages);
431
432 /* 424 /*
433 * flush any page cache pages in this range. this 425 * flush any page cache pages in this range. this
434 * will make concurrent normal and sync io slow, 426 * will make concurrent normal and sync io slow,
435 * but it will at least behave sensibly when they are 427 * but it will at least behave sensibly when they are
436 * in sequence. 428 * in sequence.
437 */ 429 */
438 ret = filemap_write_and_wait(inode->i_mapping); 430 ret = filemap_write_and_wait_range(inode->i_mapping, off,
431 off + len);
439 if (ret < 0) 432 if (ret < 0)
440 goto done; 433 return ret;
441 434
442 ret = striped_read(inode, off, len, pages, num_pages, checkeof, 435 if (file->f_flags & O_DIRECT) {
443 file->f_flags & O_DIRECT, 436 while (iov_iter_count(i)) {
444 (unsigned long)data & ~PAGE_MASK); 437 void __user *data = i->iov[0].iov_base + i->iov_offset;
438 size_t len = i->iov[0].iov_len - i->iov_offset;
439
440 num_pages = calc_pages_for((unsigned long)data, len);
441 pages = ceph_get_direct_page_vector(data,
442 num_pages, true);
443 if (IS_ERR(pages))
444 return PTR_ERR(pages);
445
446 ret = striped_read(inode, off, len,
447 pages, num_pages, checkeof,
448 1, (unsigned long)data & ~PAGE_MASK);
449 ceph_put_page_vector(pages, num_pages, true);
450
451 if (ret <= 0)
452 break;
453 off += ret;
454 iov_iter_advance(i, ret);
455 if (ret < len)
456 break;
457 }
458 } else {
459 num_pages = calc_pages_for(off, len);
460 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
461 if (IS_ERR(pages))
462 return PTR_ERR(pages);
463 ret = striped_read(inode, off, len, pages,
464 num_pages, checkeof, 0, 0);
465 if (ret > 0) {
466 int l, k = 0;
467 size_t left = len = ret;
468
469 while (left) {
470 void __user *data = i->iov[0].iov_base
471 + i->iov_offset;
472 l = min(i->iov[0].iov_len - i->iov_offset,
473 left);
474
475 ret = ceph_copy_page_vector_to_user(&pages[k],
476 data, off,
477 l);
478 if (ret > 0) {
479 iov_iter_advance(i, ret);
480 left -= ret;
481 off += ret;
482 k = calc_pages_for(iocb->ki_pos,
483 len - left + 1) - 1;
484 BUG_ON(k >= num_pages && left);
485 } else
486 break;
487 }
488 }
489 ceph_release_page_vector(pages, num_pages);
490 }
445 491
446 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) 492 if (off > iocb->ki_pos) {
447 ret = ceph_copy_page_vector_to_user(pages, data, off, ret); 493 ret = off - iocb->ki_pos;
448 if (ret >= 0) 494 iocb->ki_pos = off;
449 *poff = off + ret; 495 }
450 496
451done:
452 if (file->f_flags & O_DIRECT)
453 ceph_put_page_vector(pages, num_pages, true);
454 else
455 ceph_release_page_vector(pages, num_pages);
456 dout("sync_read result %d\n", ret); 497 dout("sync_read result %d\n", ret);
457 return ret; 498 return ret;
458} 499}
@@ -489,83 +530,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
489 } 530 }
490} 531}
491 532
533
492/* 534/*
493 * Synchronous write, straight from __user pointer or user pages (if 535 * Synchronous write, straight from __user pointer or user pages.
494 * O_DIRECT).
495 * 536 *
496 * If write spans object boundary, just do multiple writes. (For a 537 * If write spans object boundary, just do multiple writes. (For a
497 * correct atomic write, we should e.g. take write locks on all 538 * correct atomic write, we should e.g. take write locks on all
498 * objects, rollback on failure, etc.) 539 * objects, rollback on failure, etc.)
499 */ 540 */
500static ssize_t ceph_sync_write(struct file *file, const char __user *data, 541static ssize_t
501 size_t left, loff_t pos, loff_t *ppos) 542ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
543 unsigned long nr_segs, size_t count)
502{ 544{
545 struct file *file = iocb->ki_filp;
503 struct inode *inode = file_inode(file); 546 struct inode *inode = file_inode(file);
504 struct ceph_inode_info *ci = ceph_inode(inode); 547 struct ceph_inode_info *ci = ceph_inode(inode);
505 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 548 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
506 struct ceph_snap_context *snapc; 549 struct ceph_snap_context *snapc;
507 struct ceph_vino vino; 550 struct ceph_vino vino;
508 struct ceph_osd_request *req; 551 struct ceph_osd_request *req;
509 int num_ops = 1;
510 struct page **pages; 552 struct page **pages;
511 int num_pages; 553 int num_pages;
512 u64 len;
513 int written = 0; 554 int written = 0;
514 int flags; 555 int flags;
515 int check_caps = 0; 556 int check_caps = 0;
516 int page_align, io_align; 557 int page_align;
517 unsigned long buf_align;
518 int ret; 558 int ret;
519 struct timespec mtime = CURRENT_TIME; 559 struct timespec mtime = CURRENT_TIME;
520 bool own_pages = false; 560 loff_t pos = iocb->ki_pos;
561 struct iov_iter i;
521 562
522 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 563 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
523 return -EROFS; 564 return -EROFS;
524 565
525 dout("sync_write on file %p %lld~%u %s\n", file, pos, 566 dout("sync_direct_write on file %p %lld~%u\n", file, pos,
526 (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 567 (unsigned)count);
527 568
528 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); 569 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
529 if (ret < 0) 570 if (ret < 0)
530 return ret; 571 return ret;
531 572
532 ret = invalidate_inode_pages2_range(inode->i_mapping, 573 ret = invalidate_inode_pages2_range(inode->i_mapping,
533 pos >> PAGE_CACHE_SHIFT, 574 pos >> PAGE_CACHE_SHIFT,
534 (pos + left) >> PAGE_CACHE_SHIFT); 575 (pos + count) >> PAGE_CACHE_SHIFT);
535 if (ret < 0) 576 if (ret < 0)
536 dout("invalidate_inode_pages2_range returned %d\n", ret); 577 dout("invalidate_inode_pages2_range returned %d\n", ret);
537 578
538 flags = CEPH_OSD_FLAG_ORDERSNAP | 579 flags = CEPH_OSD_FLAG_ORDERSNAP |
539 CEPH_OSD_FLAG_ONDISK | 580 CEPH_OSD_FLAG_ONDISK |
540 CEPH_OSD_FLAG_WRITE; 581 CEPH_OSD_FLAG_WRITE;
541 if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
542 flags |= CEPH_OSD_FLAG_ACK;
543 else
544 num_ops++; /* Also include a 'startsync' command. */
545 582
546 /* 583 iov_iter_init(&i, iov, nr_segs, count, 0);
547 * we may need to do multiple writes here if we span an object 584
548 * boundary. this isn't atomic, unfortunately. :( 585 while (iov_iter_count(&i) > 0) {
549 */ 586 void __user *data = i.iov->iov_base + i.iov_offset;
550more: 587 u64 len = i.iov->iov_len - i.iov_offset;
551 io_align = pos & ~PAGE_MASK; 588
552 buf_align = (unsigned long)data & ~PAGE_MASK; 589 page_align = (unsigned long)data & ~PAGE_MASK;
553 len = left; 590
554 591 snapc = ci->i_snap_realm->cached_context;
555 snapc = ci->i_snap_realm->cached_context; 592 vino = ceph_vino(inode);
556 vino = ceph_vino(inode); 593 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
557 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 594 vino, pos, &len,
558 vino, pos, &len, num_ops, 595 2,/*include a 'startsync' command*/
559 CEPH_OSD_OP_WRITE, flags, snapc, 596 CEPH_OSD_OP_WRITE, flags, snapc,
560 ci->i_truncate_seq, ci->i_truncate_size, 597 ci->i_truncate_seq,
561 false); 598 ci->i_truncate_size,
562 if (IS_ERR(req)) 599 false);
563 return PTR_ERR(req); 600 if (IS_ERR(req)) {
601 ret = PTR_ERR(req);
602 goto out;
603 }
564 604
565 /* write from beginning of first page, regardless of io alignment */ 605 num_pages = calc_pages_for(page_align, len);
566 page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
567 num_pages = calc_pages_for(page_align, len);
568 if (file->f_flags & O_DIRECT) {
569 pages = ceph_get_direct_page_vector(data, num_pages, false); 606 pages = ceph_get_direct_page_vector(data, num_pages, false);
570 if (IS_ERR(pages)) { 607 if (IS_ERR(pages)) {
571 ret = PTR_ERR(pages); 608 ret = PTR_ERR(pages);
@@ -577,60 +614,175 @@ more:
577 * may block. 614 * may block.
578 */ 615 */
579 truncate_inode_pages_range(inode->i_mapping, pos, 616 truncate_inode_pages_range(inode->i_mapping, pos,
580 (pos+len) | (PAGE_CACHE_SIZE-1)); 617 (pos+len) | (PAGE_CACHE_SIZE-1));
581 } else { 618 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
619 false, false);
620
621 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
622 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
623
624 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
625 if (!ret)
626 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
627
628 ceph_put_page_vector(pages, num_pages, false);
629
630out:
631 ceph_osdc_put_request(req);
632 if (ret == 0) {
633 pos += len;
634 written += len;
635 iov_iter_advance(&i, (size_t)len);
636
637 if (pos > i_size_read(inode)) {
638 check_caps = ceph_inode_set_size(inode, pos);
639 if (check_caps)
640 ceph_check_caps(ceph_inode(inode),
641 CHECK_CAPS_AUTHONLY,
642 NULL);
643 }
644 } else
645 break;
646 }
647
648 if (ret != -EOLDSNAPC && written > 0) {
649 iocb->ki_pos = pos;
650 ret = written;
651 }
652 return ret;
653}
654
655
656/*
657 * Synchronous write, straight from __user pointer or user pages.
658 *
659 * If write spans object boundary, just do multiple writes. (For a
660 * correct atomic write, we should e.g. take write locks on all
661 * objects, rollback on failure, etc.)
662 */
663static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
664 unsigned long nr_segs, size_t count)
665{
666 struct file *file = iocb->ki_filp;
667 struct inode *inode = file_inode(file);
668 struct ceph_inode_info *ci = ceph_inode(inode);
669 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
670 struct ceph_snap_context *snapc;
671 struct ceph_vino vino;
672 struct ceph_osd_request *req;
673 struct page **pages;
674 u64 len;
675 int num_pages;
676 int written = 0;
677 int flags;
678 int check_caps = 0;
679 int ret;
680 struct timespec mtime = CURRENT_TIME;
681 loff_t pos = iocb->ki_pos;
682 struct iov_iter i;
683
684 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
685 return -EROFS;
686
687 dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
688
689 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
690 if (ret < 0)
691 return ret;
692
693 ret = invalidate_inode_pages2_range(inode->i_mapping,
694 pos >> PAGE_CACHE_SHIFT,
695 (pos + count) >> PAGE_CACHE_SHIFT);
696 if (ret < 0)
697 dout("invalidate_inode_pages2_range returned %d\n", ret);
698
699 flags = CEPH_OSD_FLAG_ORDERSNAP |
700 CEPH_OSD_FLAG_ONDISK |
701 CEPH_OSD_FLAG_WRITE |
702 CEPH_OSD_FLAG_ACK;
703
704 iov_iter_init(&i, iov, nr_segs, count, 0);
705
706 while ((len = iov_iter_count(&i)) > 0) {
707 size_t left;
708 int n;
709
710 snapc = ci->i_snap_realm->cached_context;
711 vino = ceph_vino(inode);
712 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
713 vino, pos, &len, 1,
714 CEPH_OSD_OP_WRITE, flags, snapc,
715 ci->i_truncate_seq,
716 ci->i_truncate_size,
717 false);
718 if (IS_ERR(req)) {
719 ret = PTR_ERR(req);
720 goto out;
721 }
722
723 /*
724 * write from beginning of first page,
725 * regardless of io alignment
726 */
727 num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
728
582 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); 729 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
583 if (IS_ERR(pages)) { 730 if (IS_ERR(pages)) {
584 ret = PTR_ERR(pages); 731 ret = PTR_ERR(pages);
585 goto out; 732 goto out;
586 } 733 }
587 ret = ceph_copy_user_to_page_vector(pages, data, pos, len); 734
735 left = len;
736 for (n = 0; n < num_pages; n++) {
737 size_t plen = min_t(size_t, left, PAGE_SIZE);
738 ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
739 if (ret != plen) {
740 ret = -EFAULT;
741 break;
742 }
743 left -= ret;
744 iov_iter_advance(&i, ret);
745 }
746
588 if (ret < 0) { 747 if (ret < 0) {
589 ceph_release_page_vector(pages, num_pages); 748 ceph_release_page_vector(pages, num_pages);
590 goto out; 749 goto out;
591 } 750 }
592 751
593 if ((file->f_flags & O_SYNC) == 0) { 752 /* get a second commit callback */
594 /* get a second commit callback */ 753 req->r_unsafe_callback = ceph_sync_write_unsafe;
595 req->r_unsafe_callback = ceph_sync_write_unsafe; 754 req->r_inode = inode;
596 req->r_inode = inode;
597 own_pages = true;
598 }
599 }
600 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
601 false, own_pages);
602 755
603 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 756 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
604 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 757 false, true);
605 758
606 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 759 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
607 if (!ret) 760 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
608 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
609 761
610 if (file->f_flags & O_DIRECT) 762 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
611 ceph_put_page_vector(pages, num_pages, false); 763 if (!ret)
612 else if (file->f_flags & O_SYNC) 764 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
613 ceph_release_page_vector(pages, num_pages);
614 765
615out: 766out:
616 ceph_osdc_put_request(req); 767 ceph_osdc_put_request(req);
617 if (ret == 0) { 768 if (ret == 0) {
618 pos += len; 769 pos += len;
619 written += len; 770 written += len;
620 left -= len; 771
621 data += len; 772 if (pos > i_size_read(inode)) {
622 if (left) 773 check_caps = ceph_inode_set_size(inode, pos);
623 goto more; 774 if (check_caps)
775 ceph_check_caps(ceph_inode(inode),
776 CHECK_CAPS_AUTHONLY,
777 NULL);
778 }
779 } else
780 break;
781 }
624 782
783 if (ret != -EOLDSNAPC && written > 0) {
625 ret = written; 784 ret = written;
626 *ppos = pos; 785 iocb->ki_pos = pos;
627 if (pos > i_size_read(inode))
628 check_caps = ceph_inode_set_size(inode, pos);
629 if (check_caps)
630 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
631 NULL);
632 } else if (ret != -EOLDSNAPC && written > 0) {
633 ret = written;
634 } 786 }
635 return ret; 787 return ret;
636} 788}
@@ -647,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
647{ 799{
648 struct file *filp = iocb->ki_filp; 800 struct file *filp = iocb->ki_filp;
649 struct ceph_file_info *fi = filp->private_data; 801 struct ceph_file_info *fi = filp->private_data;
650 loff_t *ppos = &iocb->ki_pos; 802 size_t len = iocb->ki_nbytes;
651 size_t len = iov->iov_len;
652 struct inode *inode = file_inode(filp); 803 struct inode *inode = file_inode(filp);
653 struct ceph_inode_info *ci = ceph_inode(inode); 804 struct ceph_inode_info *ci = ceph_inode(inode);
654 void __user *base = iov->iov_base;
655 ssize_t ret; 805 ssize_t ret;
656 int want, got = 0; 806 int want, got = 0;
657 int checkeof = 0, read = 0; 807 int checkeof = 0, read = 0;
658 808
659 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
660 inode, ceph_vinop(inode), pos, (unsigned)len, inode);
661again: 809again:
810 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
811 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
812
662 if (fi->fmode & CEPH_FILE_MODE_LAZY) 813 if (fi->fmode & CEPH_FILE_MODE_LAZY)
663 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 814 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
664 else 815 else
665 want = CEPH_CAP_FILE_CACHE; 816 want = CEPH_CAP_FILE_CACHE;
666 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 817 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
667 if (ret < 0) 818 if (ret < 0)
668 goto out; 819 return ret;
669 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
670 inode, ceph_vinop(inode), pos, (unsigned)len,
671 ceph_cap_string(got));
672 820
673 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 821 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
674 (iocb->ki_filp->f_flags & O_DIRECT) || 822 (iocb->ki_filp->f_flags & O_DIRECT) ||
675 (fi->flags & CEPH_F_SYNC)) 823 (fi->flags & CEPH_F_SYNC)) {
824 struct iov_iter i;
825
826 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got));
829
830 if (!read) {
831 ret = generic_segment_checks(iov, &nr_segs,
832 &len, VERIFY_WRITE);
833 if (ret)
834 goto out;
835 }
836
837 iov_iter_init(&i, iov, nr_segs, len, read);
838
676 /* hmm, this isn't really async... */ 839 /* hmm, this isn't really async... */
677 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 840 ret = ceph_sync_read(iocb, &i, &checkeof);
678 else 841 } else {
679 ret = generic_file_aio_read(iocb, iov, nr_segs, pos); 842 /*
843 * We can't modify the content of iov,
844 * so we only read from beginning.
845 */
846 if (read) {
847 iocb->ki_pos = pos;
848 len = iocb->ki_nbytes;
849 read = 0;
850 }
851 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
852 inode, ceph_vinop(inode), pos, (unsigned)len,
853 ceph_cap_string(got));
680 854
855 ret = generic_file_aio_read(iocb, iov, nr_segs, pos);
856 }
681out: 857out:
682 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 858 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
683 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 859 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
684 ceph_put_cap_refs(ci, got); 860 ceph_put_cap_refs(ci, got);
685 861
686 if (checkeof && ret >= 0) { 862 if (checkeof && ret >= 0) {
687 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); 863 int statret = ceph_do_getattr(inode,
864 CEPH_STAT_CAP_SIZE);
688 865
689 /* hit EOF or hole? */ 866 /* hit EOF or hole? */
690 if (statret == 0 && *ppos < inode->i_size) { 867 if (statret == 0 && iocb->ki_pos < inode->i_size &&
691 dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); 868 ret < len) {
869 dout("sync_read hit hole, ppos %lld < size %lld"
870 ", reading more\n", iocb->ki_pos,
871 inode->i_size);
872
692 read += ret; 873 read += ret;
693 base += ret;
694 len -= ret; 874 len -= ret;
695 checkeof = 0; 875 checkeof = 0;
696 goto again; 876 goto again;
697 } 877 }
698 } 878 }
879
699 if (ret >= 0) 880 if (ret >= 0)
700 ret += read; 881 ret += read;
701 882
@@ -772,11 +953,13 @@ retry_snap:
772 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 953 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
773 954
774 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 955 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
775 (iocb->ki_filp->f_flags & O_DIRECT) || 956 (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
776 (fi->flags & CEPH_F_SYNC)) {
777 mutex_unlock(&inode->i_mutex); 957 mutex_unlock(&inode->i_mutex);
778 written = ceph_sync_write(file, iov->iov_base, count, 958 if (file->f_flags & O_DIRECT)
779 pos, &iocb->ki_pos); 959 written = ceph_sync_direct_write(iocb, iov,
960 nr_segs, count);
961 else
962 written = ceph_sync_write(iocb, iov, nr_segs, count);
780 if (written == -EOLDSNAPC) { 963 if (written == -EOLDSNAPC) {
781 dout("aio_write %p %llx.%llx %llu~%u" 964 dout("aio_write %p %llx.%llx %llu~%u"
782 "got EOLDSNAPC, retrying\n", 965 "got EOLDSNAPC, retrying\n",
@@ -1018,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode,
1018 loff_t offset, loff_t length) 1201 loff_t offset, loff_t length)
1019{ 1202{
1020 struct ceph_file_info *fi = file->private_data; 1203 struct ceph_file_info *fi = file->private_data;
1021 struct inode *inode = file->f_dentry->d_inode; 1204 struct inode *inode = file_inode(file);
1022 struct ceph_inode_info *ci = ceph_inode(inode); 1205 struct ceph_inode_info *ci = ceph_inode(inode);
1023 struct ceph_osd_client *osdc = 1206 struct ceph_osd_client *osdc =
1024 &ceph_inode_to_client(inode)->client->osdc; 1207 &ceph_inode_to_client(inode)->client->osdc;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 278fd2891288..6fc10a7d7c59 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = {
95 .getxattr = ceph_getxattr, 95 .getxattr = ceph_getxattr,
96 .listxattr = ceph_listxattr, 96 .listxattr = ceph_listxattr,
97 .removexattr = ceph_removexattr, 97 .removexattr = ceph_removexattr,
98 .get_acl = ceph_get_acl,
98}; 99};
99 100
100 101
@@ -335,12 +336,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
335 ci->i_hold_caps_min = 0; 336 ci->i_hold_caps_min = 0;
336 ci->i_hold_caps_max = 0; 337 ci->i_hold_caps_max = 0;
337 INIT_LIST_HEAD(&ci->i_cap_delay_list); 338 INIT_LIST_HEAD(&ci->i_cap_delay_list);
338 ci->i_cap_exporting_mds = 0;
339 ci->i_cap_exporting_mseq = 0;
340 ci->i_cap_exporting_issued = 0;
341 INIT_LIST_HEAD(&ci->i_cap_snaps); 339 INIT_LIST_HEAD(&ci->i_cap_snaps);
342 ci->i_head_snapc = NULL; 340 ci->i_head_snapc = NULL;
343 ci->i_snap_caps = 0; 341 ci->i_snap_caps = 0;
342 ci->i_cap_exporting_issued = 0;
344 343
345 for (i = 0; i < CEPH_FILE_MODE_NUM; i++) 344 for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
346 ci->i_nr_by_mode[i] = 0; 345 ci->i_nr_by_mode[i] = 0;
@@ -436,6 +435,16 @@ void ceph_destroy_inode(struct inode *inode)
436 call_rcu(&inode->i_rcu, ceph_i_callback); 435 call_rcu(&inode->i_rcu, ceph_i_callback);
437} 436}
438 437
438int ceph_drop_inode(struct inode *inode)
439{
440 /*
441 * Positve dentry and corresponding inode are always accompanied
442 * in MDS reply. So no need to keep inode in the cache after
443 * dropping all its aliases.
444 */
445 return 1;
446}
447
439/* 448/*
440 * Helpers to fill in size, ctime, mtime, and atime. We have to be 449 * Helpers to fill in size, ctime, mtime, and atime. We have to be
441 * careful because either the client or MDS may have more up to date 450 * careful because either the client or MDS may have more up to date
@@ -670,6 +679,7 @@ static int fill_inode(struct inode *inode,
670 memcpy(ci->i_xattrs.blob->vec.iov_base, 679 memcpy(ci->i_xattrs.blob->vec.iov_base,
671 iinfo->xattr_data, iinfo->xattr_len); 680 iinfo->xattr_data, iinfo->xattr_len);
672 ci->i_xattrs.version = le64_to_cpu(info->xattr_version); 681 ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
682 ceph_forget_all_cached_acls(inode);
673 xattr_blob = NULL; 683 xattr_blob = NULL;
674 } 684 }
675 685
@@ -1454,7 +1464,8 @@ static void ceph_invalidate_work(struct work_struct *work)
1454 dout("invalidate_pages %p gen %d revoking %d\n", inode, 1464 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1455 ci->i_rdcache_gen, ci->i_rdcache_revoking); 1465 ci->i_rdcache_gen, ci->i_rdcache_revoking);
1456 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 1466 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1457 /* nevermind! */ 1467 if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1468 check = 1;
1458 spin_unlock(&ci->i_ceph_lock); 1469 spin_unlock(&ci->i_ceph_lock);
1459 mutex_unlock(&ci->i_truncate_mutex); 1470 mutex_unlock(&ci->i_truncate_mutex);
1460 goto out; 1471 goto out;
@@ -1475,13 +1486,14 @@ static void ceph_invalidate_work(struct work_struct *work)
1475 dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", 1486 dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
1476 inode, orig_gen, ci->i_rdcache_gen, 1487 inode, orig_gen, ci->i_rdcache_gen,
1477 ci->i_rdcache_revoking); 1488 ci->i_rdcache_revoking);
1489 if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1490 check = 1;
1478 } 1491 }
1479 spin_unlock(&ci->i_ceph_lock); 1492 spin_unlock(&ci->i_ceph_lock);
1480 mutex_unlock(&ci->i_truncate_mutex); 1493 mutex_unlock(&ci->i_truncate_mutex);
1481 1494out:
1482 if (check) 1495 if (check)
1483 ceph_check_caps(ci, 0, NULL); 1496 ceph_check_caps(ci, 0, NULL);
1484out:
1485 iput(inode); 1497 iput(inode);
1486} 1498}
1487 1499
@@ -1602,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = {
1602 .getxattr = ceph_getxattr, 1614 .getxattr = ceph_getxattr,
1603 .listxattr = ceph_listxattr, 1615 .listxattr = ceph_listxattr,
1604 .removexattr = ceph_removexattr, 1616 .removexattr = ceph_removexattr,
1617 .get_acl = ceph_get_acl,
1605}; 1618};
1606 1619
1607/* 1620/*
@@ -1675,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1675 dirtied |= CEPH_CAP_AUTH_EXCL; 1688 dirtied |= CEPH_CAP_AUTH_EXCL;
1676 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || 1689 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1677 attr->ia_mode != inode->i_mode) { 1690 attr->ia_mode != inode->i_mode) {
1691 inode->i_mode = attr->ia_mode;
1678 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode); 1692 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
1679 mask |= CEPH_SETATTR_MODE; 1693 mask |= CEPH_SETATTR_MODE;
1680 release |= CEPH_CAP_AUTH_SHARED; 1694 release |= CEPH_CAP_AUTH_SHARED;
@@ -1790,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1790 if (inode_dirty_flags) 1804 if (inode_dirty_flags)
1791 __mark_inode_dirty(inode, inode_dirty_flags); 1805 __mark_inode_dirty(inode, inode_dirty_flags);
1792 1806
1807 if (ia_valid & ATTR_MODE) {
1808 err = ceph_acl_chmod(dentry, inode);
1809 if (err)
1810 goto out_put;
1811 }
1812
1793 if (mask) { 1813 if (mask) {
1794 req->r_inode = inode; 1814 req->r_inode = inode;
1795 ihold(inode); 1815 ihold(inode);
@@ -1809,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1809 return err; 1829 return err;
1810out: 1830out:
1811 spin_unlock(&ci->i_ceph_lock); 1831 spin_unlock(&ci->i_ceph_lock);
1832out_put:
1812 ceph_mdsc_put_request(req); 1833 ceph_mdsc_put_request(req);
1813 return err; 1834 return err;
1814} 1835}
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 669622fd1ae3..dc66c9e023e4 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -183,6 +183,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
183 struct ceph_inode_info *ci = ceph_inode(inode); 183 struct ceph_inode_info *ci = ceph_inode(inode);
184 struct ceph_osd_client *osdc = 184 struct ceph_osd_client *osdc =
185 &ceph_sb_to_client(inode->i_sb)->client->osdc; 185 &ceph_sb_to_client(inode->i_sb)->client->osdc;
186 struct ceph_object_locator oloc;
187 struct ceph_object_id oid;
186 u64 len = 1, olen; 188 u64 len = 1, olen;
187 u64 tmp; 189 u64 tmp;
188 struct ceph_pg pgid; 190 struct ceph_pg pgid;
@@ -211,8 +213,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
211 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", 213 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
212 ceph_ino(inode), dl.object_no); 214 ceph_ino(inode), dl.object_no);
213 215
214 r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, 216 oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
215 ceph_file_layout_pg_pool(ci->i_layout)); 217 ceph_oid_set_name(&oid, dl.object_name);
218
219 r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid);
216 if (r < 0) { 220 if (r < 0) {
217 up_read(&osdc->map_sem); 221 up_read(&osdc->map_sem);
218 return r; 222 return r;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index d90861f45210..f4f050a69a48 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -63,7 +63,7 @@ static const struct ceph_connection_operations mds_con_ops;
63 */ 63 */
64static int parse_reply_info_in(void **p, void *end, 64static int parse_reply_info_in(void **p, void *end,
65 struct ceph_mds_reply_info_in *info, 65 struct ceph_mds_reply_info_in *info,
66 int features) 66 u64 features)
67{ 67{
68 int err = -EIO; 68 int err = -EIO;
69 69
@@ -98,7 +98,7 @@ bad:
98 */ 98 */
99static int parse_reply_info_trace(void **p, void *end, 99static int parse_reply_info_trace(void **p, void *end,
100 struct ceph_mds_reply_info_parsed *info, 100 struct ceph_mds_reply_info_parsed *info,
101 int features) 101 u64 features)
102{ 102{
103 int err; 103 int err;
104 104
@@ -145,7 +145,7 @@ out_bad:
145 */ 145 */
146static int parse_reply_info_dir(void **p, void *end, 146static int parse_reply_info_dir(void **p, void *end,
147 struct ceph_mds_reply_info_parsed *info, 147 struct ceph_mds_reply_info_parsed *info,
148 int features) 148 u64 features)
149{ 149{
150 u32 num, i = 0; 150 u32 num, i = 0;
151 int err; 151 int err;
@@ -217,7 +217,7 @@ out_bad:
217 */ 217 */
218static int parse_reply_info_filelock(void **p, void *end, 218static int parse_reply_info_filelock(void **p, void *end,
219 struct ceph_mds_reply_info_parsed *info, 219 struct ceph_mds_reply_info_parsed *info,
220 int features) 220 u64 features)
221{ 221{
222 if (*p + sizeof(*info->filelock_reply) > end) 222 if (*p + sizeof(*info->filelock_reply) > end)
223 goto bad; 223 goto bad;
@@ -238,7 +238,7 @@ bad:
238 */ 238 */
239static int parse_reply_info_create(void **p, void *end, 239static int parse_reply_info_create(void **p, void *end,
240 struct ceph_mds_reply_info_parsed *info, 240 struct ceph_mds_reply_info_parsed *info,
241 int features) 241 u64 features)
242{ 242{
243 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 243 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
244 if (*p == end) { 244 if (*p == end) {
@@ -262,7 +262,7 @@ bad:
262 */ 262 */
263static int parse_reply_info_extra(void **p, void *end, 263static int parse_reply_info_extra(void **p, void *end,
264 struct ceph_mds_reply_info_parsed *info, 264 struct ceph_mds_reply_info_parsed *info,
265 int features) 265 u64 features)
266{ 266{
267 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 267 if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
268 return parse_reply_info_filelock(p, end, info, features); 268 return parse_reply_info_filelock(p, end, info, features);
@@ -280,7 +280,7 @@ static int parse_reply_info_extra(void **p, void *end,
280 */ 280 */
281static int parse_reply_info(struct ceph_msg *msg, 281static int parse_reply_info(struct ceph_msg *msg,
282 struct ceph_mds_reply_info_parsed *info, 282 struct ceph_mds_reply_info_parsed *info,
283 int features) 283 u64 features)
284{ 284{
285 void *p, *end; 285 void *p, *end;
286 u32 len; 286 u32 len;
@@ -713,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
713 struct dentry *dn = get_nonsnap_parent(parent); 713 struct dentry *dn = get_nonsnap_parent(parent);
714 inode = dn->d_inode; 714 inode = dn->d_inode;
715 dout("__choose_mds using nonsnap parent %p\n", inode); 715 dout("__choose_mds using nonsnap parent %p\n", inode);
716 } else if (req->r_dentry->d_inode) { 716 } else {
717 /* dentry target */ 717 /* dentry target */
718 inode = req->r_dentry->d_inode; 718 inode = req->r_dentry->d_inode;
719 } else { 719 if (!inode || mode == USE_AUTH_MDS) {
720 /* dir + name */ 720 /* dir + name */
721 inode = dir; 721 inode = dir;
722 hash = ceph_dentry_hash(dir, req->r_dentry); 722 hash = ceph_dentry_hash(dir, req->r_dentry);
723 is_hash = true; 723 is_hash = true;
724 }
724 } 725 }
725 } 726 }
726 727
@@ -846,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc,
846 * 847 *
847 * called under mdsc->mutex 848 * called under mdsc->mutex
848 */ 849 */
850static struct ceph_mds_session *
851__open_export_target_session(struct ceph_mds_client *mdsc, int target)
852{
853 struct ceph_mds_session *session;
854
855 session = __ceph_lookup_mds_session(mdsc, target);
856 if (!session) {
857 session = register_session(mdsc, target);
858 if (IS_ERR(session))
859 return session;
860 }
861 if (session->s_state == CEPH_MDS_SESSION_NEW ||
862 session->s_state == CEPH_MDS_SESSION_CLOSING)
863 __open_session(mdsc, session);
864
865 return session;
866}
867
868struct ceph_mds_session *
869ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
870{
871 struct ceph_mds_session *session;
872
873 dout("open_export_target_session to mds%d\n", target);
874
875 mutex_lock(&mdsc->mutex);
876 session = __open_export_target_session(mdsc, target);
877 mutex_unlock(&mdsc->mutex);
878
879 return session;
880}
881
849static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 882static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
850 struct ceph_mds_session *session) 883 struct ceph_mds_session *session)
851{ 884{
852 struct ceph_mds_info *mi; 885 struct ceph_mds_info *mi;
853 struct ceph_mds_session *ts; 886 struct ceph_mds_session *ts;
854 int i, mds = session->s_mds; 887 int i, mds = session->s_mds;
855 int target;
856 888
857 if (mds >= mdsc->mdsmap->m_max_mds) 889 if (mds >= mdsc->mdsmap->m_max_mds)
858 return; 890 return;
891
859 mi = &mdsc->mdsmap->m_info[mds]; 892 mi = &mdsc->mdsmap->m_info[mds];
860 dout("open_export_target_sessions for mds%d (%d targets)\n", 893 dout("open_export_target_sessions for mds%d (%d targets)\n",
861 session->s_mds, mi->num_export_targets); 894 session->s_mds, mi->num_export_targets);
862 895
863 for (i = 0; i < mi->num_export_targets; i++) { 896 for (i = 0; i < mi->num_export_targets; i++) {
864 target = mi->export_targets[i]; 897 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
865 ts = __ceph_lookup_mds_session(mdsc, target); 898 if (!IS_ERR(ts))
866 if (!ts) { 899 ceph_put_mds_session(ts);
867 ts = register_session(mdsc, target);
868 if (IS_ERR(ts))
869 return;
870 }
871 if (session->s_state == CEPH_MDS_SESSION_NEW ||
872 session->s_state == CEPH_MDS_SESSION_CLOSING)
873 __open_session(mdsc, session);
874 else
875 dout(" mds%d target mds%d %p is %s\n", session->s_mds,
876 i, ts, session_state_name(ts->s_state));
877 ceph_put_mds_session(ts);
878 } 900 }
879} 901}
880 902
@@ -1136,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
1136 return 0; 1158 return 0;
1137} 1159}
1138 1160
1161static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1162 struct ceph_mds_session *session, u64 seq)
1163{
1164 struct ceph_msg *msg;
1165
1166 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1167 session->s_mds, session_state_name(session->s_state), seq);
1168 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1169 if (!msg)
1170 return -ENOMEM;
1171 ceph_con_send(&session->s_con, msg);
1172 return 0;
1173}
1174
1175
1139/* 1176/*
1140 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1177 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1141 * 1178 *
@@ -1214,7 +1251,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1214{ 1251{
1215 struct ceph_mds_session *session = arg; 1252 struct ceph_mds_session *session = arg;
1216 struct ceph_inode_info *ci = ceph_inode(inode); 1253 struct ceph_inode_info *ci = ceph_inode(inode);
1217 int used, oissued, mine; 1254 int used, wanted, oissued, mine;
1218 1255
1219 if (session->s_trim_caps <= 0) 1256 if (session->s_trim_caps <= 0)
1220 return -1; 1257 return -1;
@@ -1222,14 +1259,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1222 spin_lock(&ci->i_ceph_lock); 1259 spin_lock(&ci->i_ceph_lock);
1223 mine = cap->issued | cap->implemented; 1260 mine = cap->issued | cap->implemented;
1224 used = __ceph_caps_used(ci); 1261 used = __ceph_caps_used(ci);
1262 wanted = __ceph_caps_file_wanted(ci);
1225 oissued = __ceph_caps_issued_other(ci, cap); 1263 oissued = __ceph_caps_issued_other(ci, cap);
1226 1264
1227 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1265 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1228 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1266 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1229 ceph_cap_string(used)); 1267 ceph_cap_string(used), ceph_cap_string(wanted));
1230 if (ci->i_dirty_caps) 1268 if (cap == ci->i_auth_cap) {
1231 goto out; /* dirty caps */ 1269 if (ci->i_dirty_caps | ci->i_flushing_caps)
1232 if ((used & ~oissued) & mine) 1270 goto out;
1271 if ((used | wanted) & CEPH_CAP_ANY_WR)
1272 goto out;
1273 }
1274 if ((used | wanted) & ~oissued & mine)
1233 goto out; /* we need these caps */ 1275 goto out; /* we need these caps */
1234 1276
1235 session->s_trim_caps--; 1277 session->s_trim_caps--;
@@ -2156,26 +2198,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2156 */ 2198 */
2157 if (result == -ESTALE) { 2199 if (result == -ESTALE) {
2158 dout("got ESTALE on request %llu", req->r_tid); 2200 dout("got ESTALE on request %llu", req->r_tid);
2159 if (!req->r_inode) { 2201 if (req->r_direct_mode != USE_AUTH_MDS) {
2160 /* do nothing; not an authority problem */
2161 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2162 dout("not using auth, setting for that now"); 2202 dout("not using auth, setting for that now");
2163 req->r_direct_mode = USE_AUTH_MDS; 2203 req->r_direct_mode = USE_AUTH_MDS;
2164 __do_request(mdsc, req); 2204 __do_request(mdsc, req);
2165 mutex_unlock(&mdsc->mutex); 2205 mutex_unlock(&mdsc->mutex);
2166 goto out; 2206 goto out;
2167 } else { 2207 } else {
2168 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2208 int mds = __choose_mds(mdsc, req);
2169 struct ceph_cap *cap = NULL; 2209 if (mds >= 0 && mds != req->r_session->s_mds) {
2170 2210 dout("but auth changed, so resending");
2171 if (req->r_session)
2172 cap = ceph_get_cap_for_mds(ci,
2173 req->r_session->s_mds);
2174
2175 dout("already using auth");
2176 if ((!cap || cap != ci->i_auth_cap) ||
2177 (cap->mseq != req->r_sent_on_mseq)) {
2178 dout("but cap changed, so resending");
2179 __do_request(mdsc, req); 2211 __do_request(mdsc, req);
2180 mutex_unlock(&mdsc->mutex); 2212 mutex_unlock(&mdsc->mutex);
2181 goto out; 2213 goto out;
@@ -2400,6 +2432,10 @@ static void handle_session(struct ceph_mds_session *session,
2400 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2432 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2401 break; 2433 break;
2402 2434
2435 case CEPH_SESSION_FLUSHMSG:
2436 send_flushmsg_ack(mdsc, session, seq);
2437 break;
2438
2403 default: 2439 default:
2404 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2440 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2405 WARN_ON(1); 2441 WARN_ON(1);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 4c053d099ae4..68288917c737 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -383,6 +383,8 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
383extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, 383extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
384 struct ceph_msg *msg); 384 struct ceph_msg *msg);
385 385
386extern struct ceph_mds_session *
387ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
386extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 388extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
387 struct ceph_mds_session *session); 389 struct ceph_mds_session *session);
388 390
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index 89fa4a940a0f..4440f447fd3f 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -41,6 +41,8 @@ const char *ceph_session_op_name(int op)
41 case CEPH_SESSION_RENEWCAPS: return "renewcaps"; 41 case CEPH_SESSION_RENEWCAPS: return "renewcaps";
42 case CEPH_SESSION_STALE: return "stale"; 42 case CEPH_SESSION_STALE: return "stale";
43 case CEPH_SESSION_RECALL_STATE: return "recall_state"; 43 case CEPH_SESSION_RECALL_STATE: return "recall_state";
44 case CEPH_SESSION_FLUSHMSG: return "flushmsg";
45 case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
44 } 46 }
45 return "???"; 47 return "???";
46} 48}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 6a0951e43044..2df963f1cf5a 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -490,10 +490,10 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
490 struct ceph_options *opt) 490 struct ceph_options *opt)
491{ 491{
492 struct ceph_fs_client *fsc; 492 struct ceph_fs_client *fsc;
493 const unsigned supported_features = 493 const u64 supported_features =
494 CEPH_FEATURE_FLOCK | 494 CEPH_FEATURE_FLOCK |
495 CEPH_FEATURE_DIRLAYOUTHASH; 495 CEPH_FEATURE_DIRLAYOUTHASH;
496 const unsigned required_features = 0; 496 const u64 required_features = 0;
497 int page_count; 497 int page_count;
498 size_t size; 498 size_t size;
499 int err = -ENOMEM; 499 int err = -ENOMEM;
@@ -686,6 +686,7 @@ static const struct super_operations ceph_super_ops = {
686 .alloc_inode = ceph_alloc_inode, 686 .alloc_inode = ceph_alloc_inode,
687 .destroy_inode = ceph_destroy_inode, 687 .destroy_inode = ceph_destroy_inode,
688 .write_inode = ceph_write_inode, 688 .write_inode = ceph_write_inode,
689 .drop_inode = ceph_drop_inode,
689 .sync_fs = ceph_sync_fs, 690 .sync_fs = ceph_sync_fs,
690 .put_super = ceph_put_super, 691 .put_super = ceph_put_super,
691 .show_options = ceph_show_options, 692 .show_options = ceph_show_options,
@@ -818,7 +819,11 @@ static int ceph_set_super(struct super_block *s, void *data)
818 819
819 s->s_flags = fsc->mount_options->sb_flags; 820 s->s_flags = fsc->mount_options->sb_flags;
820 s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */ 821 s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */
822#ifdef CONFIG_CEPH_FS_POSIX_ACL
823 s->s_flags |= MS_POSIXACL;
824#endif
821 825
826 s->s_xattr = ceph_xattr_handlers;
822 s->s_fs_info = fsc; 827 s->s_fs_info = fsc;
823 fsc->sb = s; 828 fsc->sb = s;
824 829
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ef4ac38bb614..c299f7d19bf3 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -287,14 +287,12 @@ struct ceph_inode_info {
287 unsigned long i_hold_caps_min; /* jiffies */ 287 unsigned long i_hold_caps_min; /* jiffies */
288 unsigned long i_hold_caps_max; /* jiffies */ 288 unsigned long i_hold_caps_max; /* jiffies */
289 struct list_head i_cap_delay_list; /* for delayed cap release to mds */ 289 struct list_head i_cap_delay_list; /* for delayed cap release to mds */
290 int i_cap_exporting_mds; /* to handle cap migration between */
291 unsigned i_cap_exporting_mseq; /* mds's. */
292 unsigned i_cap_exporting_issued;
293 struct ceph_cap_reservation i_cap_migration_resv; 290 struct ceph_cap_reservation i_cap_migration_resv;
294 struct list_head i_cap_snaps; /* snapped state pending flush to mds */ 291 struct list_head i_cap_snaps; /* snapped state pending flush to mds */
295 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or 292 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
296 dirty|flushing caps */ 293 dirty|flushing caps */
297 unsigned i_snap_caps; /* cap bits for snapped files */ 294 unsigned i_snap_caps; /* cap bits for snapped files */
295 unsigned i_cap_exporting_issued;
298 296
299 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ 297 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
300 298
@@ -335,7 +333,6 @@ struct ceph_inode_info {
335 u32 i_fscache_gen; /* sequence, for delayed fscache validate */ 333 u32 i_fscache_gen; /* sequence, for delayed fscache validate */
336 struct work_struct i_revalidate_work; 334 struct work_struct i_revalidate_work;
337#endif 335#endif
338
339 struct inode vfs_inode; /* at end */ 336 struct inode vfs_inode; /* at end */
340}; 337};
341 338
@@ -529,6 +526,8 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci)
529} 526}
530extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); 527extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask);
531 528
529extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
530 struct ceph_cap *ocap, int mask);
532extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask); 531extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask);
533extern int __ceph_caps_used(struct ceph_inode_info *ci); 532extern int __ceph_caps_used(struct ceph_inode_info *ci);
534 533
@@ -691,6 +690,7 @@ extern const struct inode_operations ceph_file_iops;
691 690
692extern struct inode *ceph_alloc_inode(struct super_block *sb); 691extern struct inode *ceph_alloc_inode(struct super_block *sb);
693extern void ceph_destroy_inode(struct inode *inode); 692extern void ceph_destroy_inode(struct inode *inode);
693extern int ceph_drop_inode(struct inode *inode);
694 694
695extern struct inode *ceph_get_inode(struct super_block *sb, 695extern struct inode *ceph_get_inode(struct super_block *sb,
696 struct ceph_vino vino); 696 struct ceph_vino vino);
@@ -724,6 +724,9 @@ extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
724/* xattr.c */ 724/* xattr.c */
725extern int ceph_setxattr(struct dentry *, const char *, const void *, 725extern int ceph_setxattr(struct dentry *, const char *, const void *,
726 size_t, int); 726 size_t, int);
727int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int);
728ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
729int __ceph_removexattr(struct dentry *, const char *);
727extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t); 730extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
728extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); 731extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
729extern int ceph_removexattr(struct dentry *, const char *); 732extern int ceph_removexattr(struct dentry *, const char *);
@@ -732,6 +735,39 @@ extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
732extern void __init ceph_xattr_init(void); 735extern void __init ceph_xattr_init(void);
733extern void ceph_xattr_exit(void); 736extern void ceph_xattr_exit(void);
734 737
738/* acl.c */
739extern const struct xattr_handler ceph_xattr_acl_access_handler;
740extern const struct xattr_handler ceph_xattr_acl_default_handler;
741extern const struct xattr_handler *ceph_xattr_handlers[];
742
743#ifdef CONFIG_CEPH_FS_POSIX_ACL
744
745struct posix_acl *ceph_get_acl(struct inode *, int);
746int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
747int ceph_acl_chmod(struct dentry *, struct inode *);
748void ceph_forget_all_cached_acls(struct inode *inode);
749
750#else
751
752#define ceph_get_acl NULL
753
754static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
755 struct inode *dir)
756{
757 return 0;
758}
759
760static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
761{
762 return 0;
763}
764
765static inline void ceph_forget_all_cached_acls(struct inode *inode)
766{
767}
768
769#endif
770
735/* caps.c */ 771/* caps.c */
736extern const char *ceph_cap_string(int c); 772extern const char *ceph_cap_string(int c);
737extern void ceph_handle_caps(struct ceph_mds_session *session, 773extern void ceph_handle_caps(struct ceph_mds_session *session,
@@ -744,6 +780,7 @@ extern int ceph_add_cap(struct inode *inode,
744extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); 780extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
745extern void ceph_put_cap(struct ceph_mds_client *mdsc, 781extern void ceph_put_cap(struct ceph_mds_client *mdsc,
746 struct ceph_cap *cap); 782 struct ceph_cap *cap);
783extern int ceph_is_any_caps(struct inode *inode);
747 784
748extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, 785extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
749 u64 cap_id, u32 migrate_seq, u32 issue_seq); 786 u64 cap_id, u32 migrate_seq, u32 issue_seq);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index be661d8f532a..c7581f3733c1 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -11,11 +11,24 @@
11#define XATTR_CEPH_PREFIX "ceph." 11#define XATTR_CEPH_PREFIX "ceph."
12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1) 12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1)
13 13
14/*
15 * List of handlers for synthetic system.* attributes. Other
16 * attributes are handled directly.
17 */
18const struct xattr_handler *ceph_xattr_handlers[] = {
19#ifdef CONFIG_CEPH_FS_POSIX_ACL
20 &ceph_xattr_acl_access_handler,
21 &ceph_xattr_acl_default_handler,
22#endif
23 NULL,
24};
25
14static bool ceph_is_valid_xattr(const char *name) 26static bool ceph_is_valid_xattr(const char *name)
15{ 27{
16 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || 28 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
17 !strncmp(name, XATTR_SECURITY_PREFIX, 29 !strncmp(name, XATTR_SECURITY_PREFIX,
18 XATTR_SECURITY_PREFIX_LEN) || 30 XATTR_SECURITY_PREFIX_LEN) ||
31 !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) ||
19 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || 32 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
20 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); 33 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
21} 34}
@@ -663,10 +676,9 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
663 } 676 }
664} 677}
665 678
666ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, 679ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
667 size_t size) 680 size_t size)
668{ 681{
669 struct inode *inode = dentry->d_inode;
670 struct ceph_inode_info *ci = ceph_inode(inode); 682 struct ceph_inode_info *ci = ceph_inode(inode);
671 int err; 683 int err;
672 struct ceph_inode_xattr *xattr; 684 struct ceph_inode_xattr *xattr;
@@ -675,7 +687,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
675 if (!ceph_is_valid_xattr(name)) 687 if (!ceph_is_valid_xattr(name))
676 return -ENODATA; 688 return -ENODATA;
677 689
678
679 /* let's see if a virtual xattr was requested */ 690 /* let's see if a virtual xattr was requested */
680 vxattr = ceph_match_vxattr(inode, name); 691 vxattr = ceph_match_vxattr(inode, name);
681 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { 692 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
@@ -725,6 +736,15 @@ out:
725 return err; 736 return err;
726} 737}
727 738
739ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
740 size_t size)
741{
742 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
743 return generic_getxattr(dentry, name, value, size);
744
745 return __ceph_getxattr(dentry->d_inode, name, value, size);
746}
747
728ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) 748ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
729{ 749{
730 struct inode *inode = dentry->d_inode; 750 struct inode *inode = dentry->d_inode;
@@ -863,8 +883,8 @@ out:
863 return err; 883 return err;
864} 884}
865 885
866int ceph_setxattr(struct dentry *dentry, const char *name, 886int __ceph_setxattr(struct dentry *dentry, const char *name,
867 const void *value, size_t size, int flags) 887 const void *value, size_t size, int flags)
868{ 888{
869 struct inode *inode = dentry->d_inode; 889 struct inode *inode = dentry->d_inode;
870 struct ceph_vxattr *vxattr; 890 struct ceph_vxattr *vxattr;
@@ -879,9 +899,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
879 struct ceph_inode_xattr *xattr = NULL; 899 struct ceph_inode_xattr *xattr = NULL;
880 int required_blob_size; 900 int required_blob_size;
881 901
882 if (ceph_snap(inode) != CEPH_NOSNAP)
883 return -EROFS;
884
885 if (!ceph_is_valid_xattr(name)) 902 if (!ceph_is_valid_xattr(name))
886 return -EOPNOTSUPP; 903 return -EOPNOTSUPP;
887 904
@@ -958,6 +975,18 @@ out:
958 return err; 975 return err;
959} 976}
960 977
978int ceph_setxattr(struct dentry *dentry, const char *name,
979 const void *value, size_t size, int flags)
980{
981 if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
982 return -EROFS;
983
984 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
985 return generic_setxattr(dentry, name, value, size, flags);
986
987 return __ceph_setxattr(dentry, name, value, size, flags);
988}
989
961static int ceph_send_removexattr(struct dentry *dentry, const char *name) 990static int ceph_send_removexattr(struct dentry *dentry, const char *name)
962{ 991{
963 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 992 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
@@ -984,7 +1013,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
984 return err; 1013 return err;
985} 1014}
986 1015
987int ceph_removexattr(struct dentry *dentry, const char *name) 1016int __ceph_removexattr(struct dentry *dentry, const char *name)
988{ 1017{
989 struct inode *inode = dentry->d_inode; 1018 struct inode *inode = dentry->d_inode;
990 struct ceph_vxattr *vxattr; 1019 struct ceph_vxattr *vxattr;
@@ -994,9 +1023,6 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
994 int required_blob_size; 1023 int required_blob_size;
995 int dirty; 1024 int dirty;
996 1025
997 if (ceph_snap(inode) != CEPH_NOSNAP)
998 return -EROFS;
999
1000 if (!ceph_is_valid_xattr(name)) 1026 if (!ceph_is_valid_xattr(name))
1001 return -EOPNOTSUPP; 1027 return -EOPNOTSUPP;
1002 1028
@@ -1053,3 +1079,13 @@ out:
1053 return err; 1079 return err;
1054} 1080}
1055 1081
1082int ceph_removexattr(struct dentry *dentry, const char *name)
1083{
1084 if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
1085 return -EROFS;
1086
1087 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
1088 return generic_removexattr(dentry, name);
1089
1090 return __ceph_removexattr(dentry, name);
1091}
diff --git a/include/linux/ceph/buffer.h b/include/linux/ceph/buffer.h
index 58d19014068f..07ad423cc37f 100644
--- a/include/linux/ceph/buffer.h
+++ b/include/linux/ceph/buffer.h
@@ -17,7 +17,6 @@ struct ceph_buffer {
17 struct kref kref; 17 struct kref kref;
18 struct kvec vec; 18 struct kvec vec;
19 size_t alloc_len; 19 size_t alloc_len;
20 bool is_vmalloc;
21}; 20};
22 21
23extern struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp); 22extern struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp);
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 4c42080347af..138448f766b4 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -4,42 +4,73 @@
4/* 4/*
5 * feature bits 5 * feature bits
6 */ 6 */
7#define CEPH_FEATURE_UID (1<<0) 7#define CEPH_FEATURE_UID (1ULL<<0)
8#define CEPH_FEATURE_NOSRCADDR (1<<1) 8#define CEPH_FEATURE_NOSRCADDR (1ULL<<1)
9#define CEPH_FEATURE_MONCLOCKCHECK (1<<2) 9#define CEPH_FEATURE_MONCLOCKCHECK (1ULL<<2)
10#define CEPH_FEATURE_FLOCK (1<<3) 10#define CEPH_FEATURE_FLOCK (1ULL<<3)
11#define CEPH_FEATURE_SUBSCRIBE2 (1<<4) 11#define CEPH_FEATURE_SUBSCRIBE2 (1ULL<<4)
12#define CEPH_FEATURE_MONNAMES (1<<5) 12#define CEPH_FEATURE_MONNAMES (1ULL<<5)
13#define CEPH_FEATURE_RECONNECT_SEQ (1<<6) 13#define CEPH_FEATURE_RECONNECT_SEQ (1ULL<<6)
14#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7) 14#define CEPH_FEATURE_DIRLAYOUTHASH (1ULL<<7)
15#define CEPH_FEATURE_OBJECTLOCATOR (1<<8) 15#define CEPH_FEATURE_OBJECTLOCATOR (1ULL<<8)
16#define CEPH_FEATURE_PGID64 (1<<9) 16#define CEPH_FEATURE_PGID64 (1ULL<<9)
17#define CEPH_FEATURE_INCSUBOSDMAP (1<<10) 17#define CEPH_FEATURE_INCSUBOSDMAP (1ULL<<10)
18#define CEPH_FEATURE_PGPOOL3 (1<<11) 18#define CEPH_FEATURE_PGPOOL3 (1ULL<<11)
19#define CEPH_FEATURE_OSDREPLYMUX (1<<12) 19#define CEPH_FEATURE_OSDREPLYMUX (1ULL<<12)
20#define CEPH_FEATURE_OSDENC (1<<13) 20#define CEPH_FEATURE_OSDENC (1ULL<<13)
21#define CEPH_FEATURE_OMAP (1<<14) 21#define CEPH_FEATURE_OMAP (1ULL<<14)
22#define CEPH_FEATURE_MONENC (1<<15) 22#define CEPH_FEATURE_MONENC (1ULL<<15)
23#define CEPH_FEATURE_QUERY_T (1<<16) 23#define CEPH_FEATURE_QUERY_T (1ULL<<16)
24#define CEPH_FEATURE_INDEP_PG_MAP (1<<17) 24#define CEPH_FEATURE_INDEP_PG_MAP (1ULL<<17)
25#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18) 25#define CEPH_FEATURE_CRUSH_TUNABLES (1ULL<<18)
26#define CEPH_FEATURE_CHUNKY_SCRUB (1<<19) 26#define CEPH_FEATURE_CHUNKY_SCRUB (1ULL<<19)
27#define CEPH_FEATURE_MON_NULLROUTE (1<<20) 27#define CEPH_FEATURE_MON_NULLROUTE (1ULL<<20)
28#define CEPH_FEATURE_MON_GV (1<<21) 28#define CEPH_FEATURE_MON_GV (1ULL<<21)
29#define CEPH_FEATURE_BACKFILL_RESERVATION (1<<22) 29#define CEPH_FEATURE_BACKFILL_RESERVATION (1ULL<<22)
30#define CEPH_FEATURE_MSG_AUTH (1<<23) 30#define CEPH_FEATURE_MSG_AUTH (1ULL<<23)
31#define CEPH_FEATURE_RECOVERY_RESERVATION (1<<24) 31#define CEPH_FEATURE_RECOVERY_RESERVATION (1ULL<<24)
32#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25) 32#define CEPH_FEATURE_CRUSH_TUNABLES2 (1ULL<<25)
33#define CEPH_FEATURE_CREATEPOOLID (1<<26) 33#define CEPH_FEATURE_CREATEPOOLID (1ULL<<26)
34#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27) 34#define CEPH_FEATURE_REPLY_CREATE_INODE (1ULL<<27)
35#define CEPH_FEATURE_OSD_HBMSGS (1<<28) 35#define CEPH_FEATURE_OSD_HBMSGS (1ULL<<28)
36#define CEPH_FEATURE_MDSENC (1<<29) 36#define CEPH_FEATURE_MDSENC (1ULL<<29)
37#define CEPH_FEATURE_OSDHASHPSPOOL (1<<30) 37#define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30)
38#define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31)
39#define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32)
40#define CEPH_FEATURE_MON_SCRUB (1ULL<<33)
41#define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34)
42#define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35)
43#define CEPH_FEATURE_CRUSH_V2 (1ULL<<36) /* new indep; SET_* steps */
44#define CEPH_FEATURE_EXPORT_PEER (1ULL<<37)
45#define CEPH_FEATURE_OSD_ERASURE_CODES (1ULL<<38)
46
47/*
48 * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
49 * vector to evaluate to 64 bit ~0. To cope, we designate 1ULL << 63
50 * to mean 33 bit ~0, and introduce a helper below to do the
51 * translation.
52 *
53 * This was introduced by ceph.git commit
54 * 9ea02b84104045c2ffd7e7f4e7af512953855ecd v0.58-657-g9ea02b8
55 * and fixed by ceph.git commit
56 * 4255b5c2fb54ae40c53284b3ab700fdfc7e61748 v0.65-263-g4255b5c
57 */
58#define CEPH_FEATURE_RESERVED (1ULL<<63)
59
60static inline u64 ceph_sanitize_features(u64 features)
61{
62 if (features & CEPH_FEATURE_RESERVED) {
63 /* everything through OSD_SNAPMAPPER */
64 return 0x1ffffffffull;
65 } else {
66 return features;
67 }
68}
38 69
39/* 70/*
40 * Features supported. 71 * Features supported.
41 */ 72 */
42#define CEPH_FEATURES_SUPPORTED_DEFAULT \ 73#define CEPH_FEATURES_SUPPORTED_DEFAULT \
43 (CEPH_FEATURE_NOSRCADDR | \ 74 (CEPH_FEATURE_NOSRCADDR | \
44 CEPH_FEATURE_RECONNECT_SEQ | \ 75 CEPH_FEATURE_RECONNECT_SEQ | \
45 CEPH_FEATURE_PGID64 | \ 76 CEPH_FEATURE_PGID64 | \
@@ -48,7 +79,10 @@
48 CEPH_FEATURE_CRUSH_TUNABLES | \ 79 CEPH_FEATURE_CRUSH_TUNABLES | \
49 CEPH_FEATURE_CRUSH_TUNABLES2 | \ 80 CEPH_FEATURE_CRUSH_TUNABLES2 | \
50 CEPH_FEATURE_REPLY_CREATE_INODE | \ 81 CEPH_FEATURE_REPLY_CREATE_INODE | \
51 CEPH_FEATURE_OSDHASHPSPOOL) 82 CEPH_FEATURE_OSDHASHPSPOOL | \
83 CEPH_FEATURE_OSD_CACHEPOOL | \
84 CEPH_FEATURE_CRUSH_V2 | \
85 CEPH_FEATURE_EXPORT_PEER)
52 86
53#define CEPH_FEATURES_REQUIRED_DEFAULT \ 87#define CEPH_FEATURES_REQUIRED_DEFAULT \
54 (CEPH_FEATURE_NOSRCADDR | \ 88 (CEPH_FEATURE_NOSRCADDR | \
@@ -56,4 +90,5 @@
56 CEPH_FEATURE_PGID64 | \ 90 CEPH_FEATURE_PGID64 | \
57 CEPH_FEATURE_PGPOOL3 | \ 91 CEPH_FEATURE_PGPOOL3 | \
58 CEPH_FEATURE_OSDENC) 92 CEPH_FEATURE_OSDENC)
93
59#endif 94#endif
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 2ad7b860f062..2623cffc73a1 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -53,6 +53,29 @@ struct ceph_file_layout {
53 __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ 53 __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */
54} __attribute__ ((packed)); 54} __attribute__ ((packed));
55 55
56#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit))
57#define ceph_file_layout_stripe_count(l) \
58 ((__s32)le32_to_cpu((l).fl_stripe_count))
59#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size))
60#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash))
61#define ceph_file_layout_object_su(l) \
62 ((__s32)le32_to_cpu((l).fl_object_stripe_unit))
63#define ceph_file_layout_pg_pool(l) \
64 ((__s32)le32_to_cpu((l).fl_pg_pool))
65
66static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l)
67{
68 return le32_to_cpu(l->fl_stripe_unit) *
69 le32_to_cpu(l->fl_stripe_count);
70}
71
72/* "period" == bytes before i start on a new set of objects */
73static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l)
74{
75 return le32_to_cpu(l->fl_object_size) *
76 le32_to_cpu(l->fl_stripe_count);
77}
78
56#define CEPH_MIN_STRIPE_UNIT 65536 79#define CEPH_MIN_STRIPE_UNIT 65536
57 80
58int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); 81int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
@@ -282,6 +305,8 @@ enum {
282 CEPH_SESSION_RENEWCAPS, 305 CEPH_SESSION_RENEWCAPS,
283 CEPH_SESSION_STALE, 306 CEPH_SESSION_STALE,
284 CEPH_SESSION_RECALL_STATE, 307 CEPH_SESSION_RECALL_STATE,
308 CEPH_SESSION_FLUSHMSG,
309 CEPH_SESSION_FLUSHMSG_ACK,
285}; 310};
286 311
287extern const char *ceph_session_op_name(int op); 312extern const char *ceph_session_op_name(int op);
@@ -457,7 +482,8 @@ struct ceph_mds_reply_cap {
457 __u8 flags; /* CEPH_CAP_FLAG_* */ 482 __u8 flags; /* CEPH_CAP_FLAG_* */
458} __attribute__ ((packed)); 483} __attribute__ ((packed));
459 484
460#define CEPH_CAP_FLAG_AUTH 1 /* cap is issued by auth mds */ 485#define CEPH_CAP_FLAG_AUTH (1 << 0) /* cap is issued by auth mds */
486#define CEPH_CAP_FLAG_RELEASE (1 << 1) /* release the cap */
461 487
462/* inode record, for bundling with mds reply */ 488/* inode record, for bundling with mds reply */
463struct ceph_mds_reply_inode { 489struct ceph_mds_reply_inode {
@@ -658,6 +684,14 @@ struct ceph_mds_caps {
658 __le32 time_warp_seq; 684 __le32 time_warp_seq;
659} __attribute__ ((packed)); 685} __attribute__ ((packed));
660 686
687struct ceph_mds_cap_peer {
688 __le64 cap_id;
689 __le32 seq;
690 __le32 mseq;
691 __le32 mds;
692 __u8 flags;
693} __attribute__ ((packed));
694
661/* cap release msg head */ 695/* cap release msg head */
662struct ceph_mds_cap_release { 696struct ceph_mds_cap_release {
663 __le32 num; /* number of cap_items that follow */ 697 __le32 num; /* number of cap_items that follow */
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 2e3024881a5e..2f49aa4c4f7f 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -122,8 +122,8 @@ struct ceph_client {
122 122
123 int (*extra_mon_dispatch)(struct ceph_client *, struct ceph_msg *); 123 int (*extra_mon_dispatch)(struct ceph_client *, struct ceph_msg *);
124 124
125 u32 supported_features; 125 u64 supported_features;
126 u32 required_features; 126 u64 required_features;
127 127
128 struct ceph_messenger msgr; /* messenger instance */ 128 struct ceph_messenger msgr; /* messenger instance */
129 struct ceph_mon_client monc; 129 struct ceph_mon_client monc;
@@ -173,15 +173,18 @@ static inline int calc_pages_for(u64 off, u64 len)
173 (off >> PAGE_CACHE_SHIFT); 173 (off >> PAGE_CACHE_SHIFT);
174} 174}
175 175
176extern struct kmem_cache *ceph_inode_cachep;
177extern struct kmem_cache *ceph_cap_cachep;
178extern struct kmem_cache *ceph_dentry_cachep;
179extern struct kmem_cache *ceph_file_cachep;
180
176/* ceph_common.c */ 181/* ceph_common.c */
177extern bool libceph_compatible(void *data); 182extern bool libceph_compatible(void *data);
178 183
179extern const char *ceph_msg_type_name(int type); 184extern const char *ceph_msg_type_name(int type);
180extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); 185extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
181extern struct kmem_cache *ceph_inode_cachep; 186extern void *ceph_kvmalloc(size_t size, gfp_t flags);
182extern struct kmem_cache *ceph_cap_cachep; 187extern void ceph_kvfree(const void *ptr);
183extern struct kmem_cache *ceph_dentry_cachep;
184extern struct kmem_cache *ceph_file_cachep;
185 188
186extern struct ceph_options *ceph_parse_options(char *options, 189extern struct ceph_options *ceph_parse_options(char *options,
187 const char *dev_name, const char *dev_name_end, 190 const char *dev_name, const char *dev_name_end,
@@ -192,8 +195,8 @@ extern int ceph_compare_options(struct ceph_options *new_opt,
192 struct ceph_client *client); 195 struct ceph_client *client);
193extern struct ceph_client *ceph_create_client(struct ceph_options *opt, 196extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
194 void *private, 197 void *private,
195 unsigned supported_features, 198 u64 supported_features,
196 unsigned required_features); 199 u64 required_features);
197extern u64 ceph_client_id(struct ceph_client *client); 200extern u64 ceph_client_id(struct ceph_client *client);
198extern void ceph_destroy_client(struct ceph_client *client); 201extern void ceph_destroy_client(struct ceph_client *client);
199extern int __ceph_open_session(struct ceph_client *client, 202extern int __ceph_open_session(struct ceph_client *client,
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 7c1420bb1dce..20ee8b63a968 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -60,8 +60,8 @@ struct ceph_messenger {
60 u32 global_seq; 60 u32 global_seq;
61 spinlock_t global_seq_lock; 61 spinlock_t global_seq_lock;
62 62
63 u32 supported_features; 63 u64 supported_features;
64 u32 required_features; 64 u64 required_features;
65}; 65};
66 66
67enum ceph_msg_data_type { 67enum ceph_msg_data_type {
@@ -154,10 +154,9 @@ struct ceph_msg {
154 struct list_head list_head; /* links for connection lists */ 154 struct list_head list_head; /* links for connection lists */
155 155
156 struct kref kref; 156 struct kref kref;
157 bool front_is_vmalloc;
158 bool more_to_follow; 157 bool more_to_follow;
159 bool needs_out_seq; 158 bool needs_out_seq;
160 int front_max; 159 int front_alloc_len;
161 unsigned long ack_stamp; /* tx: when we were acked */ 160 unsigned long ack_stamp; /* tx: when we were acked */
162 161
163 struct ceph_msgpool *pool; 162 struct ceph_msgpool *pool;
@@ -192,7 +191,7 @@ struct ceph_connection {
192 191
193 struct ceph_entity_name peer_name; /* peer name */ 192 struct ceph_entity_name peer_name; /* peer name */
194 193
195 unsigned peer_features; 194 u64 peer_features;
196 u32 connect_seq; /* identify the most recent connection 195 u32 connect_seq; /* identify the most recent connection
197 attempt for this connection, client */ 196 attempt for this connection, client */
198 u32 peer_global_seq; /* peer's global seq for this connection */ 197 u32 peer_global_seq; /* peer's global seq for this connection */
@@ -256,8 +255,8 @@ extern void ceph_msgr_flush(void);
256 255
257extern void ceph_messenger_init(struct ceph_messenger *msgr, 256extern void ceph_messenger_init(struct ceph_messenger *msgr,
258 struct ceph_entity_addr *myaddr, 257 struct ceph_entity_addr *myaddr,
259 u32 supported_features, 258 u64 supported_features,
260 u32 required_features, 259 u64 required_features,
261 bool nocrc); 260 bool nocrc);
262 261
263extern void ceph_con_init(struct ceph_connection *con, void *private, 262extern void ceph_con_init(struct ceph_connection *con, void *private,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 8f47625a0661..fd47e872ebcc 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -12,12 +12,6 @@
12#include <linux/ceph/auth.h> 12#include <linux/ceph/auth.h>
13#include <linux/ceph/pagelist.h> 13#include <linux/ceph/pagelist.h>
14 14
15/*
16 * Maximum object name size
17 * (must be at least as big as RBD_MAX_MD_NAME_LEN -- currently 100)
18 */
19#define MAX_OBJ_NAME_SIZE 100
20
21struct ceph_msg; 15struct ceph_msg;
22struct ceph_snap_context; 16struct ceph_snap_context;
23struct ceph_osd_request; 17struct ceph_osd_request;
@@ -138,6 +132,7 @@ struct ceph_osd_request {
138 __le64 *r_request_pool; 132 __le64 *r_request_pool;
139 void *r_request_pgid; 133 void *r_request_pgid;
140 __le32 *r_request_attempts; 134 __le32 *r_request_attempts;
135 bool r_paused;
141 struct ceph_eversion *r_request_reassert_version; 136 struct ceph_eversion *r_request_reassert_version;
142 137
143 int r_result; 138 int r_result;
@@ -158,15 +153,21 @@ struct ceph_osd_request {
158 struct inode *r_inode; /* for use by callbacks */ 153 struct inode *r_inode; /* for use by callbacks */
159 void *r_priv; /* ditto */ 154 void *r_priv; /* ditto */
160 155
161 char r_oid[MAX_OBJ_NAME_SIZE]; /* object name */ 156 struct ceph_object_locator r_base_oloc;
162 int r_oid_len; 157 struct ceph_object_id r_base_oid;
158 struct ceph_object_locator r_target_oloc;
159 struct ceph_object_id r_target_oid;
160
163 u64 r_snapid; 161 u64 r_snapid;
164 unsigned long r_stamp; /* send OR check time */ 162 unsigned long r_stamp; /* send OR check time */
165 163
166 struct ceph_file_layout r_file_layout;
167 struct ceph_snap_context *r_snapc; /* snap context for writes */ 164 struct ceph_snap_context *r_snapc; /* snap context for writes */
168}; 165};
169 166
167struct ceph_request_redirect {
168 struct ceph_object_locator oloc;
169};
170
170struct ceph_osd_event { 171struct ceph_osd_event {
171 u64 cookie; 172 u64 cookie;
172 int one_shot; 173 int one_shot;
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index d05cc4451af6..49ff69f0746b 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -35,13 +35,26 @@ struct ceph_pg_pool_info {
35 u8 object_hash; 35 u8 object_hash;
36 u32 pg_num, pgp_num; 36 u32 pg_num, pgp_num;
37 int pg_num_mask, pgp_num_mask; 37 int pg_num_mask, pgp_num_mask;
38 s64 read_tier;
39 s64 write_tier; /* wins for read+write ops */
38 u64 flags; 40 u64 flags;
39 char *name; 41 char *name;
40}; 42};
41 43
42struct ceph_object_locator { 44struct ceph_object_locator {
43 uint64_t pool; 45 s64 pool;
44 char *key; 46};
47
48/*
49 * Maximum supported by kernel client object name length
50 *
51 * (probably outdated: must be >= RBD_MAX_MD_NAME_LEN -- currently 100)
52 */
53#define CEPH_MAX_OID_NAME_LEN 100
54
55struct ceph_object_id {
56 char name[CEPH_MAX_OID_NAME_LEN];
57 int name_len;
45}; 58};
46 59
47struct ceph_pg_mapping { 60struct ceph_pg_mapping {
@@ -73,33 +86,30 @@ struct ceph_osdmap {
73 struct crush_map *crush; 86 struct crush_map *crush;
74}; 87};
75 88
76/* 89static inline void ceph_oid_set_name(struct ceph_object_id *oid,
77 * file layout helpers 90 const char *name)
78 */
79#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit))
80#define ceph_file_layout_stripe_count(l) \
81 ((__s32)le32_to_cpu((l).fl_stripe_count))
82#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size))
83#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash))
84#define ceph_file_layout_object_su(l) \
85 ((__s32)le32_to_cpu((l).fl_object_stripe_unit))
86#define ceph_file_layout_pg_pool(l) \
87 ((__s32)le32_to_cpu((l).fl_pg_pool))
88
89static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l)
90{ 91{
91 return le32_to_cpu(l->fl_stripe_unit) * 92 int len;
92 le32_to_cpu(l->fl_stripe_count); 93
94 len = strlen(name);
95 if (len > sizeof(oid->name)) {
96 WARN(1, "ceph_oid_set_name '%s' len %d vs %zu, truncating\n",
97 name, len, sizeof(oid->name));
98 len = sizeof(oid->name);
99 }
100
101 memcpy(oid->name, name, len);
102 oid->name_len = len;
93} 103}
94 104
95/* "period" == bytes before i start on a new set of objects */ 105static inline void ceph_oid_copy(struct ceph_object_id *dest,
96static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l) 106 struct ceph_object_id *src)
97{ 107{
98 return le32_to_cpu(l->fl_object_size) * 108 BUG_ON(src->name_len > sizeof(dest->name));
99 le32_to_cpu(l->fl_stripe_count); 109 memcpy(dest->name, src->name, src->name_len);
110 dest->name_len = src->name_len;
100} 111}
101 112
102
103static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd) 113static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
104{ 114{
105 return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP); 115 return (osd < map->max_osd) && (map->osd_state[osd] & CEPH_OSD_UP);
@@ -155,14 +165,20 @@ extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
155 u64 *bno, u64 *oxoff, u64 *oxlen); 165 u64 *bno, u64 *oxoff, u64 *oxlen);
156 166
157/* calculate mapping of object to a placement group */ 167/* calculate mapping of object to a placement group */
158extern int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, 168extern int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
159 struct ceph_osdmap *osdmap, uint64_t pool); 169 struct ceph_object_locator *oloc,
170 struct ceph_object_id *oid,
171 struct ceph_pg *pg_out);
172
160extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, 173extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap,
161 struct ceph_pg pgid, 174 struct ceph_pg pgid,
162 int *acting); 175 int *acting);
163extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, 176extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
164 struct ceph_pg pgid); 177 struct ceph_pg pgid);
165 178
179extern struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map,
180 u64 id);
181
166extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id); 182extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
167extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); 183extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
168 184
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 68c96a508ac2..96292df4041b 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -344,6 +344,10 @@ enum {
344 CEPH_OSD_FLAG_EXEC_PUBLIC = 0x1000, /* DEPRECATED op may exec (public) */ 344 CEPH_OSD_FLAG_EXEC_PUBLIC = 0x1000, /* DEPRECATED op may exec (public) */
345 CEPH_OSD_FLAG_LOCALIZE_READS = 0x2000, /* read from nearby replica, if any */ 345 CEPH_OSD_FLAG_LOCALIZE_READS = 0x2000, /* read from nearby replica, if any */
346 CEPH_OSD_FLAG_RWORDERED = 0x4000, /* order wrt concurrent reads */ 346 CEPH_OSD_FLAG_RWORDERED = 0x4000, /* order wrt concurrent reads */
347 CEPH_OSD_FLAG_IGNORE_CACHE = 0x8000, /* ignore cache logic */
348 CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */
349 CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */
350 CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */
347}; 351};
348 352
349enum { 353enum {
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 6a1101f24cfb..acaa5615d634 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -19,11 +19,12 @@
19 19
20#define CRUSH_MAGIC 0x00010000ul /* for detecting algorithm revisions */ 20#define CRUSH_MAGIC 0x00010000ul /* for detecting algorithm revisions */
21 21
22
23#define CRUSH_MAX_DEPTH 10 /* max crush hierarchy depth */ 22#define CRUSH_MAX_DEPTH 10 /* max crush hierarchy depth */
24#define CRUSH_MAX_SET 10 /* max size of a mapping result */
25 23
26 24
25#define CRUSH_ITEM_UNDEF 0x7ffffffe /* undefined result (internal use only) */
26#define CRUSH_ITEM_NONE 0x7fffffff /* no result */
27
27/* 28/*
28 * CRUSH uses user-defined "rules" to describe how inputs should be 29 * CRUSH uses user-defined "rules" to describe how inputs should be
29 * mapped to devices. A rule consists of sequence of steps to perform 30 * mapped to devices. A rule consists of sequence of steps to perform
@@ -43,8 +44,13 @@ enum {
43 /* arg2 = type */ 44 /* arg2 = type */
44 CRUSH_RULE_CHOOSE_INDEP = 3, /* same */ 45 CRUSH_RULE_CHOOSE_INDEP = 3, /* same */
45 CRUSH_RULE_EMIT = 4, /* no args */ 46 CRUSH_RULE_EMIT = 4, /* no args */
46 CRUSH_RULE_CHOOSE_LEAF_FIRSTN = 6, 47 CRUSH_RULE_CHOOSELEAF_FIRSTN = 6,
47 CRUSH_RULE_CHOOSE_LEAF_INDEP = 7, 48 CRUSH_RULE_CHOOSELEAF_INDEP = 7,
49
50 CRUSH_RULE_SET_CHOOSE_TRIES = 8, /* override choose_total_tries */
51 CRUSH_RULE_SET_CHOOSELEAF_TRIES = 9, /* override chooseleaf_descend_once */
52 CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES = 10,
53 CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES = 11,
48}; 54};
49 55
50/* 56/*
@@ -162,7 +168,10 @@ struct crush_map {
162 __u32 choose_local_fallback_tries; 168 __u32 choose_local_fallback_tries;
163 /* choose attempts before giving up */ 169 /* choose attempts before giving up */
164 __u32 choose_total_tries; 170 __u32 choose_total_tries;
165 /* attempt chooseleaf inner descent once; on failure retry outer descent */ 171 /* attempt chooseleaf inner descent once for firstn mode; on
172 * reject retry outer descent. Note that this does *not*
173 * apply to a collision: in that case we will retry as we used
174 * to. */
166 __u32 chooseleaf_descend_once; 175 __u32 chooseleaf_descend_once;
167}; 176};
168 177
@@ -174,6 +183,7 @@ extern void crush_destroy_bucket_list(struct crush_bucket_list *b);
174extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b); 183extern void crush_destroy_bucket_tree(struct crush_bucket_tree *b);
175extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b); 184extern void crush_destroy_bucket_straw(struct crush_bucket_straw *b);
176extern void crush_destroy_bucket(struct crush_bucket *b); 185extern void crush_destroy_bucket(struct crush_bucket *b);
186extern void crush_destroy_rule(struct crush_rule *r);
177extern void crush_destroy(struct crush_map *map); 187extern void crush_destroy(struct crush_map *map);
178 188
179static inline int crush_calc_tree_node(int i) 189static inline int crush_calc_tree_node(int i)
diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h
index 5772dee3ecbf..eab367446eea 100644
--- a/include/linux/crush/mapper.h
+++ b/include/linux/crush/mapper.h
@@ -14,6 +14,7 @@ extern int crush_find_rule(const struct crush_map *map, int ruleset, int type, i
14extern int crush_do_rule(const struct crush_map *map, 14extern int crush_do_rule(const struct crush_map *map,
15 int ruleno, 15 int ruleno,
16 int x, int *result, int result_max, 16 int x, int *result, int result_max,
17 const __u32 *weights); 17 const __u32 *weights, int weight_max,
18 int *scratch);
18 19
19#endif 20#endif
diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c
index bf3e6a13c215..621b5f65407f 100644
--- a/net/ceph/buffer.c
+++ b/net/ceph/buffer.c
@@ -6,6 +6,7 @@
6 6
7#include <linux/ceph/buffer.h> 7#include <linux/ceph/buffer.h>
8#include <linux/ceph/decode.h> 8#include <linux/ceph/decode.h>
9#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */
9 10
10struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) 11struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
11{ 12{
@@ -15,16 +16,10 @@ struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
15 if (!b) 16 if (!b)
16 return NULL; 17 return NULL;
17 18
18 b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); 19 b->vec.iov_base = ceph_kvmalloc(len, gfp);
19 if (b->vec.iov_base) { 20 if (!b->vec.iov_base) {
20 b->is_vmalloc = false; 21 kfree(b);
21 } else { 22 return NULL;
22 b->vec.iov_base = __vmalloc(len, gfp | __GFP_HIGHMEM, PAGE_KERNEL);
23 if (!b->vec.iov_base) {
24 kfree(b);
25 return NULL;
26 }
27 b->is_vmalloc = true;
28 } 23 }
29 24
30 kref_init(&b->kref); 25 kref_init(&b->kref);
@@ -40,12 +35,7 @@ void ceph_buffer_release(struct kref *kref)
40 struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref); 35 struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
41 36
42 dout("buffer_release %p\n", b); 37 dout("buffer_release %p\n", b);
43 if (b->vec.iov_base) { 38 ceph_kvfree(b->vec.iov_base);
44 if (b->is_vmalloc)
45 vfree(b->vec.iov_base);
46 else
47 kfree(b->vec.iov_base);
48 }
49 kfree(b); 39 kfree(b);
50} 40}
51EXPORT_SYMBOL(ceph_buffer_release); 41EXPORT_SYMBOL(ceph_buffer_release);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 34b11ee8124e..67d7721d237e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -15,6 +15,7 @@
15#include <linux/slab.h> 15#include <linux/slab.h>
16#include <linux/statfs.h> 16#include <linux/statfs.h>
17#include <linux/string.h> 17#include <linux/string.h>
18#include <linux/vmalloc.h>
18#include <linux/nsproxy.h> 19#include <linux/nsproxy.h>
19#include <net/net_namespace.h> 20#include <net/net_namespace.h>
20 21
@@ -170,6 +171,25 @@ int ceph_compare_options(struct ceph_options *new_opt,
170} 171}
171EXPORT_SYMBOL(ceph_compare_options); 172EXPORT_SYMBOL(ceph_compare_options);
172 173
174void *ceph_kvmalloc(size_t size, gfp_t flags)
175{
176 if (size <= (PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER)) {
177 void *ptr = kmalloc(size, flags | __GFP_NOWARN);
178 if (ptr)
179 return ptr;
180 }
181
182 return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
183}
184
185void ceph_kvfree(const void *ptr)
186{
187 if (is_vmalloc_addr(ptr))
188 vfree(ptr);
189 else
190 kfree(ptr);
191}
192
173 193
174static int parse_fsid(const char *str, struct ceph_fsid *fsid) 194static int parse_fsid(const char *str, struct ceph_fsid *fsid)
175{ 195{
@@ -461,8 +481,8 @@ EXPORT_SYMBOL(ceph_client_id);
461 * create a fresh client instance 481 * create a fresh client instance
462 */ 482 */
463struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, 483struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
464 unsigned int supported_features, 484 u64 supported_features,
465 unsigned int required_features) 485 u64 required_features)
466{ 486{
467 struct ceph_client *client; 487 struct ceph_client *client;
468 struct ceph_entity_addr *myaddr = NULL; 488 struct ceph_entity_addr *myaddr = NULL;
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 089613234f03..16bc199d9a62 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -116,11 +116,14 @@ void crush_destroy(struct crush_map *map)
116 if (map->rules) { 116 if (map->rules) {
117 __u32 b; 117 __u32 b;
118 for (b = 0; b < map->max_rules; b++) 118 for (b = 0; b < map->max_rules; b++)
119 kfree(map->rules[b]); 119 crush_destroy_rule(map->rules[b]);
120 kfree(map->rules); 120 kfree(map->rules);
121 } 121 }
122 122
123 kfree(map); 123 kfree(map);
124} 124}
125 125
126 126void crush_destroy_rule(struct crush_rule *rule)
127{
128 kfree(rule);
129}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index cbd06a91941c..b703790b4e44 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -189,7 +189,7 @@ static int terminal(int x)
189static int bucket_tree_choose(struct crush_bucket_tree *bucket, 189static int bucket_tree_choose(struct crush_bucket_tree *bucket,
190 int x, int r) 190 int x, int r)
191{ 191{
192 int n, l; 192 int n;
193 __u32 w; 193 __u32 w;
194 __u64 t; 194 __u64 t;
195 195
@@ -197,6 +197,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
197 n = bucket->num_nodes >> 1; 197 n = bucket->num_nodes >> 1;
198 198
199 while (!terminal(n)) { 199 while (!terminal(n)) {
200 int l;
200 /* pick point in [0, w) */ 201 /* pick point in [0, w) */
201 w = bucket->node_weights[n]; 202 w = bucket->node_weights[n];
202 t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r, 203 t = (__u64)crush_hash32_4(bucket->h.hash, x, n, r,
@@ -264,8 +265,12 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
264 * true if device is marked "out" (failed, fully offloaded) 265 * true if device is marked "out" (failed, fully offloaded)
265 * of the cluster 266 * of the cluster
266 */ 267 */
267static int is_out(const struct crush_map *map, const __u32 *weight, int item, int x) 268static int is_out(const struct crush_map *map,
269 const __u32 *weight, int weight_max,
270 int item, int x)
268{ 271{
272 if (item >= weight_max)
273 return 1;
269 if (weight[item] >= 0x10000) 274 if (weight[item] >= 0x10000)
270 return 0; 275 return 0;
271 if (weight[item] == 0) 276 if (weight[item] == 0)
@@ -277,7 +282,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
277} 282}
278 283
279/** 284/**
280 * crush_choose - choose numrep distinct items of given type 285 * crush_choose_firstn - choose numrep distinct items of given type
281 * @map: the crush_map 286 * @map: the crush_map
282 * @bucket: the bucket we are choose an item from 287 * @bucket: the bucket we are choose an item from
283 * @x: crush input value 288 * @x: crush input value
@@ -285,18 +290,24 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
285 * @type: the type of item to choose 290 * @type: the type of item to choose
286 * @out: pointer to output vector 291 * @out: pointer to output vector
287 * @outpos: our position in that vector 292 * @outpos: our position in that vector
288 * @firstn: true if choosing "first n" items, false if choosing "indep" 293 * @tries: number of attempts to make
289 * @recurse_to_leaf: true if we want one device under each item of given type 294 * @recurse_tries: number of attempts to have recursive chooseleaf make
290 * @descend_once: true if we should only try one descent before giving up 295 * @local_tries: localized retries
296 * @local_fallback_tries: localized fallback retries
297 * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
291 * @out2: second output vector for leaf items (if @recurse_to_leaf) 298 * @out2: second output vector for leaf items (if @recurse_to_leaf)
292 */ 299 */
293static int crush_choose(const struct crush_map *map, 300static int crush_choose_firstn(const struct crush_map *map,
294 struct crush_bucket *bucket, 301 struct crush_bucket *bucket,
295 const __u32 *weight, 302 const __u32 *weight, int weight_max,
296 int x, int numrep, int type, 303 int x, int numrep, int type,
297 int *out, int outpos, 304 int *out, int outpos,
298 int firstn, int recurse_to_leaf, 305 unsigned int tries,
299 int descend_once, int *out2) 306 unsigned int recurse_tries,
307 unsigned int local_tries,
308 unsigned int local_fallback_tries,
309 int recurse_to_leaf,
310 int *out2)
300{ 311{
301 int rep; 312 int rep;
302 unsigned int ftotal, flocal; 313 unsigned int ftotal, flocal;
@@ -325,35 +336,17 @@ static int crush_choose(const struct crush_map *map,
325 collide = 0; 336 collide = 0;
326 retry_bucket = 0; 337 retry_bucket = 0;
327 r = rep; 338 r = rep;
328 if (in->alg == CRUSH_BUCKET_UNIFORM) { 339 /* r' = r + f_total */
329 /* be careful */ 340 r += ftotal;
330 if (firstn || (__u32)numrep >= in->size)
331 /* r' = r + f_total */
332 r += ftotal;
333 else if (in->size % numrep == 0)
334 /* r'=r+(n+1)*f_local */
335 r += (numrep+1) *
336 (flocal+ftotal);
337 else
338 /* r' = r + n*f_local */
339 r += numrep * (flocal+ftotal);
340 } else {
341 if (firstn)
342 /* r' = r + f_total */
343 r += ftotal;
344 else
345 /* r' = r + n*f_local */
346 r += numrep * (flocal+ftotal);
347 }
348 341
349 /* bucket choose */ 342 /* bucket choose */
350 if (in->size == 0) { 343 if (in->size == 0) {
351 reject = 1; 344 reject = 1;
352 goto reject; 345 goto reject;
353 } 346 }
354 if (map->choose_local_fallback_tries > 0 && 347 if (local_fallback_tries > 0 &&
355 flocal >= (in->size>>1) && 348 flocal >= (in->size>>1) &&
356 flocal > map->choose_local_fallback_tries) 349 flocal > local_fallback_tries)
357 item = bucket_perm_choose(in, x, r); 350 item = bucket_perm_choose(in, x, r);
358 else 351 else
359 item = crush_bucket_choose(in, x, r); 352 item = crush_bucket_choose(in, x, r);
@@ -394,13 +387,15 @@ static int crush_choose(const struct crush_map *map,
394 reject = 0; 387 reject = 0;
395 if (!collide && recurse_to_leaf) { 388 if (!collide && recurse_to_leaf) {
396 if (item < 0) { 389 if (item < 0) {
397 if (crush_choose(map, 390 if (crush_choose_firstn(map,
398 map->buckets[-1-item], 391 map->buckets[-1-item],
399 weight, 392 weight, weight_max,
400 x, outpos+1, 0, 393 x, outpos+1, 0,
401 out2, outpos, 394 out2, outpos,
402 firstn, 0, 395 recurse_tries, 0,
403 map->chooseleaf_descend_once, 396 local_tries,
397 local_fallback_tries,
398 0,
404 NULL) <= outpos) 399 NULL) <= outpos)
405 /* didn't get leaf */ 400 /* didn't get leaf */
406 reject = 1; 401 reject = 1;
@@ -414,6 +409,7 @@ static int crush_choose(const struct crush_map *map,
414 /* out? */ 409 /* out? */
415 if (itemtype == 0) 410 if (itemtype == 0)
416 reject = is_out(map, weight, 411 reject = is_out(map, weight,
412 weight_max,
417 item, x); 413 item, x);
418 else 414 else
419 reject = 0; 415 reject = 0;
@@ -424,17 +420,14 @@ reject:
424 ftotal++; 420 ftotal++;
425 flocal++; 421 flocal++;
426 422
427 if (reject && descend_once) 423 if (collide && flocal <= local_tries)
428 /* let outer call try again */
429 skip_rep = 1;
430 else if (collide && flocal <= map->choose_local_tries)
431 /* retry locally a few times */ 424 /* retry locally a few times */
432 retry_bucket = 1; 425 retry_bucket = 1;
433 else if (map->choose_local_fallback_tries > 0 && 426 else if (local_fallback_tries > 0 &&
434 flocal <= in->size + map->choose_local_fallback_tries) 427 flocal <= in->size + local_fallback_tries)
435 /* exhaustive bucket search */ 428 /* exhaustive bucket search */
436 retry_bucket = 1; 429 retry_bucket = 1;
437 else if (ftotal <= map->choose_total_tries) 430 else if (ftotal <= tries)
438 /* then retry descent */ 431 /* then retry descent */
439 retry_descent = 1; 432 retry_descent = 1;
440 else 433 else
@@ -464,21 +457,179 @@ reject:
464 457
465 458
466/** 459/**
460 * crush_choose_indep: alternative breadth-first positionally stable mapping
461 *
462 */
463static void crush_choose_indep(const struct crush_map *map,
464 struct crush_bucket *bucket,
465 const __u32 *weight, int weight_max,
466 int x, int left, int numrep, int type,
467 int *out, int outpos,
468 unsigned int tries,
469 unsigned int recurse_tries,
470 int recurse_to_leaf,
471 int *out2,
472 int parent_r)
473{
474 struct crush_bucket *in = bucket;
475 int endpos = outpos + left;
476 int rep;
477 unsigned int ftotal;
478 int r;
479 int i;
480 int item = 0;
481 int itemtype;
482 int collide;
483
484 dprintk("CHOOSE%s INDEP bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
485 bucket->id, x, outpos, numrep);
486
487 /* initially my result is undefined */
488 for (rep = outpos; rep < endpos; rep++) {
489 out[rep] = CRUSH_ITEM_UNDEF;
490 if (out2)
491 out2[rep] = CRUSH_ITEM_UNDEF;
492 }
493
494 for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) {
495 for (rep = outpos; rep < endpos; rep++) {
496 if (out[rep] != CRUSH_ITEM_UNDEF)
497 continue;
498
499 in = bucket; /* initial bucket */
500
501 /* choose through intervening buckets */
502 for (;;) {
503 /* note: we base the choice on the position
504 * even in the nested call. that means that
505 * if the first layer chooses the same bucket
506 * in a different position, we will tend to
507 * choose a different item in that bucket.
508 * this will involve more devices in data
509 * movement and tend to distribute the load.
510 */
511 r = rep + parent_r;
512
513 /* be careful */
514 if (in->alg == CRUSH_BUCKET_UNIFORM &&
515 in->size % numrep == 0)
516 /* r'=r+(n+1)*f_total */
517 r += (numrep+1) * ftotal;
518 else
519 /* r' = r + n*f_total */
520 r += numrep * ftotal;
521
522 /* bucket choose */
523 if (in->size == 0) {
524 dprintk(" empty bucket\n");
525 break;
526 }
527
528 item = crush_bucket_choose(in, x, r);
529 if (item >= map->max_devices) {
530 dprintk(" bad item %d\n", item);
531 out[rep] = CRUSH_ITEM_NONE;
532 if (out2)
533 out2[rep] = CRUSH_ITEM_NONE;
534 left--;
535 break;
536 }
537
538 /* desired type? */
539 if (item < 0)
540 itemtype = map->buckets[-1-item]->type;
541 else
542 itemtype = 0;
543 dprintk(" item %d type %d\n", item, itemtype);
544
545 /* keep going? */
546 if (itemtype != type) {
547 if (item >= 0 ||
548 (-1-item) >= map->max_buckets) {
549 dprintk(" bad item type %d\n", type);
550 out[rep] = CRUSH_ITEM_NONE;
551 if (out2)
552 out2[rep] =
553 CRUSH_ITEM_NONE;
554 left--;
555 break;
556 }
557 in = map->buckets[-1-item];
558 continue;
559 }
560
561 /* collision? */
562 collide = 0;
563 for (i = outpos; i < endpos; i++) {
564 if (out[i] == item) {
565 collide = 1;
566 break;
567 }
568 }
569 if (collide)
570 break;
571
572 if (recurse_to_leaf) {
573 if (item < 0) {
574 crush_choose_indep(map,
575 map->buckets[-1-item],
576 weight, weight_max,
577 x, 1, numrep, 0,
578 out2, rep,
579 recurse_tries, 0,
580 0, NULL, r);
581 if (out2[rep] == CRUSH_ITEM_NONE) {
582 /* placed nothing; no leaf */
583 break;
584 }
585 } else {
586 /* we already have a leaf! */
587 out2[rep] = item;
588 }
589 }
590
591 /* out? */
592 if (itemtype == 0 &&
593 is_out(map, weight, weight_max, item, x))
594 break;
595
596 /* yay! */
597 out[rep] = item;
598 left--;
599 break;
600 }
601 }
602 }
603 for (rep = outpos; rep < endpos; rep++) {
604 if (out[rep] == CRUSH_ITEM_UNDEF) {
605 out[rep] = CRUSH_ITEM_NONE;
606 }
607 if (out2 && out2[rep] == CRUSH_ITEM_UNDEF) {
608 out2[rep] = CRUSH_ITEM_NONE;
609 }
610 }
611}
612
613/**
467 * crush_do_rule - calculate a mapping with the given input and rule 614 * crush_do_rule - calculate a mapping with the given input and rule
468 * @map: the crush_map 615 * @map: the crush_map
469 * @ruleno: the rule id 616 * @ruleno: the rule id
470 * @x: hash input 617 * @x: hash input
471 * @result: pointer to result vector 618 * @result: pointer to result vector
472 * @result_max: maximum result size 619 * @result_max: maximum result size
620 * @weight: weight vector (for map leaves)
621 * @weight_max: size of weight vector
622 * @scratch: scratch vector for private use; must be >= 3 * result_max
473 */ 623 */
474int crush_do_rule(const struct crush_map *map, 624int crush_do_rule(const struct crush_map *map,
475 int ruleno, int x, int *result, int result_max, 625 int ruleno, int x, int *result, int result_max,
476 const __u32 *weight) 626 const __u32 *weight, int weight_max,
627 int *scratch)
477{ 628{
478 int result_len; 629 int result_len;
479 int a[CRUSH_MAX_SET]; 630 int *a = scratch;
480 int b[CRUSH_MAX_SET]; 631 int *b = scratch + result_max;
481 int c[CRUSH_MAX_SET]; 632 int *c = scratch + result_max*2;
482 int recurse_to_leaf; 633 int recurse_to_leaf;
483 int *w; 634 int *w;
484 int wsize = 0; 635 int wsize = 0;
@@ -489,8 +640,10 @@ int crush_do_rule(const struct crush_map *map,
489 __u32 step; 640 __u32 step;
490 int i, j; 641 int i, j;
491 int numrep; 642 int numrep;
492 int firstn; 643 int choose_tries = map->choose_total_tries;
493 const int descend_once = 0; 644 int choose_local_tries = map->choose_local_tries;
645 int choose_local_fallback_tries = map->choose_local_fallback_tries;
646 int choose_leaf_tries = 0;
494 647
495 if ((__u32)ruleno >= map->max_rules) { 648 if ((__u32)ruleno >= map->max_rules) {
496 dprintk(" bad ruleno %d\n", ruleno); 649 dprintk(" bad ruleno %d\n", ruleno);
@@ -503,29 +656,49 @@ int crush_do_rule(const struct crush_map *map,
503 o = b; 656 o = b;
504 657
505 for (step = 0; step < rule->len; step++) { 658 for (step = 0; step < rule->len; step++) {
659 int firstn = 0;
506 struct crush_rule_step *curstep = &rule->steps[step]; 660 struct crush_rule_step *curstep = &rule->steps[step];
507 661
508 firstn = 0;
509 switch (curstep->op) { 662 switch (curstep->op) {
510 case CRUSH_RULE_TAKE: 663 case CRUSH_RULE_TAKE:
511 w[0] = curstep->arg1; 664 w[0] = curstep->arg1;
512 wsize = 1; 665 wsize = 1;
513 break; 666 break;
514 667
515 case CRUSH_RULE_CHOOSE_LEAF_FIRSTN: 668 case CRUSH_RULE_SET_CHOOSE_TRIES:
669 if (curstep->arg1 > 0)
670 choose_tries = curstep->arg1;
671 break;
672
673 case CRUSH_RULE_SET_CHOOSELEAF_TRIES:
674 if (curstep->arg1 > 0)
675 choose_leaf_tries = curstep->arg1;
676 break;
677
678 case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
679 if (curstep->arg1 > 0)
680 choose_local_tries = curstep->arg1;
681 break;
682
683 case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
684 if (curstep->arg1 > 0)
685 choose_local_fallback_tries = curstep->arg1;
686 break;
687
688 case CRUSH_RULE_CHOOSELEAF_FIRSTN:
516 case CRUSH_RULE_CHOOSE_FIRSTN: 689 case CRUSH_RULE_CHOOSE_FIRSTN:
517 firstn = 1; 690 firstn = 1;
518 /* fall through */ 691 /* fall through */
519 case CRUSH_RULE_CHOOSE_LEAF_INDEP: 692 case CRUSH_RULE_CHOOSELEAF_INDEP:
520 case CRUSH_RULE_CHOOSE_INDEP: 693 case CRUSH_RULE_CHOOSE_INDEP:
521 if (wsize == 0) 694 if (wsize == 0)
522 break; 695 break;
523 696
524 recurse_to_leaf = 697 recurse_to_leaf =
525 curstep->op == 698 curstep->op ==
526 CRUSH_RULE_CHOOSE_LEAF_FIRSTN || 699 CRUSH_RULE_CHOOSELEAF_FIRSTN ||
527 curstep->op == 700 curstep->op ==
528 CRUSH_RULE_CHOOSE_LEAF_INDEP; 701 CRUSH_RULE_CHOOSELEAF_INDEP;
529 702
530 /* reset output */ 703 /* reset output */
531 osize = 0; 704 osize = 0;
@@ -543,22 +716,51 @@ int crush_do_rule(const struct crush_map *map,
543 continue; 716 continue;
544 } 717 }
545 j = 0; 718 j = 0;
546 osize += crush_choose(map, 719 if (firstn) {
547 map->buckets[-1-w[i]], 720 int recurse_tries;
548 weight, 721 if (choose_leaf_tries)
549 x, numrep, 722 recurse_tries =
550 curstep->arg2, 723 choose_leaf_tries;
551 o+osize, j, 724 else if (map->chooseleaf_descend_once)
552 firstn, 725 recurse_tries = 1;
553 recurse_to_leaf, 726 else
554 descend_once, c+osize); 727 recurse_tries = choose_tries;
728 osize += crush_choose_firstn(
729 map,
730 map->buckets[-1-w[i]],
731 weight, weight_max,
732 x, numrep,
733 curstep->arg2,
734 o+osize, j,
735 choose_tries,
736 recurse_tries,
737 choose_local_tries,
738 choose_local_fallback_tries,
739 recurse_to_leaf,
740 c+osize);
741 } else {
742 crush_choose_indep(
743 map,
744 map->buckets[-1-w[i]],
745 weight, weight_max,
746 x, numrep, numrep,
747 curstep->arg2,
748 o+osize, j,
749 choose_tries,
750 choose_leaf_tries ?
751 choose_leaf_tries : 1,
752 recurse_to_leaf,
753 c+osize,
754 0);
755 osize += numrep;
756 }
555 } 757 }
556 758
557 if (recurse_to_leaf) 759 if (recurse_to_leaf)
558 /* copy final _leaf_ values to output set */ 760 /* copy final _leaf_ values to output set */
559 memcpy(o, c, osize*sizeof(*o)); 761 memcpy(o, c, osize*sizeof(*o));
560 762
561 /* swap t and w arrays */ 763 /* swap o and w arrays */
562 tmp = o; 764 tmp = o;
563 o = w; 765 o = w;
564 w = tmp; 766 w = tmp;
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 83661cdc0766..258a382e75ed 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -132,7 +132,8 @@ static int osdc_show(struct seq_file *s, void *pp)
132 req->r_osd ? req->r_osd->o_osd : -1, 132 req->r_osd ? req->r_osd->o_osd : -1,
133 req->r_pgid.pool, req->r_pgid.seed); 133 req->r_pgid.pool, req->r_pgid.seed);
134 134
135 seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); 135 seq_printf(s, "%.*s", req->r_base_oid.name_len,
136 req->r_base_oid.name);
136 137
137 if (req->r_reassert_version.epoch) 138 if (req->r_reassert_version.epoch)
138 seq_printf(s, "\t%u'%llu", 139 seq_printf(s, "\t%u'%llu",
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4a5df7b1cc9f..2ed1304d22a7 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -15,6 +15,7 @@
15#include <linux/dns_resolver.h> 15#include <linux/dns_resolver.h>
16#include <net/tcp.h> 16#include <net/tcp.h>
17 17
18#include <linux/ceph/ceph_features.h>
18#include <linux/ceph/libceph.h> 19#include <linux/ceph/libceph.h>
19#include <linux/ceph/messenger.h> 20#include <linux/ceph/messenger.h>
20#include <linux/ceph/decode.h> 21#include <linux/ceph/decode.h>
@@ -1865,7 +1866,9 @@ int ceph_parse_ips(const char *c, const char *end,
1865 port = (port * 10) + (*p - '0'); 1866 port = (port * 10) + (*p - '0');
1866 p++; 1867 p++;
1867 } 1868 }
1868 if (port > 65535 || port == 0) 1869 if (port == 0)
1870 port = CEPH_MON_PORT;
1871 else if (port > 65535)
1869 goto bad; 1872 goto bad;
1870 } else { 1873 } else {
1871 port = CEPH_MON_PORT; 1874 port = CEPH_MON_PORT;
@@ -1945,7 +1948,8 @@ static int process_connect(struct ceph_connection *con)
1945{ 1948{
1946 u64 sup_feat = con->msgr->supported_features; 1949 u64 sup_feat = con->msgr->supported_features;
1947 u64 req_feat = con->msgr->required_features; 1950 u64 req_feat = con->msgr->required_features;
1948 u64 server_feat = le64_to_cpu(con->in_reply.features); 1951 u64 server_feat = ceph_sanitize_features(
1952 le64_to_cpu(con->in_reply.features));
1949 int ret; 1953 int ret;
1950 1954
1951 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1955 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -2853,8 +2857,8 @@ static void con_fault(struct ceph_connection *con)
2853 */ 2857 */
2854void ceph_messenger_init(struct ceph_messenger *msgr, 2858void ceph_messenger_init(struct ceph_messenger *msgr,
2855 struct ceph_entity_addr *myaddr, 2859 struct ceph_entity_addr *myaddr,
2856 u32 supported_features, 2860 u64 supported_features,
2857 u32 required_features, 2861 u64 required_features,
2858 bool nocrc) 2862 bool nocrc)
2859{ 2863{
2860 msgr->supported_features = supported_features; 2864 msgr->supported_features = supported_features;
@@ -3126,15 +3130,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3126 INIT_LIST_HEAD(&m->data); 3130 INIT_LIST_HEAD(&m->data);
3127 3131
3128 /* front */ 3132 /* front */
3129 m->front_max = front_len;
3130 if (front_len) { 3133 if (front_len) {
3131 if (front_len > PAGE_CACHE_SIZE) { 3134 m->front.iov_base = ceph_kvmalloc(front_len, flags);
3132 m->front.iov_base = __vmalloc(front_len, flags,
3133 PAGE_KERNEL);
3134 m->front_is_vmalloc = true;
3135 } else {
3136 m->front.iov_base = kmalloc(front_len, flags);
3137 }
3138 if (m->front.iov_base == NULL) { 3135 if (m->front.iov_base == NULL) {
3139 dout("ceph_msg_new can't allocate %d bytes\n", 3136 dout("ceph_msg_new can't allocate %d bytes\n",
3140 front_len); 3137 front_len);
@@ -3143,7 +3140,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3143 } else { 3140 } else {
3144 m->front.iov_base = NULL; 3141 m->front.iov_base = NULL;
3145 } 3142 }
3146 m->front.iov_len = front_len; 3143 m->front_alloc_len = m->front.iov_len = front_len;
3147 3144
3148 dout("ceph_msg_new %p front %d\n", m, front_len); 3145 dout("ceph_msg_new %p front %d\n", m, front_len);
3149 return m; 3146 return m;
@@ -3256,10 +3253,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
3256void ceph_msg_kfree(struct ceph_msg *m) 3253void ceph_msg_kfree(struct ceph_msg *m)
3257{ 3254{
3258 dout("msg_kfree %p\n", m); 3255 dout("msg_kfree %p\n", m);
3259 if (m->front_is_vmalloc) 3256 ceph_kvfree(m->front.iov_base);
3260 vfree(m->front.iov_base);
3261 else
3262 kfree(m->front.iov_base);
3263 kmem_cache_free(ceph_msg_cache, m); 3257 kmem_cache_free(ceph_msg_cache, m);
3264} 3258}
3265 3259
@@ -3301,8 +3295,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
3301 3295
3302void ceph_msg_dump(struct ceph_msg *msg) 3296void ceph_msg_dump(struct ceph_msg *msg)
3303{ 3297{
3304 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, 3298 pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
3305 msg->front_max, msg->data_length); 3299 msg->front_alloc_len, msg->data_length);
3306 print_hex_dump(KERN_DEBUG, "header: ", 3300 print_hex_dump(KERN_DEBUG, "header: ",
3307 DUMP_PREFIX_OFFSET, 16, 1, 3301 DUMP_PREFIX_OFFSET, 16, 1,
3308 &msg->hdr, sizeof(msg->hdr), true); 3302 &msg->hdr, sizeof(msg->hdr), true);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 1fe25cd29d0e..2ac9ef35110b 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -152,7 +152,7 @@ static int __open_session(struct ceph_mon_client *monc)
152 /* initiatiate authentication handshake */ 152 /* initiatiate authentication handshake */
153 ret = ceph_auth_build_hello(monc->auth, 153 ret = ceph_auth_build_hello(monc->auth,
154 monc->m_auth->front.iov_base, 154 monc->m_auth->front.iov_base,
155 monc->m_auth->front_max); 155 monc->m_auth->front_alloc_len);
156 __send_prepared_auth_request(monc, ret); 156 __send_prepared_auth_request(monc, ret);
157 } else { 157 } else {
158 dout("open_session mon%d already open\n", monc->cur_mon); 158 dout("open_session mon%d already open\n", monc->cur_mon);
@@ -196,7 +196,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
196 int num; 196 int num;
197 197
198 p = msg->front.iov_base; 198 p = msg->front.iov_base;
199 end = p + msg->front_max; 199 end = p + msg->front_alloc_len;
200 200
201 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 201 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
202 ceph_encode_32(&p, num); 202 ceph_encode_32(&p, num);
@@ -897,7 +897,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
897 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 897 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
898 msg->front.iov_len, 898 msg->front.iov_len,
899 monc->m_auth->front.iov_base, 899 monc->m_auth->front.iov_base,
900 monc->m_auth->front_max); 900 monc->m_auth->front_alloc_len);
901 if (ret < 0) { 901 if (ret < 0) {
902 monc->client->auth_err = ret; 902 monc->client->auth_err = ret;
903 wake_up_all(&monc->client->auth_wq); 903 wake_up_all(&monc->client->auth_wq);
@@ -939,7 +939,7 @@ static int __validate_auth(struct ceph_mon_client *monc)
939 return 0; 939 return 0;
940 940
941 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 941 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
942 monc->m_auth->front_max); 942 monc->m_auth->front_alloc_len);
943 if (ret <= 0) 943 if (ret <= 0)
944 return ret; /* either an error, or no need to authenticate */ 944 return ret; /* either an error, or no need to authenticate */
945 __send_prepared_auth_request(monc, ret); 945 __send_prepared_auth_request(monc, ret);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2b4b32aaa893..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
338 msg_size = 4 + 4 + 8 + 8 + 4+8; 338 msg_size = 4 + 4 + 8 + 8 + 4+8;
339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
340 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 340 msg_size += 1 + 8 + 4 + 4; /* pg_t */
341 msg_size += 4 + MAX_OBJ_NAME_SIZE; 341 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
343 msg_size += 8; /* snapid */ 343 msg_size += 8; /* snapid */
344 msg_size += 8; /* snap_seq */ 344 msg_size += 8; /* snap_seq */
@@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
368 INIT_LIST_HEAD(&req->r_req_lru_item); 368 INIT_LIST_HEAD(&req->r_req_lru_item);
369 INIT_LIST_HEAD(&req->r_osd_item); 369 INIT_LIST_HEAD(&req->r_osd_item);
370 370
371 req->r_base_oloc.pool = -1;
372 req->r_target_oloc.pool = -1;
373
371 /* create reply message */ 374 /* create reply message */
372 if (use_mempool) 375 if (use_mempool)
373 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 376 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
761 if (num_ops > 1) 764 if (num_ops > 1)
762 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); 765 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
763 766
764 req->r_file_layout = *layout; /* keep a copy */ 767 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
765 768
766 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 769 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
767 vino.ino, objnum); 770 "%llx.%08llx", vino.ino, objnum);
768 req->r_oid_len = strlen(req->r_oid); 771 req->r_base_oid.name_len = strlen(req->r_base_oid.name);
769 772
770 return req; 773 return req;
771} 774}
@@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
1044 !ceph_con_opened(&osd->o_con)) { 1047 !ceph_con_opened(&osd->o_con)) {
1045 struct ceph_osd_request *req; 1048 struct ceph_osd_request *req;
1046 1049
1047 dout(" osd addr hasn't changed and connection never opened," 1050 dout("osd addr hasn't changed and connection never opened, "
1048 " letting msgr retry"); 1051 "letting msgr retry\n");
1049 /* touch each r_stamp for handle_timeout()'s benfit */ 1052 /* touch each r_stamp for handle_timeout()'s benfit */
1050 list_for_each_entry(req, &osd->o_requests, r_osd_item) 1053 list_for_each_entry(req, &osd->o_requests, r_osd_item)
1051 req->r_stamp = jiffies; 1054 req->r_stamp = jiffies;
@@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1232EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1235EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1233 1236
1234/* 1237/*
1238 * Returns whether a request should be blocked from being sent
1239 * based on the current osdmap and osd_client settings.
1240 *
1241 * Caller should hold map_sem for read.
1242 */
1243static bool __req_should_be_paused(struct ceph_osd_client *osdc,
1244 struct ceph_osd_request *req)
1245{
1246 bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
1247 bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
1248 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1249 return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
1250 (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
1251}
1252
1253/*
1254 * Calculate mapping of a request to a PG. Takes tiering into account.
1255 */
1256static int __calc_request_pg(struct ceph_osdmap *osdmap,
1257 struct ceph_osd_request *req,
1258 struct ceph_pg *pg_out)
1259{
1260 bool need_check_tiering;
1261
1262 need_check_tiering = false;
1263 if (req->r_target_oloc.pool == -1) {
1264 req->r_target_oloc = req->r_base_oloc; /* struct */
1265 need_check_tiering = true;
1266 }
1267 if (req->r_target_oid.name_len == 0) {
1268 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
1269 need_check_tiering = true;
1270 }
1271
1272 if (need_check_tiering &&
1273 (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1274 struct ceph_pg_pool_info *pi;
1275
1276 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
1277 if (pi) {
1278 if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1279 pi->read_tier >= 0)
1280 req->r_target_oloc.pool = pi->read_tier;
1281 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1282 pi->write_tier >= 0)
1283 req->r_target_oloc.pool = pi->write_tier;
1284 }
1285 /* !pi is caught in ceph_oloc_oid_to_pg() */
1286 }
1287
1288 return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
1289 &req->r_target_oid, pg_out);
1290}
1291
1292/*
1235 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1293 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1236 * (as needed), and set the request r_osd appropriately. If there is 1294 * (as needed), and set the request r_osd appropriately. If there is
1237 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1295 * no up osd, set r_osd to NULL. Move the request to the appropriate list
@@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc,
1248 int acting[CEPH_PG_MAX_SIZE]; 1306 int acting[CEPH_PG_MAX_SIZE];
1249 int o = -1, num = 0; 1307 int o = -1, num = 0;
1250 int err; 1308 int err;
1309 bool was_paused;
1251 1310
1252 dout("map_request %p tid %lld\n", req, req->r_tid); 1311 dout("map_request %p tid %lld\n", req, req->r_tid);
1253 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 1312
1254 ceph_file_layout_pg_pool(req->r_file_layout)); 1313 err = __calc_request_pg(osdc->osdmap, req, &pgid);
1255 if (err) { 1314 if (err) {
1256 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1315 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1257 return err; 1316 return err;
@@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc,
1264 num = err; 1323 num = err;
1265 } 1324 }
1266 1325
1326 was_paused = req->r_paused;
1327 req->r_paused = __req_should_be_paused(osdc, req);
1328 if (was_paused && !req->r_paused)
1329 force_resend = 1;
1330
1267 if ((!force_resend && 1331 if ((!force_resend &&
1268 req->r_osd && req->r_osd->o_osd == o && 1332 req->r_osd && req->r_osd->o_osd == o &&
1269 req->r_sent >= req->r_osd->o_incarnation && 1333 req->r_sent >= req->r_osd->o_incarnation &&
1270 req->r_num_pg_osds == num && 1334 req->r_num_pg_osds == num &&
1271 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1335 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
1272 (req->r_osd == NULL && o == -1)) 1336 (req->r_osd == NULL && o == -1) ||
1337 req->r_paused)
1273 return 0; /* no change */ 1338 return 0; /* no change */
1274 1339
1275 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1340 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
1331 /* fill in message content that changes each time we send it */ 1396 /* fill in message content that changes each time we send it */
1332 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1397 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1333 put_unaligned_le32(req->r_flags, req->r_request_flags); 1398 put_unaligned_le32(req->r_flags, req->r_request_flags);
1334 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1399 put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
1335 p = req->r_request_pgid; 1400 p = req->r_request_pgid;
1336 ceph_encode_64(&p, req->r_pgid.pool); 1401 ceph_encode_64(&p, req->r_pgid.pool);
1337 ceph_encode_32(&p, req->r_pgid.seed); 1402 ceph_encode_32(&p, req->r_pgid.seed);
@@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
1432 round_jiffies_relative(delay)); 1497 round_jiffies_relative(delay));
1433} 1498}
1434 1499
1500static int ceph_oloc_decode(void **p, void *end,
1501 struct ceph_object_locator *oloc)
1502{
1503 u8 struct_v, struct_cv;
1504 u32 len;
1505 void *struct_end;
1506 int ret = 0;
1507
1508 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1509 struct_v = ceph_decode_8(p);
1510 struct_cv = ceph_decode_8(p);
1511 if (struct_v < 3) {
1512 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
1513 struct_v, struct_cv);
1514 goto e_inval;
1515 }
1516 if (struct_cv > 6) {
1517 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
1518 struct_v, struct_cv);
1519 goto e_inval;
1520 }
1521 len = ceph_decode_32(p);
1522 ceph_decode_need(p, end, len, e_inval);
1523 struct_end = *p + len;
1524
1525 oloc->pool = ceph_decode_64(p);
1526 *p += 4; /* skip preferred */
1527
1528 len = ceph_decode_32(p);
1529 if (len > 0) {
1530 pr_warn("ceph_object_locator::key is set\n");
1531 goto e_inval;
1532 }
1533
1534 if (struct_v >= 5) {
1535 len = ceph_decode_32(p);
1536 if (len > 0) {
1537 pr_warn("ceph_object_locator::nspace is set\n");
1538 goto e_inval;
1539 }
1540 }
1541
1542 if (struct_v >= 6) {
1543 s64 hash = ceph_decode_64(p);
1544 if (hash != -1) {
1545 pr_warn("ceph_object_locator::hash is set\n");
1546 goto e_inval;
1547 }
1548 }
1549
1550 /* skip the rest */
1551 *p = struct_end;
1552out:
1553 return ret;
1554
1555e_inval:
1556 ret = -EINVAL;
1557 goto out;
1558}
1559
1560static int ceph_redirect_decode(void **p, void *end,
1561 struct ceph_request_redirect *redir)
1562{
1563 u8 struct_v, struct_cv;
1564 u32 len;
1565 void *struct_end;
1566 int ret;
1567
1568 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
1569 struct_v = ceph_decode_8(p);
1570 struct_cv = ceph_decode_8(p);
1571 if (struct_cv > 1) {
1572 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
1573 struct_v, struct_cv);
1574 goto e_inval;
1575 }
1576 len = ceph_decode_32(p);
1577 ceph_decode_need(p, end, len, e_inval);
1578 struct_end = *p + len;
1579
1580 ret = ceph_oloc_decode(p, end, &redir->oloc);
1581 if (ret)
1582 goto out;
1583
1584 len = ceph_decode_32(p);
1585 if (len > 0) {
1586 pr_warn("ceph_request_redirect::object_name is set\n");
1587 goto e_inval;
1588 }
1589
1590 len = ceph_decode_32(p);
1591 *p += len; /* skip osd_instructions */
1592
1593 /* skip the rest */
1594 *p = struct_end;
1595out:
1596 return ret;
1597
1598e_inval:
1599 ret = -EINVAL;
1600 goto out;
1601}
1602
1435static void complete_request(struct ceph_osd_request *req) 1603static void complete_request(struct ceph_osd_request *req)
1436{ 1604{
1437 complete_all(&req->r_safe_completion); /* fsync waiter */ 1605 complete_all(&req->r_safe_completion); /* fsync waiter */
@@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1446{ 1614{
1447 void *p, *end; 1615 void *p, *end;
1448 struct ceph_osd_request *req; 1616 struct ceph_osd_request *req;
1617 struct ceph_request_redirect redir;
1449 u64 tid; 1618 u64 tid;
1450 int object_len; 1619 int object_len;
1451 unsigned int numops; 1620 unsigned int numops;
@@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1525 for (i = 0; i < numops; i++) 1694 for (i = 0; i < numops; i++)
1526 req->r_reply_op_result[i] = ceph_decode_32(&p); 1695 req->r_reply_op_result[i] = ceph_decode_32(&p);
1527 1696
1528 already_completed = req->r_got_reply; 1697 if (le16_to_cpu(msg->hdr.version) >= 6) {
1698 p += 8 + 4; /* skip replay_version */
1699 p += 8; /* skip user_version */
1529 1700
1530 if (!req->r_got_reply) { 1701 err = ceph_redirect_decode(&p, end, &redir);
1702 if (err)
1703 goto bad_put;
1704 } else {
1705 redir.oloc.pool = -1;
1706 }
1707
1708 if (redir.oloc.pool != -1) {
1709 dout("redirect pool %lld\n", redir.oloc.pool);
1710
1711 __unregister_request(osdc, req);
1712 mutex_unlock(&osdc->request_mutex);
1713
1714 req->r_target_oloc = redir.oloc; /* struct */
1715
1716 /*
1717 * Start redirect requests with nofail=true. If
1718 * mapping fails, request will end up on the notarget
1719 * list, waiting for the new osdmap (which can take
1720 * a while), even though the original request mapped
1721 * successfully. In the future we might want to follow
1722 * original request's nofail setting here.
1723 */
1724 err = ceph_osdc_start_request(osdc, req, true);
1725 BUG_ON(err);
1531 1726
1727 goto done;
1728 }
1729
1730 already_completed = req->r_got_reply;
1731 if (!req->r_got_reply) {
1532 req->r_result = result; 1732 req->r_result = result;
1533 dout("handle_reply result %d bytes %d\n", req->r_result, 1733 dout("handle_reply result %d bytes %d\n", req->r_result,
1534 bytes); 1734 bytes);
@@ -1581,6 +1781,13 @@ done:
1581 return; 1781 return;
1582 1782
1583bad_put: 1783bad_put:
1784 req->r_result = -EIO;
1785 __unregister_request(osdc, req);
1786 if (req->r_callback)
1787 req->r_callback(req, msg);
1788 else
1789 complete_all(&req->r_completion);
1790 complete_request(req);
1584 ceph_osdc_put_request(req); 1791 ceph_osdc_put_request(req);
1585bad_mutex: 1792bad_mutex:
1586 mutex_unlock(&osdc->request_mutex); 1793 mutex_unlock(&osdc->request_mutex);
@@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1613 * 1820 *
1614 * Caller should hold map_sem for read. 1821 * Caller should hold map_sem for read.
1615 */ 1822 */
1616static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1823static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
1824 bool force_resend_writes)
1617{ 1825{
1618 struct ceph_osd_request *req, *nreq; 1826 struct ceph_osd_request *req, *nreq;
1619 struct rb_node *p; 1827 struct rb_node *p;
1620 int needmap = 0; 1828 int needmap = 0;
1621 int err; 1829 int err;
1830 bool force_resend_req;
1622 1831
1623 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1832 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
1833 force_resend_writes ? " (force resend writes)" : "");
1624 mutex_lock(&osdc->request_mutex); 1834 mutex_lock(&osdc->request_mutex);
1625 for (p = rb_first(&osdc->requests); p; ) { 1835 for (p = rb_first(&osdc->requests); p; ) {
1626 req = rb_entry(p, struct ceph_osd_request, r_node); 1836 req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1645 continue; 1855 continue;
1646 } 1856 }
1647 1857
1648 err = __map_request(osdc, req, force_resend); 1858 force_resend_req = force_resend ||
1859 (force_resend_writes &&
1860 req->r_flags & CEPH_OSD_FLAG_WRITE);
1861 err = __map_request(osdc, req, force_resend_req);
1649 if (err < 0) 1862 if (err < 0)
1650 continue; /* error */ 1863 continue; /* error */
1651 if (req->r_osd == NULL) { 1864 if (req->r_osd == NULL) {
@@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1665 r_linger_item) { 1878 r_linger_item) {
1666 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1879 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1667 1880
1668 err = __map_request(osdc, req, force_resend); 1881 err = __map_request(osdc, req,
1882 force_resend || force_resend_writes);
1669 dout("__map_request returned %d\n", err); 1883 dout("__map_request returned %d\n", err);
1670 if (err == 0) 1884 if (err == 0)
1671 continue; /* no change and no osd was specified */ 1885 continue; /* no change and no osd was specified */
@@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1707 struct ceph_osdmap *newmap = NULL, *oldmap; 1921 struct ceph_osdmap *newmap = NULL, *oldmap;
1708 int err; 1922 int err;
1709 struct ceph_fsid fsid; 1923 struct ceph_fsid fsid;
1924 bool was_full;
1710 1925
1711 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1926 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1712 p = msg->front.iov_base; 1927 p = msg->front.iov_base;
@@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1720 1935
1721 down_write(&osdc->map_sem); 1936 down_write(&osdc->map_sem);
1722 1937
1938 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
1939
1723 /* incremental maps */ 1940 /* incremental maps */
1724 ceph_decode_32_safe(&p, end, nr_maps, bad); 1941 ceph_decode_32_safe(&p, end, nr_maps, bad);
1725 dout(" %d inc maps\n", nr_maps); 1942 dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1744 ceph_osdmap_destroy(osdc->osdmap); 1961 ceph_osdmap_destroy(osdc->osdmap);
1745 osdc->osdmap = newmap; 1962 osdc->osdmap = newmap;
1746 } 1963 }
1747 kick_requests(osdc, 0); 1964 was_full = was_full ||
1965 ceph_osdmap_flag(osdc->osdmap,
1966 CEPH_OSDMAP_FULL);
1967 kick_requests(osdc, 0, was_full);
1748 } else { 1968 } else {
1749 dout("ignoring incremental map %u len %d\n", 1969 dout("ignoring incremental map %u len %d\n",
1750 epoch, maplen); 1970 epoch, maplen);
@@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1787 skipped_map = 1; 2007 skipped_map = 1;
1788 ceph_osdmap_destroy(oldmap); 2008 ceph_osdmap_destroy(oldmap);
1789 } 2009 }
1790 kick_requests(osdc, skipped_map); 2010 was_full = was_full ||
2011 ceph_osdmap_flag(osdc->osdmap,
2012 CEPH_OSDMAP_FULL);
2013 kick_requests(osdc, skipped_map, was_full);
1791 } 2014 }
1792 p += maplen; 2015 p += maplen;
1793 nr_maps--; 2016 nr_maps--;
@@ -1804,7 +2027,9 @@ done:
1804 * we find out when we are no longer full and stop returning 2027 * we find out when we are no longer full and stop returning
1805 * ENOSPC. 2028 * ENOSPC.
1806 */ 2029 */
1807 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 2030 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
2031 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
2032 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
1808 ceph_monc_request_next_osdmap(&osdc->client->monc); 2033 ceph_monc_request_next_osdmap(&osdc->client->monc);
1809 2034
1810 mutex_lock(&osdc->request_mutex); 2035 mutex_lock(&osdc->request_mutex);
@@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2068 ceph_encode_32(&p, -1); /* preferred */ 2293 ceph_encode_32(&p, -1); /* preferred */
2069 2294
2070 /* oid */ 2295 /* oid */
2071 ceph_encode_32(&p, req->r_oid_len); 2296 ceph_encode_32(&p, req->r_base_oid.name_len);
2072 memcpy(p, req->r_oid, req->r_oid_len); 2297 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
2073 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 2298 dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
2074 p += req->r_oid_len; 2299 req->r_base_oid.name, req->r_base_oid.name_len);
2300 p += req->r_base_oid.name_len;
2075 2301
2076 /* ops--can imply data */ 2302 /* ops--can imply data */
2077 ceph_encode_16(&p, (u16)req->r_num_ops); 2303 ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2454 struct ceph_osd_client *osdc = osd->o_osdc; 2680 struct ceph_osd_client *osdc = osd->o_osdc;
2455 struct ceph_msg *m; 2681 struct ceph_msg *m;
2456 struct ceph_osd_request *req; 2682 struct ceph_osd_request *req;
2457 int front = le32_to_cpu(hdr->front_len); 2683 int front_len = le32_to_cpu(hdr->front_len);
2458 int data_len = le32_to_cpu(hdr->data_len); 2684 int data_len = le32_to_cpu(hdr->data_len);
2459 u64 tid; 2685 u64 tid;
2460 2686
@@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2474 req->r_reply, req->r_reply->con); 2700 req->r_reply, req->r_reply->con);
2475 ceph_msg_revoke_incoming(req->r_reply); 2701 ceph_msg_revoke_incoming(req->r_reply);
2476 2702
2477 if (front > req->r_reply->front.iov_len) { 2703 if (front_len > req->r_reply->front_alloc_len) {
2478 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", 2704 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
2479 front, (int)req->r_reply->front.iov_len, 2705 front_len, req->r_reply->front_alloc_len,
2480 (unsigned int)con->peer_name.type, 2706 (unsigned int)con->peer_name.type,
2481 le64_to_cpu(con->peer_name.num)); 2707 le64_to_cpu(con->peer_name.num));
2482 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2708 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
2709 false);
2483 if (!m) 2710 if (!m)
2484 goto out; 2711 goto out;
2485 ceph_msg_put(req->r_reply); 2712 ceph_msg_put(req->r_reply);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index dbd9a4792427..aade4a5c1c07 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -464,6 +464,11 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
464 return NULL; 464 return NULL;
465} 465}
466 466
467struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
468{
469 return __lookup_pg_pool(&map->pg_pools, id);
470}
471
467const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) 472const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
468{ 473{
469 struct ceph_pg_pool_info *pi; 474 struct ceph_pg_pool_info *pi;
@@ -514,8 +519,8 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
514 pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); 519 pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
515 return -EINVAL; 520 return -EINVAL;
516 } 521 }
517 if (cv > 7) { 522 if (cv > 9) {
518 pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); 523 pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
519 return -EINVAL; 524 return -EINVAL;
520 } 525 }
521 len = ceph_decode_32(p); 526 len = ceph_decode_32(p);
@@ -543,12 +548,34 @@ static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
543 *p += len; 548 *p += len;
544 } 549 }
545 550
546 /* skip removed snaps */ 551 /* skip removed_snaps */
547 num = ceph_decode_32(p); 552 num = ceph_decode_32(p);
548 *p += num * (8 + 8); 553 *p += num * (8 + 8);
549 554
550 *p += 8; /* skip auid */ 555 *p += 8; /* skip auid */
551 pi->flags = ceph_decode_64(p); 556 pi->flags = ceph_decode_64(p);
557 *p += 4; /* skip crash_replay_interval */
558
559 if (ev >= 7)
560 *p += 1; /* skip min_size */
561
562 if (ev >= 8)
563 *p += 8 + 8; /* skip quota_max_* */
564
565 if (ev >= 9) {
566 /* skip tiers */
567 num = ceph_decode_32(p);
568 *p += num * 8;
569
570 *p += 8; /* skip tier_of */
571 *p += 1; /* skip cache_mode */
572
573 pi->read_tier = ceph_decode_64(p);
574 pi->write_tier = ceph_decode_64(p);
575 } else {
576 pi->read_tier = -1;
577 pi->write_tier = -1;
578 }
552 579
553 /* ignore the rest */ 580 /* ignore the rest */
554 581
@@ -1090,25 +1117,40 @@ invalid:
1090EXPORT_SYMBOL(ceph_calc_file_object_mapping); 1117EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1091 1118
1092/* 1119/*
1093 * calculate an object layout (i.e. pgid) from an oid, 1120 * Calculate mapping of a (oloc, oid) pair to a PG. Should only be
1094 * file_layout, and osdmap 1121 * called with target's (oloc, oid), since tiering isn't taken into
1122 * account.
1095 */ 1123 */
1096int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid, 1124int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
1097 struct ceph_osdmap *osdmap, uint64_t pool) 1125 struct ceph_object_locator *oloc,
1126 struct ceph_object_id *oid,
1127 struct ceph_pg *pg_out)
1098{ 1128{
1099 struct ceph_pg_pool_info *pool_info; 1129 struct ceph_pg_pool_info *pi;
1100 1130
1101 BUG_ON(!osdmap); 1131 pi = __lookup_pg_pool(&osdmap->pg_pools, oloc->pool);
1102 pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool); 1132 if (!pi)
1103 if (!pool_info)
1104 return -EIO; 1133 return -EIO;
1105 pg->pool = pool;
1106 pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
1107 1134
1108 dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed); 1135 pg_out->pool = oloc->pool;
1136 pg_out->seed = ceph_str_hash(pi->object_hash, oid->name,
1137 oid->name_len);
1138
1139 dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name,
1140 pg_out->pool, pg_out->seed);
1109 return 0; 1141 return 0;
1110} 1142}
1111EXPORT_SYMBOL(ceph_calc_ceph_pg); 1143EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
1144
1145static int crush_do_rule_ary(const struct crush_map *map, int ruleno, int x,
1146 int *result, int result_max,
1147 const __u32 *weight, int weight_max)
1148{
1149 int scratch[result_max * 3];
1150
1151 return crush_do_rule(map, ruleno, x, result, result_max,
1152 weight, weight_max, scratch);
1153}
1112 1154
1113/* 1155/*
1114 * Calculate raw osd vector for the given pgid. Return pointer to osd 1156 * Calculate raw osd vector for the given pgid. Return pointer to osd
@@ -1163,9 +1205,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1163 pool->pgp_num_mask) + 1205 pool->pgp_num_mask) +
1164 (unsigned)pgid.pool; 1206 (unsigned)pgid.pool;
1165 } 1207 }
1166 r = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1208 r = crush_do_rule_ary(osdmap->crush, ruleno, pps,
1167 min_t(int, pool->size, *num), 1209 osds, min_t(int, pool->size, *num),
1168 osdmap->osd_weight); 1210 osdmap->osd_weight, osdmap->max_osd);
1169 if (r < 0) { 1211 if (r < 0) {
1170 pr_err("error %d from crush rule: pool %lld ruleset %d type %d" 1212 pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
1171 " size %d\n", r, pgid.pool, pool->crush_ruleset, 1213 " size %d\n", r, pgid.pool, pool->crush_ruleset,