aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-10-10 16:52:05 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2016-10-10 16:52:05 -0400
commit8dfb790b15e779232d5d4e3f0102af2bea21ca55 (patch)
tree7208241fc93d39f769dcec0c227c8582f117dfce
parentfed41f7d039bad02f94cad9059e4b14cd81d13f2 (diff)
parent64f77566e1c84990d6c448bb3960f899521c0b7d (diff)
Merge tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client
Pull Ceph updates from Ilya Dryomov: "The big ticket item here is support for rbd exclusive-lock feature, with maintenance operations offloaded to userspace (Douglas Fuller, Mike Christie and myself). Another block device bullet is a series fixing up layering error paths (myself). On the filesystem side, we've got patches that improve our handling of buffered vs dio write races (Neil Brown) and a few assorted fixes from Zheng. Also included a couple of random cleanups and a minor CRUSH update" * tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client: (39 commits) crush: remove redundant local variable crush: don't normalize input of crush_ln iteratively libceph: ceph_build_auth() doesn't need ceph_auth_build_hello() libceph: use CEPH_AUTH_UNKNOWN in ceph_auth_build_hello() ceph: fix description for rsize and rasize mount options rbd: use kmalloc_array() in rbd_header_from_disk() ceph: use list_move instead of list_del/list_add ceph: handle CEPH_SESSION_REJECT message ceph: avoid accessing / when mounting a subpath ceph: fix mandatory flock check ceph: remove warning when ceph_releasepage() is called on dirty page ceph: ignore error from invalidate_inode_pages2_range() in direct write ceph: fix error handling of start_read() rbd: add rbd_obj_request_error() helper rbd: img_data requests don't own their page array rbd: don't call rbd_osd_req_format_read() for !img_data requests rbd: rework rbd_img_obj_exists_submit() error paths rbd: don't crash or leak on errors in rbd_img_obj_parent_read_full_callback() rbd: move bumping img_request refcount into rbd_obj_request_submit() rbd: mark the original request as done if stat request fails ...
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd29
-rw-r--r--Documentation/filesystems/ceph.txt4
-rw-r--r--drivers/block/rbd.c1432
-rw-r--r--drivers/block/rbd_types.h11
-rw-r--r--fs/ceph/addr.c24
-rw-r--r--fs/ceph/file.c4
-rw-r--r--fs/ceph/locks.c4
-rw-r--r--fs/ceph/mds_client.c30
-rw-r--r--fs/ceph/mds_client.h1
-rw-r--r--fs/ceph/strings.c2
-rw-r--r--fs/ceph/super.c49
-rw-r--r--include/linux/ceph/auth.h2
-rw-r--r--include/linux/ceph/ceph_fs.h12
-rw-r--r--include/linux/ceph/cls_lock_client.h49
-rw-r--r--include/linux/ceph/libceph.h3
-rw-r--r--include/linux/ceph/mon_client.h3
-rw-r--r--include/linux/ceph/osd_client.h23
-rw-r--r--net/ceph/Makefile1
-rw-r--r--net/ceph/auth.c13
-rw-r--r--net/ceph/auth_none.c2
-rw-r--r--net/ceph/ceph_common.c13
-rw-r--r--net/ceph/ceph_strings.c1
-rw-r--r--net/ceph/cls_lock_client.c325
-rw-r--r--net/ceph/crush/mapper.c17
-rw-r--r--net/ceph/mon_client.c82
-rw-r--r--net/ceph/osd_client.c169
26 files changed, 1966 insertions, 339 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 2ddd680929d8..f208ac58d613 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -6,7 +6,7 @@ Description:
6 6
7Being used for adding and removing rbd block devices. 7Being used for adding and removing rbd block devices.
8 8
9Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name] 9Usage: <mon ip addr> <options> <pool name> <rbd image name> [<snap name>]
10 10
11 $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add 11 $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
12 12
@@ -14,9 +14,13 @@ The snapshot name can be "-" or omitted to map the image read/write. A <dev-id>
14will be assigned for any registered block device. If snapshot is used, it will 14will be assigned for any registered block device. If snapshot is used, it will
15be mapped read-only. 15be mapped read-only.
16 16
17Removal of a device: 17Usage: <dev-id> [force]
18 18
19 $ echo <dev-id> > /sys/bus/rbd/remove 19 $ echo 2 > /sys/bus/rbd/remove
20
21Optional "force" argument which when passed will wait for running requests and
22then unmap the image. Requests sent to the driver after initiating the removal
23will be failed. (August 2016, since 4.9.)
20 24
21What: /sys/bus/rbd/add_single_major 25What: /sys/bus/rbd/add_single_major
22Date: December 2013 26Date: December 2013
@@ -43,10 +47,25 @@ Description: Available only if rbd module is inserted with single_major
43Entries under /sys/bus/rbd/devices/<dev-id>/ 47Entries under /sys/bus/rbd/devices/<dev-id>/
44-------------------------------------------- 48--------------------------------------------
45 49
50client_addr
51
52 The ceph unique client entity_addr_t (address + nonce).
53 The format is <address>:<port>/<nonce>: '1.2.3.4:1234/5678' or
54 '[1:2:3:4:5:6:7:8]:1234/5678'. (August 2016, since 4.9.)
55
46client_id 56client_id
47 57
48 The ceph unique client id that was assigned for this specific session. 58 The ceph unique client id that was assigned for this specific session.
49 59
60cluster_fsid
61
62 The ceph cluster UUID. (August 2016, since 4.9.)
63
64config_info
65
66 The string written into /sys/bus/rbd/add{,_single_major}. (August
67 2016, since 4.9.)
68
50features 69features
51 70
52 A hexadecimal encoding of the feature bits for this image. 71 A hexadecimal encoding of the feature bits for this image.
@@ -92,6 +111,10 @@ current_snap
92 111
93 The current snapshot for which the device is mapped. 112 The current snapshot for which the device is mapped.
94 113
114snap_id
115
116 The current snapshot's id. (August 2016, since 4.9.)
117
95parent 118parent
96 119
97 Information identifying the chain of parent images in a layered rbd 120 Information identifying the chain of parent images in a layered rbd
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index d6030aa33376..f5306ee40ea9 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -98,6 +98,10 @@ Mount Options
98 size. 98 size.
99 99
100 rsize=X 100 rsize=X
101 Specify the maximum read size in bytes. By default there is no
102 maximum.
103
104 rasize=X
101 Specify the maximum readahead. 105 Specify the maximum readahead.
102 106
103 mount_timeout=X 107 mount_timeout=X
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index c1f84df7838b..abb71628ab61 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -31,6 +31,7 @@
31#include <linux/ceph/libceph.h> 31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h> 32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h> 33#include <linux/ceph/mon_client.h>
34#include <linux/ceph/cls_lock_client.h>
34#include <linux/ceph/decode.h> 35#include <linux/ceph/decode.h>
35#include <linux/parser.h> 36#include <linux/parser.h>
36#include <linux/bsearch.h> 37#include <linux/bsearch.h>
@@ -114,12 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v)
114 115
115#define RBD_OBJ_PREFIX_LEN_MAX 64 116#define RBD_OBJ_PREFIX_LEN_MAX 64
116 117
118#define RBD_NOTIFY_TIMEOUT 5 /* seconds */
119#define RBD_RETRY_DELAY msecs_to_jiffies(1000)
120
117/* Feature bits */ 121/* Feature bits */
118 122
119#define RBD_FEATURE_LAYERING (1<<0) 123#define RBD_FEATURE_LAYERING (1<<0)
120#define RBD_FEATURE_STRIPINGV2 (1<<1) 124#define RBD_FEATURE_STRIPINGV2 (1<<1)
121#define RBD_FEATURES_ALL \ 125#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
122 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) 126#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
127 RBD_FEATURE_STRIPINGV2 | \
128 RBD_FEATURE_EXCLUSIVE_LOCK)
123 129
124/* Features supported by this (client software) implementation. */ 130/* Features supported by this (client software) implementation. */
125 131
@@ -128,11 +134,8 @@ static int atomic_dec_return_safe(atomic_t *v)
128/* 134/*
129 * An RBD device name will be "rbd#", where the "rbd" comes from 135 * An RBD device name will be "rbd#", where the "rbd" comes from
130 * RBD_DRV_NAME above, and # is a unique integer identifier. 136 * RBD_DRV_NAME above, and # is a unique integer identifier.
131 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
132 * enough to hold all possible device names.
133 */ 137 */
134#define DEV_NAME_LEN 32 138#define DEV_NAME_LEN 32
135#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
136 139
137/* 140/*
138 * block device image metadata (in-memory version) 141 * block device image metadata (in-memory version)
@@ -322,6 +325,24 @@ struct rbd_img_request {
322#define for_each_obj_request_safe(ireq, oreq, n) \ 325#define for_each_obj_request_safe(ireq, oreq, n) \
323 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) 326 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
324 327
328enum rbd_watch_state {
329 RBD_WATCH_STATE_UNREGISTERED,
330 RBD_WATCH_STATE_REGISTERED,
331 RBD_WATCH_STATE_ERROR,
332};
333
334enum rbd_lock_state {
335 RBD_LOCK_STATE_UNLOCKED,
336 RBD_LOCK_STATE_LOCKED,
337 RBD_LOCK_STATE_RELEASING,
338};
339
340/* WatchNotify::ClientId */
341struct rbd_client_id {
342 u64 gid;
343 u64 handle;
344};
345
325struct rbd_mapping { 346struct rbd_mapping {
326 u64 size; 347 u64 size;
327 u64 features; 348 u64 features;
@@ -349,13 +370,29 @@ struct rbd_device {
349 unsigned long flags; /* possibly lock protected */ 370 unsigned long flags; /* possibly lock protected */
350 struct rbd_spec *spec; 371 struct rbd_spec *spec;
351 struct rbd_options *opts; 372 struct rbd_options *opts;
373 char *config_info; /* add{,_single_major} string */
352 374
353 struct ceph_object_id header_oid; 375 struct ceph_object_id header_oid;
354 struct ceph_object_locator header_oloc; 376 struct ceph_object_locator header_oloc;
355 377
356 struct ceph_file_layout layout; 378 struct ceph_file_layout layout; /* used for all rbd requests */
357 379
380 struct mutex watch_mutex;
381 enum rbd_watch_state watch_state;
358 struct ceph_osd_linger_request *watch_handle; 382 struct ceph_osd_linger_request *watch_handle;
383 u64 watch_cookie;
384 struct delayed_work watch_dwork;
385
386 struct rw_semaphore lock_rwsem;
387 enum rbd_lock_state lock_state;
388 struct rbd_client_id owner_cid;
389 struct work_struct acquired_lock_work;
390 struct work_struct released_lock_work;
391 struct delayed_work lock_dwork;
392 struct work_struct unlock_work;
393 wait_queue_head_t lock_waitq;
394
395 struct workqueue_struct *task_wq;
359 396
360 struct rbd_spec *parent_spec; 397 struct rbd_spec *parent_spec;
361 u64 parent_overlap; 398 u64 parent_overlap;
@@ -439,6 +476,29 @@ static int minor_to_rbd_dev_id(int minor)
439 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; 476 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
440} 477}
441 478
479static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
480{
481 return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
482 rbd_dev->spec->snap_id == CEPH_NOSNAP &&
483 !rbd_dev->mapping.read_only;
484}
485
486static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
487{
488 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
489 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
490}
491
492static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
493{
494 bool is_lock_owner;
495
496 down_read(&rbd_dev->lock_rwsem);
497 is_lock_owner = __rbd_is_lock_owner(rbd_dev);
498 up_read(&rbd_dev->lock_rwsem);
499 return is_lock_owner;
500}
501
442static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); 502static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
443static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); 503static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
444static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); 504static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
@@ -735,6 +795,7 @@ enum {
735 /* string args above */ 795 /* string args above */
736 Opt_read_only, 796 Opt_read_only,
737 Opt_read_write, 797 Opt_read_write,
798 Opt_lock_on_read,
738 Opt_err 799 Opt_err
739}; 800};
740 801
@@ -746,16 +807,19 @@ static match_table_t rbd_opts_tokens = {
746 {Opt_read_only, "ro"}, /* Alternate spelling */ 807 {Opt_read_only, "ro"}, /* Alternate spelling */
747 {Opt_read_write, "read_write"}, 808 {Opt_read_write, "read_write"},
748 {Opt_read_write, "rw"}, /* Alternate spelling */ 809 {Opt_read_write, "rw"}, /* Alternate spelling */
810 {Opt_lock_on_read, "lock_on_read"},
749 {Opt_err, NULL} 811 {Opt_err, NULL}
750}; 812};
751 813
752struct rbd_options { 814struct rbd_options {
753 int queue_depth; 815 int queue_depth;
754 bool read_only; 816 bool read_only;
817 bool lock_on_read;
755}; 818};
756 819
757#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ 820#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
758#define RBD_READ_ONLY_DEFAULT false 821#define RBD_READ_ONLY_DEFAULT false
822#define RBD_LOCK_ON_READ_DEFAULT false
759 823
760static int parse_rbd_opts_token(char *c, void *private) 824static int parse_rbd_opts_token(char *c, void *private)
761{ 825{
@@ -791,6 +855,9 @@ static int parse_rbd_opts_token(char *c, void *private)
791 case Opt_read_write: 855 case Opt_read_write:
792 rbd_opts->read_only = false; 856 rbd_opts->read_only = false;
793 break; 857 break;
858 case Opt_lock_on_read:
859 rbd_opts->lock_on_read = true;
860 break;
794 default: 861 default:
795 /* libceph prints "bad option" msg */ 862 /* libceph prints "bad option" msg */
796 return -EINVAL; 863 return -EINVAL;
@@ -919,7 +986,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
919 char *snap_names = NULL; 986 char *snap_names = NULL;
920 u64 *snap_sizes = NULL; 987 u64 *snap_sizes = NULL;
921 u32 snap_count; 988 u32 snap_count;
922 size_t size;
923 int ret = -ENOMEM; 989 int ret = -ENOMEM;
924 u32 i; 990 u32 i;
925 991
@@ -957,9 +1023,9 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
957 goto out_err; 1023 goto out_err;
958 1024
959 /* ...as well as the array of their sizes. */ 1025 /* ...as well as the array of their sizes. */
960 1026 snap_sizes = kmalloc_array(snap_count,
961 size = snap_count * sizeof (*header->snap_sizes); 1027 sizeof(*header->snap_sizes),
962 snap_sizes = kmalloc(size, GFP_KERNEL); 1028 GFP_KERNEL);
963 if (!snap_sizes) 1029 if (!snap_sizes)
964 goto out_err; 1030 goto out_err;
965 1031
@@ -1551,11 +1617,18 @@ static bool obj_request_type_valid(enum obj_request_type type)
1551 } 1617 }
1552} 1618}
1553 1619
1554static int rbd_obj_request_submit(struct ceph_osd_client *osdc, 1620static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1555 struct rbd_obj_request *obj_request) 1621
1622static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1556{ 1623{
1557 dout("%s %p\n", __func__, obj_request); 1624 struct ceph_osd_request *osd_req = obj_request->osd_req;
1558 return ceph_osdc_start_request(osdc, obj_request->osd_req, false); 1625
1626 dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
1627 if (obj_request_img_data_test(obj_request)) {
1628 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1629 rbd_img_request_get(obj_request->img_request);
1630 }
1631 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1559} 1632}
1560 1633
1561static void rbd_obj_request_end(struct rbd_obj_request *obj_request) 1634static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
@@ -1745,6 +1818,22 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1745 complete_all(&obj_request->completion); 1818 complete_all(&obj_request->completion);
1746} 1819}
1747 1820
1821static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1822{
1823 obj_request->result = err;
1824 obj_request->xferred = 0;
1825 /*
1826 * kludge - mirror rbd_obj_request_submit() to match a put in
1827 * rbd_img_obj_callback()
1828 */
1829 if (obj_request_img_data_test(obj_request)) {
1830 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1831 rbd_img_request_get(obj_request->img_request);
1832 }
1833 obj_request_done_set(obj_request);
1834 rbd_obj_request_complete(obj_request);
1835}
1836
1748static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1837static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1749{ 1838{
1750 struct rbd_img_request *img_request = NULL; 1839 struct rbd_img_request *img_request = NULL;
@@ -1877,11 +1966,10 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1877 1966
1878static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1967static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1879{ 1968{
1880 struct rbd_img_request *img_request = obj_request->img_request;
1881 struct ceph_osd_request *osd_req = obj_request->osd_req; 1969 struct ceph_osd_request *osd_req = obj_request->osd_req;
1882 1970
1883 if (img_request) 1971 rbd_assert(obj_request_img_data_test(obj_request));
1884 osd_req->r_snapid = img_request->snap_id; 1972 osd_req->r_snapid = obj_request->img_request->snap_id;
1885} 1973}
1886 1974
1887static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1975static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
@@ -2074,7 +2162,9 @@ static void rbd_obj_request_destroy(struct kref *kref)
2074 bio_chain_put(obj_request->bio_list); 2162 bio_chain_put(obj_request->bio_list);
2075 break; 2163 break;
2076 case OBJ_REQUEST_PAGES: 2164 case OBJ_REQUEST_PAGES:
2077 if (obj_request->pages) 2165 /* img_data requests don't own their page array */
2166 if (obj_request->pages &&
2167 !obj_request_img_data_test(obj_request))
2078 ceph_release_page_vector(obj_request->pages, 2168 ceph_release_page_vector(obj_request->pages,
2079 obj_request->page_count); 2169 obj_request->page_count);
2080 break; 2170 break;
@@ -2295,13 +2385,6 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2295 xferred = obj_request->length; 2385 xferred = obj_request->length;
2296 } 2386 }
2297 2387
2298 /* Image object requests don't own their page array */
2299
2300 if (obj_request->type == OBJ_REQUEST_PAGES) {
2301 obj_request->pages = NULL;
2302 obj_request->page_count = 0;
2303 }
2304
2305 if (img_request_child_test(img_request)) { 2388 if (img_request_child_test(img_request)) {
2306 rbd_assert(img_request->obj_request != NULL); 2389 rbd_assert(img_request->obj_request != NULL);
2307 more = obj_request->which < img_request->obj_request_count - 1; 2390 more = obj_request->which < img_request->obj_request_count - 1;
@@ -2520,8 +2603,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2520 2603
2521 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); 2604 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2522 2605
2523 rbd_img_request_get(img_request);
2524
2525 img_offset += length; 2606 img_offset += length;
2526 resid -= length; 2607 resid -= length;
2527 } 2608 }
@@ -2579,7 +2660,6 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2579{ 2660{
2580 struct rbd_obj_request *orig_request; 2661 struct rbd_obj_request *orig_request;
2581 struct ceph_osd_request *osd_req; 2662 struct ceph_osd_request *osd_req;
2582 struct ceph_osd_client *osdc;
2583 struct rbd_device *rbd_dev; 2663 struct rbd_device *rbd_dev;
2584 struct page **pages; 2664 struct page **pages;
2585 enum obj_operation_type op_type; 2665 enum obj_operation_type op_type;
@@ -2603,7 +2683,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2603 rbd_assert(obj_request_type_valid(orig_request->type)); 2683 rbd_assert(obj_request_type_valid(orig_request->type));
2604 img_result = img_request->result; 2684 img_result = img_request->result;
2605 parent_length = img_request->length; 2685 parent_length = img_request->length;
2606 rbd_assert(parent_length == img_request->xferred); 2686 rbd_assert(img_result || parent_length == img_request->xferred);
2607 rbd_img_request_put(img_request); 2687 rbd_img_request_put(img_request);
2608 2688
2609 rbd_assert(orig_request->img_request); 2689 rbd_assert(orig_request->img_request);
@@ -2616,13 +2696,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2616 * and re-submit the original write request. 2696 * and re-submit the original write request.
2617 */ 2697 */
2618 if (!rbd_dev->parent_overlap) { 2698 if (!rbd_dev->parent_overlap) {
2619 struct ceph_osd_client *osdc;
2620
2621 ceph_release_page_vector(pages, page_count); 2699 ceph_release_page_vector(pages, page_count);
2622 osdc = &rbd_dev->rbd_client->client->osdc; 2700 rbd_obj_request_submit(orig_request);
2623 img_result = rbd_obj_request_submit(osdc, orig_request); 2701 return;
2624 if (!img_result)
2625 return;
2626 } 2702 }
2627 2703
2628 if (img_result) 2704 if (img_result)
@@ -2656,17 +2732,12 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2656 2732
2657 /* All set, send it off. */ 2733 /* All set, send it off. */
2658 2734
2659 osdc = &rbd_dev->rbd_client->client->osdc; 2735 rbd_obj_request_submit(orig_request);
2660 img_result = rbd_obj_request_submit(osdc, orig_request); 2736 return;
2661 if (!img_result)
2662 return;
2663out_err:
2664 /* Record the error code and complete the request */
2665 2737
2666 orig_request->result = img_result; 2738out_err:
2667 orig_request->xferred = 0; 2739 ceph_release_page_vector(pages, page_count);
2668 obj_request_done_set(orig_request); 2740 rbd_obj_request_error(orig_request, img_result);
2669 rbd_obj_request_complete(orig_request);
2670} 2741}
2671 2742
2672/* 2743/*
@@ -2680,26 +2751,19 @@ out_err:
2680 * When the read completes, this page array will be transferred to 2751 * When the read completes, this page array will be transferred to
2681 * the original object request for the copyup operation. 2752 * the original object request for the copyup operation.
2682 * 2753 *
2683 * If an error occurs, record it as the result of the original 2754 * If an error occurs, it is recorded as the result of the original
2684 * object request and mark it done so it gets completed. 2755 * object request in rbd_img_obj_exists_callback().
2685 */ 2756 */
2686static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) 2757static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2687{ 2758{
2688 struct rbd_img_request *img_request = NULL; 2759 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2689 struct rbd_img_request *parent_request = NULL; 2760 struct rbd_img_request *parent_request = NULL;
2690 struct rbd_device *rbd_dev;
2691 u64 img_offset; 2761 u64 img_offset;
2692 u64 length; 2762 u64 length;
2693 struct page **pages = NULL; 2763 struct page **pages = NULL;
2694 u32 page_count; 2764 u32 page_count;
2695 int result; 2765 int result;
2696 2766
2697 rbd_assert(obj_request_img_data_test(obj_request));
2698 rbd_assert(obj_request_type_valid(obj_request->type));
2699
2700 img_request = obj_request->img_request;
2701 rbd_assert(img_request != NULL);
2702 rbd_dev = img_request->rbd_dev;
2703 rbd_assert(rbd_dev->parent != NULL); 2767 rbd_assert(rbd_dev->parent != NULL);
2704 2768
2705 /* 2769 /*
@@ -2740,10 +2804,11 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2740 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); 2804 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2741 if (result) 2805 if (result)
2742 goto out_err; 2806 goto out_err;
2807
2743 parent_request->copyup_pages = pages; 2808 parent_request->copyup_pages = pages;
2744 parent_request->copyup_page_count = page_count; 2809 parent_request->copyup_page_count = page_count;
2745
2746 parent_request->callback = rbd_img_obj_parent_read_full_callback; 2810 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2811
2747 result = rbd_img_request_submit(parent_request); 2812 result = rbd_img_request_submit(parent_request);
2748 if (!result) 2813 if (!result)
2749 return 0; 2814 return 0;
@@ -2757,10 +2822,6 @@ out_err:
2757 ceph_release_page_vector(pages, page_count); 2822 ceph_release_page_vector(pages, page_count);
2758 if (parent_request) 2823 if (parent_request)
2759 rbd_img_request_put(parent_request); 2824 rbd_img_request_put(parent_request);
2760 obj_request->result = result;
2761 obj_request->xferred = 0;
2762 obj_request_done_set(obj_request);
2763
2764 return result; 2825 return result;
2765} 2826}
2766 2827
@@ -2793,17 +2854,13 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2793 2854
2794 /* 2855 /*
2795 * If the overlap has become 0 (most likely because the 2856 * If the overlap has become 0 (most likely because the
2796 * image has been flattened) we need to free the pages 2857 * image has been flattened) we need to re-submit the
2797 * and re-submit the original write request. 2858 * original request.
2798 */ 2859 */
2799 rbd_dev = orig_request->img_request->rbd_dev; 2860 rbd_dev = orig_request->img_request->rbd_dev;
2800 if (!rbd_dev->parent_overlap) { 2861 if (!rbd_dev->parent_overlap) {
2801 struct ceph_osd_client *osdc; 2862 rbd_obj_request_submit(orig_request);
2802 2863 return;
2803 osdc = &rbd_dev->rbd_client->client->osdc;
2804 result = rbd_obj_request_submit(osdc, orig_request);
2805 if (!result)
2806 return;
2807 } 2864 }
2808 2865
2809 /* 2866 /*
@@ -2816,31 +2873,45 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2816 obj_request_existence_set(orig_request, true); 2873 obj_request_existence_set(orig_request, true);
2817 } else if (result == -ENOENT) { 2874 } else if (result == -ENOENT) {
2818 obj_request_existence_set(orig_request, false); 2875 obj_request_existence_set(orig_request, false);
2819 } else if (result) { 2876 } else {
2820 orig_request->result = result; 2877 goto fail_orig_request;
2821 goto out;
2822 } 2878 }
2823 2879
2824 /* 2880 /*
2825 * Resubmit the original request now that we have recorded 2881 * Resubmit the original request now that we have recorded
2826 * whether the target object exists. 2882 * whether the target object exists.
2827 */ 2883 */
2828 orig_request->result = rbd_img_obj_request_submit(orig_request); 2884 result = rbd_img_obj_request_submit(orig_request);
2829out: 2885 if (result)
2830 if (orig_request->result) 2886 goto fail_orig_request;
2831 rbd_obj_request_complete(orig_request); 2887
2888 return;
2889
2890fail_orig_request:
2891 rbd_obj_request_error(orig_request, result);
2832} 2892}
2833 2893
2834static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) 2894static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2835{ 2895{
2896 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2836 struct rbd_obj_request *stat_request; 2897 struct rbd_obj_request *stat_request;
2837 struct rbd_device *rbd_dev; 2898 struct page **pages;
2838 struct ceph_osd_client *osdc;
2839 struct page **pages = NULL;
2840 u32 page_count; 2899 u32 page_count;
2841 size_t size; 2900 size_t size;
2842 int ret; 2901 int ret;
2843 2902
2903 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2904 OBJ_REQUEST_PAGES);
2905 if (!stat_request)
2906 return -ENOMEM;
2907
2908 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2909 stat_request);
2910 if (!stat_request->osd_req) {
2911 ret = -ENOMEM;
2912 goto fail_stat_request;
2913 }
2914
2844 /* 2915 /*
2845 * The response data for a STAT call consists of: 2916 * The response data for a STAT call consists of:
2846 * le64 length; 2917 * le64 length;
@@ -2852,52 +2923,33 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2852 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); 2923 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2853 page_count = (u32)calc_pages_for(0, size); 2924 page_count = (u32)calc_pages_for(0, size);
2854 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2925 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2855 if (IS_ERR(pages)) 2926 if (IS_ERR(pages)) {
2856 return PTR_ERR(pages); 2927 ret = PTR_ERR(pages);
2928 goto fail_stat_request;
2929 }
2857 2930
2858 ret = -ENOMEM; 2931 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2859 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, 2932 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2860 OBJ_REQUEST_PAGES); 2933 false, false);
2861 if (!stat_request)
2862 goto out;
2863 2934
2864 rbd_obj_request_get(obj_request); 2935 rbd_obj_request_get(obj_request);
2865 stat_request->obj_request = obj_request; 2936 stat_request->obj_request = obj_request;
2866 stat_request->pages = pages; 2937 stat_request->pages = pages;
2867 stat_request->page_count = page_count; 2938 stat_request->page_count = page_count;
2868
2869 rbd_assert(obj_request->img_request);
2870 rbd_dev = obj_request->img_request->rbd_dev;
2871 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2872 stat_request);
2873 if (!stat_request->osd_req)
2874 goto out;
2875 stat_request->callback = rbd_img_obj_exists_callback; 2939 stat_request->callback = rbd_img_obj_exists_callback;
2876 2940
2877 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); 2941 rbd_obj_request_submit(stat_request);
2878 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, 2942 return 0;
2879 false, false);
2880 rbd_osd_req_format_read(stat_request);
2881
2882 osdc = &rbd_dev->rbd_client->client->osdc;
2883 ret = rbd_obj_request_submit(osdc, stat_request);
2884out:
2885 if (ret)
2886 rbd_obj_request_put(obj_request);
2887 2943
2944fail_stat_request:
2945 rbd_obj_request_put(stat_request);
2888 return ret; 2946 return ret;
2889} 2947}
2890 2948
2891static bool img_obj_request_simple(struct rbd_obj_request *obj_request) 2949static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2892{ 2950{
2893 struct rbd_img_request *img_request; 2951 struct rbd_img_request *img_request = obj_request->img_request;
2894 struct rbd_device *rbd_dev; 2952 struct rbd_device *rbd_dev = img_request->rbd_dev;
2895
2896 rbd_assert(obj_request_img_data_test(obj_request));
2897
2898 img_request = obj_request->img_request;
2899 rbd_assert(img_request);
2900 rbd_dev = img_request->rbd_dev;
2901 2953
2902 /* Reads */ 2954 /* Reads */
2903 if (!img_request_write_test(img_request) && 2955 if (!img_request_write_test(img_request) &&
@@ -2936,14 +2988,13 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2936 2988
2937static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) 2989static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2938{ 2990{
2939 if (img_obj_request_simple(obj_request)) { 2991 rbd_assert(obj_request_img_data_test(obj_request));
2940 struct rbd_device *rbd_dev; 2992 rbd_assert(obj_request_type_valid(obj_request->type));
2941 struct ceph_osd_client *osdc; 2993 rbd_assert(obj_request->img_request);
2942
2943 rbd_dev = obj_request->img_request->rbd_dev;
2944 osdc = &rbd_dev->rbd_client->client->osdc;
2945 2994
2946 return rbd_obj_request_submit(osdc, obj_request); 2995 if (img_obj_request_simple(obj_request)) {
2996 rbd_obj_request_submit(obj_request);
2997 return 0;
2947 } 2998 }
2948 2999
2949 /* 3000 /*
@@ -3006,12 +3057,8 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3006 rbd_assert(obj_request->img_request); 3057 rbd_assert(obj_request->img_request);
3007 rbd_dev = obj_request->img_request->rbd_dev; 3058 rbd_dev = obj_request->img_request->rbd_dev;
3008 if (!rbd_dev->parent_overlap) { 3059 if (!rbd_dev->parent_overlap) {
3009 struct ceph_osd_client *osdc; 3060 rbd_obj_request_submit(obj_request);
3010 3061 return;
3011 osdc = &rbd_dev->rbd_client->client->osdc;
3012 img_result = rbd_obj_request_submit(osdc, obj_request);
3013 if (!img_result)
3014 return;
3015 } 3062 }
3016 3063
3017 obj_request->result = img_result; 3064 obj_request->result = img_result;
@@ -3084,65 +3131,724 @@ out_err:
3084 obj_request_done_set(obj_request); 3131 obj_request_done_set(obj_request);
3085} 3132}
3086 3133
3087static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev); 3134static const struct rbd_client_id rbd_empty_cid;
3088static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
3089 3135
3090static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, 3136static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3091 u64 notifier_id, void *data, size_t data_len) 3137 const struct rbd_client_id *rhs)
3138{
3139 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3140}
3141
/*
 * Return the client id (gid + watch cookie) this device uses to own
 * the exclusive lock.  watch_mutex guards watch_cookie, which changes
 * whenever the watch is (re)registered.
 */
static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
{
	struct rbd_client_id cid;

	mutex_lock(&rbd_dev->watch_mutex);
	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
	cid.handle = rbd_dev->watch_cookie;
	mutex_unlock(&rbd_dev->watch_mutex);
	return cid; /* returned by value - struct copy */
}
3152
/*
 * Record @cid as the current owner of the exclusive lock.
 *
 * lock_rwsem must be held for write
 */
static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
			      const struct rbd_client_id *cid)
{
	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
	     cid->gid, cid->handle);
	rbd_dev->owner_cid = *cid; /* struct */
}
3164
/*
 * Render the lock cookie ("<RBD_LOCK_COOKIE_PREFIX> <watch cookie>")
 * into @buf.  Callers pass char[32] - prefix plus a decimal u64 must
 * fit.  watch_mutex guards watch_cookie.
 */
static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
{
	mutex_lock(&rbd_dev->watch_mutex);
	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
	mutex_unlock(&rbd_dev->watch_mutex);
}
3171
/*
 * Take the exclusive lock on the header object and mark ourselves as
 * its owner.  On success, queues acquired_lock_work so other clients
 * get notified.
 *
 * lock_rwsem must be held for write
 */
static int rbd_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
	char cookie[32];
	int ret;

	WARN_ON(__rbd_is_lock_owner(rbd_dev));

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
			    RBD_LOCK_TAG, "", 0);
	if (ret)
		return ret; /* -EBUSY is handled specially in rbd_try_lock() */

	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
	rbd_set_owner_cid(rbd_dev, &cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
	return 0;
}
3196
/*
 * Release the exclusive lock on the header object.  -ENOENT from
 * ceph_cls_unlock() (lock already gone on the OSD) is tolerated.  On
 * success, queues released_lock_work so other clients get notified.
 *
 * lock_rwsem must be held for write
 */
static int rbd_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	WARN_ON(!__rbd_is_lock_owner(rbd_dev));

	/* drop the local owner state before the on-wire unlock */
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			      RBD_LOCK_NAME, cookie);
	if (ret && ret != -ENOENT) {
		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
		return ret;
	}

	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
	return 0;
}
3222
3223static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3224 enum rbd_notify_op notify_op,
3225 struct page ***preply_pages,
3226 size_t *preply_len)
3227{
3228 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3229 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3230 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3231 char buf[buf_size];
3232 void *p = buf;
3233
3234 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3235
3236 /* encode *LockPayload NotifyMessage (op + ClientId) */
3237 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3238 ceph_encode_32(&p, notify_op);
3239 ceph_encode_64(&p, cid.gid);
3240 ceph_encode_64(&p, cid.handle);
3241
3242 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3243 &rbd_dev->header_oloc, buf, buf_size,
3244 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3245}
3246
/*
 * Broadcast a lock notification, discarding any reply payload - used
 * when we only need to announce, not inspect the acks.
 */
static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
			       enum rbd_notify_op notify_op)
{
	struct page **reply_pages;
	size_t reply_len;

	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
}
3256
/* acquired_lock_work callback - announce that we took the lock */
static void rbd_notify_acquired_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  acquired_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
}
3264
/* released_lock_work callback - announce that we dropped the lock */
static void rbd_notify_released_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  released_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
}
3272
/*
 * Ask the current lock owner to release the lock: send a REQUEST_LOCK
 * notification and scan the acks for a non-empty ResponseMessage
 * payload (only the owner replies with one).
 *
 * Returns the owner's response code (0 = will release), -ETIMEDOUT if
 * no owner answered with a payload (presumed dead), -EIO if more than
 * one ack carried a payload, or another negative error.
 */
static int rbd_request_lock(struct rbd_device *rbd_dev)
{
	struct page **reply_pages;
	size_t reply_len;
	bool lock_owner_responded = false;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	/* -ETIMEDOUT still produces a (possibly partial) ack set */
	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
				   &reply_pages, &reply_len);
	if (ret && ret != -ETIMEDOUT) {
		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
		goto out;
	}

	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
		void *p = page_address(reply_pages[0]);
		void *const end = p + reply_len;
		u32 n;

		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
		while (n--) {
			u8 struct_v;
			u32 len;

			ceph_decode_need(&p, end, 8 + 8, e_inval);
			p += 8 + 8; /* skip gid and cookie */

			/* zero-length payload: a non-owner's plain ack */
			ceph_decode_32_safe(&p, end, len, e_inval);
			if (!len)
				continue;

			if (lock_owner_responded) {
				rbd_warn(rbd_dev,
					 "duplicate lock owners detected");
				ret = -EIO;
				goto out;
			}

			lock_owner_responded = true;
			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
						  &struct_v, &len);
			if (ret) {
				rbd_warn(rbd_dev,
					 "failed to decode ResponseMessage: %d",
					 ret);
				goto e_inval;
			}

			/* the owner's answer becomes our return value */
			ret = ceph_decode_32(&p);
		}
	}

	if (!lock_owner_responded) {
		rbd_warn(rbd_dev, "no lock owners detected");
		ret = -ETIMEDOUT;
	}

out:
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}
3340
/*
 * Wake tasks blocked in rbd_wait_state_locked().  lock_dwork is
 * cancelled first - see the comment in rbd_wait_state_locked() on how
 * the two cooperate.
 */
static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
{
	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);

	cancel_delayed_work(&rbd_dev->lock_dwork);
	if (wake_all)
		wake_up_all(&rbd_dev->lock_waitq);
	else
		wake_up(&rbd_dev->lock_waitq);
}
3351
/*
 * Fetch the lock state of the header object.  On success (0), *lockers
 * and *num_lockers are filled in; the caller frees them with
 * ceph_free_lockers().  Returns -EBUSY if the object is locked by
 * something other than an rbd exclusive lock (foreign tag or cookie,
 * or a shared lock).
 */
static int get_lock_owner_info(struct rbd_device *rbd_dev,
			       struct ceph_locker **lockers, u32 *num_lockers)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	u8 lock_type;
	char *lock_tag;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
				 &lock_type, &lock_tag, lockers, num_lockers);
	if (ret)
		return ret;

	if (*num_lockers == 0) {
		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
		goto out;
	}

	/* reject anything that isn't an rbd exclusive lock */
	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
			 lock_tag);
		ret = -EBUSY;
		goto out;
	}

	if (lock_type == CEPH_CLS_LOCK_SHARED) {
		rbd_warn(rbd_dev, "shared lock type detected");
		ret = -EBUSY;
		goto out;
	}

	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
			 (*lockers)[0].id.cookie);
		ret = -EBUSY;
		goto out;
	}

out:
	kfree(lock_tag);
	return ret;
}
3398
3399static int find_watcher(struct rbd_device *rbd_dev,
3400 const struct ceph_locker *locker)
3401{
3402 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3403 struct ceph_watch_item *watchers;
3404 u32 num_watchers;
3405 u64 cookie;
3406 int i;
3407 int ret;
3408
3409 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3410 &rbd_dev->header_oloc, &watchers,
3411 &num_watchers);
3412 if (ret)
3413 return ret;
3414
3415 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3416 for (i = 0; i < num_watchers; i++) {
3417 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3418 sizeof(locker->info.addr)) &&
3419 watchers[i].cookie == cookie) {
3420 struct rbd_client_id cid = {
3421 .gid = le64_to_cpu(watchers[i].name.num),
3422 .handle = cookie,
3423 };
3424
3425 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3426 rbd_dev, cid.gid, cid.handle);
3427 rbd_set_owner_cid(rbd_dev, &cid);
3428 ret = 1;
3429 goto out;
3430 }
3431 }
3432
3433 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3434 ret = 0;
3435out:
3436 kfree(watchers);
3437 return ret;
3438}
3439
/*
 * Try to take the exclusive lock.  If it is held by a client that is
 * no longer watching the header object (presumed dead), blacklist that
 * client and break its lock, then retry.  Returns 0 on success or when
 * the caller has to fall back to requesting the lock from a live
 * owner, otherwise a negative error.
 *
 * lock_rwsem must be held for write
 */
static int rbd_try_lock(struct rbd_device *rbd_dev)
{
	struct ceph_client *client = rbd_dev->rbd_client->client;
	struct ceph_locker *lockers;
	u32 num_lockers;
	int ret;

	for (;;) {
		ret = rbd_lock(rbd_dev);
		if (ret != -EBUSY)
			return ret;

		/* determine if the current lock holder is still alive */
		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
		if (ret)
			return ret;

		if (num_lockers == 0)
			goto again; /* lock disappeared - retry */

		ret = find_watcher(rbd_dev, lockers);
		if (ret) {
			if (ret > 0)
				ret = 0; /* have to request lock */
			goto out;
		}

		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
			 ENTITY_NAME(lockers[0].id.name));

		/* blacklist first so the dead client can't race us */
		ret = ceph_monc_blacklist_add(&client->monc,
					      &lockers[0].info.addr);
		if (ret) {
			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
				 ENTITY_NAME(lockers[0].id.name), ret);
			goto out;
		}

		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
					  lockers[0].id.cookie,
					  &lockers[0].id.name);
		if (ret && ret != -ENOENT)
			goto out;

again:
		ceph_free_lockers(lockers, num_lockers);
	}

out:
	ceph_free_lockers(lockers, num_lockers);
	return ret;
}
3496
/*
 * Attempt to become the lock owner, returning the resulting lock
 * state.  Fast path takes lock_rwsem for read only; the write lock is
 * taken (and ownership rechecked) only when we aren't already owner.
 *
 * *pret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
 */
static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
						int *pret)
{
	enum rbd_lock_state lock_state;

	down_read(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (__rbd_is_lock_owner(rbd_dev)) {
		lock_state = rbd_dev->lock_state;
		up_read(&rbd_dev->lock_rwsem);
		return lock_state;
	}

	up_read(&rbd_dev->lock_rwsem);
	down_write(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	/* recheck - someone may have acquired it between the rwsem ops */
	if (!__rbd_is_lock_owner(rbd_dev)) {
		*pret = rbd_try_lock(rbd_dev);
		if (*pret)
			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
	}

	lock_state = rbd_dev->lock_state;
	up_write(&rbd_dev->lock_rwsem);
	return lock_state;
}
3528
/*
 * lock_dwork callback.  Try to acquire the lock; if a live owner holds
 * it, ask them to release it and requeue ourselves to retry until we
 * either get the lock or hit an unrecoverable error.
 */
static void rbd_acquire_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, lock_dwork);
	enum rbd_lock_state lock_state;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);
again:
	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
		if (lock_state == RBD_LOCK_STATE_LOCKED)
			wake_requests(rbd_dev, true);
		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
		     rbd_dev, lock_state, ret);
		return;
	}

	ret = rbd_request_lock(rbd_dev);
	if (ret == -ETIMEDOUT) {
		goto again; /* treat this as a dead client */
	} else if (ret < 0) {
		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
				 RBD_RETRY_DELAY);
	} else {
		/*
		 * lock owner acked, but resend if we don't see them
		 * release the lock
		 */
		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
		     rbd_dev);
		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
	}
}
3565
/*
 * Release the lock if we hold it, flushing in-flight IO first.
 * Returns true if the lock was actually released (state reached
 * RELEASING and we got back to it after the flush), false otherwise.
 *
 * lock_rwsem must be held for write
 */
static bool rbd_release_lock(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
		return false;

	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
	/* drop to read so IO submitters aren't blocked during the flush */
	downgrade_write(&rbd_dev->lock_rwsem);
	/*
	 * Ensure that all in-flight IO is flushed.
	 *
	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
	 * may be shared with other devices.
	 */
	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
	up_read(&rbd_dev->lock_rwsem);

	down_write(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	/* state may have changed while the rwsem was dropped */
	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
		return false;

	if (!rbd_unlock(rbd_dev))
		/*
		 * Give others a chance to grab the lock - we would re-acquire
		 * almost immediately if we got new IO during ceph_osdc_sync()
		 * otherwise.  We need to ack our own notifications, so this
		 * lock_dwork will be requeued from rbd_wait_state_locked()
		 * after wake_requests() in rbd_handle_released_lock().
		 */
		cancel_delayed_work(&rbd_dev->lock_dwork);

	return true;
}
3605
/* unlock_work callback - release the lock under lock_rwsem */
static void rbd_release_lock_work(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  unlock_work);

	down_write(&rbd_dev->lock_rwsem);
	rbd_release_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
}
3615
/*
 * Handle an ACQUIRED_LOCK notification: record the announcing client
 * as the owner.  struct_v < 2 messages don't carry a ClientId, hence
 * the empty-cid fallback.
 */
static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
				     void **p)
{
	struct rbd_client_id cid = { 0 };

	if (struct_v >= 2) {
		cid.gid = ceph_decode_64(p);
		cid.handle = ceph_decode_64(p);
	}

	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
	     cid.handle);
	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
		down_write(&rbd_dev->lock_rwsem);
		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			/*
			 * we already know that the remote client is
			 * the owner
			 */
			up_write(&rbd_dev->lock_rwsem);
			return;
		}

		rbd_set_owner_cid(rbd_dev, &cid);
		downgrade_write(&rbd_dev->lock_rwsem);
	} else {
		down_read(&rbd_dev->lock_rwsem);
	}

	/* wake one waiter so it notices the new owner */
	if (!__rbd_is_lock_owner(rbd_dev))
		wake_requests(rbd_dev, false);
	up_read(&rbd_dev->lock_rwsem);
}
3649
/*
 * Handle a RELEASED_LOCK notification: clear the recorded owner if the
 * announcing client matches it.  struct_v < 2 messages don't carry a
 * ClientId, hence the empty-cid fallback.
 */
static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
				     void **p)
{
	struct rbd_client_id cid = { 0 };

	if (struct_v >= 2) {
		cid.gid = ceph_decode_64(p);
		cid.handle = ceph_decode_64(p);
	}

	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
	     cid.handle);
	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
		down_write(&rbd_dev->lock_rwsem);
		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
			     __func__, rbd_dev, cid.gid, cid.handle,
			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
			up_write(&rbd_dev->lock_rwsem);
			return;
		}

		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
		downgrade_write(&rbd_dev->lock_rwsem);
	} else {
		down_read(&rbd_dev->lock_rwsem);
	}

	/* wake one waiter so it can try to acquire the freed lock */
	if (!__rbd_is_lock_owner(rbd_dev))
		wake_requests(rbd_dev, false);
	up_read(&rbd_dev->lock_rwsem);
}
3682
/*
 * Handle a REQUEST_LOCK notification.  Returns true if we own the lock
 * and therefore must answer with a ResponseMessage; if we hold it in
 * the LOCKED state, also queue unlock_work to give the lock away.
 * Our own notifications (cid == my_cid) are ignored.
 */
static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
				    void **p)
{
	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
	struct rbd_client_id cid = { 0 };
	bool need_to_send;

	if (struct_v >= 2) {
		cid.gid = ceph_decode_64(p);
		cid.handle = ceph_decode_64(p);
	}

	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
	     cid.handle);
	if (rbd_cid_equal(&cid, &my_cid))
		return false;

	down_read(&rbd_dev->lock_rwsem);
	need_to_send = __rbd_is_lock_owner(rbd_dev);
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
		if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
			dout("%s rbd_dev %p queueing unlock_work\n", __func__,
			     rbd_dev);
			queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
		}
	}
	up_read(&rbd_dev->lock_rwsem);
	return need_to_send;
}
3712
3713static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3714 u64 notify_id, u64 cookie, s32 *result)
3715{
3716 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3717 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3718 char buf[buf_size];
3719 int ret;
3720
3721 if (result) {
3722 void *p = buf;
3723
3724 /* encode ResponseMessage */
3725 ceph_start_encoding(&p, 1, 1,
3726 buf_size - CEPH_ENCODING_START_BLK_LEN);
3727 ceph_encode_32(&p, *result);
3728 } else {
3729 buf_size = 0;
3730 }
3109 3731
3110 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, 3732 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3111 &rbd_dev->header_oloc, notify_id, cookie, 3733 &rbd_dev->header_oloc, notify_id, cookie,
3112 NULL, 0); 3734 buf, buf_size);
3113 if (ret) 3735 if (ret)
3114 rbd_warn(rbd_dev, "notify_ack ret %d", ret); 3736 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3115} 3737}
3116 3738
/* send an empty ack for a notification */
static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
				   u64 cookie)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);
	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
}
3745
/* ack a notification with a ResponseMessage carrying @result */
static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
					  u64 notify_id, u64 cookie, s32 result)
{
	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
}
3752
/*
 * Watch callback: decode the NotifyMessage and dispatch on its op.
 * Every notification is acked; lock notifications get an empty ack,
 * REQUEST_LOCK gets a ResponseMessage when we are the owner.  A
 * zero-length payload is a legacy (pre-exclusive-lock) header update.
 */
static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
			 u64 notifier_id, void *data, size_t data_len)
{
	struct rbd_device *rbd_dev = arg;
	void *p = data;
	void *const end = p + data_len;
	u8 struct_v;
	u32 len;
	u32 notify_op;
	int ret;

	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
	     __func__, rbd_dev, cookie, notify_id, data_len);
	if (data_len) {
		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
					  &struct_v, &len);
		if (ret) {
			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
				 ret);
			return;
		}

		notify_op = ceph_decode_32(&p);
	} else {
		/* legacy notification for header updates */
		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
		len = 0;
	}

	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
	switch (notify_op) {
	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_RELEASED_LOCK:
		rbd_handle_released_lock(rbd_dev, struct_v, &p);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_REQUEST_LOCK:
		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
			/*
			 * send ResponseMessage(0) back so the client
			 * can detect a missing owner
			 */
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, 0);
		else
			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_HEADER_UPDATE:
		ret = rbd_dev_refresh(rbd_dev);
		if (ret)
			rbd_warn(rbd_dev, "refresh failed: %d", ret);

		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	default:
		/* unknown op: only the lock owner reports an error */
		if (rbd_is_lock_owner(rbd_dev))
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, -EOPNOTSUPP);
		else
			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	}
}
3131 3819
3132 ret = rbd_dev_refresh(rbd_dev); 3820static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3133 if (ret) 3821
/*
 * Watch error callback: forget the recorded lock owner, tear down the
 * broken watch and kick watch_dwork to re-establish it.
 */
static void rbd_watch_errcb(void *arg, u64 cookie, int err)
{
	struct rbd_device *rbd_dev = arg;

	rbd_warn(rbd_dev, "encountered watch error: %d", err);

	down_write(&rbd_dev->lock_rwsem);
	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	up_write(&rbd_dev->lock_rwsem);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
		__rbd_unregister_watch(rbd_dev);
		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;

		/* reregister immediately from the task workqueue */
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
	}
	mutex_unlock(&rbd_dev->watch_mutex);
}
3136 3841
3137/* 3842/*
3138 * Initiate a watch request, synchronously. 3843 * watch_mutex must be locked
3139 */ 3844 */
3140static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) 3845static int __rbd_register_watch(struct rbd_device *rbd_dev)
3141{ 3846{
3142 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3847 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3143 struct ceph_osd_linger_request *handle; 3848 struct ceph_osd_linger_request *handle;
3144 3849
3145 rbd_assert(!rbd_dev->watch_handle); 3850 rbd_assert(!rbd_dev->watch_handle);
3851 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3146 3852
3147 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, 3853 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3148 &rbd_dev->header_oloc, rbd_watch_cb, 3854 &rbd_dev->header_oloc, rbd_watch_cb,
@@ -3154,13 +3860,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3154 return 0; 3860 return 0;
3155} 3861}
3156 3862
3157static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) 3863/*
3864 * watch_mutex must be locked
3865 */
3866static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3158{ 3867{
3159 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3868 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3160 int ret; 3869 int ret;
3161 3870
3162 if (!rbd_dev->watch_handle) 3871 rbd_assert(rbd_dev->watch_handle);
3163 return; 3872 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3164 3873
3165 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); 3874 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3166 if (ret) 3875 if (ret)
@@ -3169,17 +3878,100 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3169 rbd_dev->watch_handle = NULL; 3878 rbd_dev->watch_handle = NULL;
3170} 3879}
3171 3880
/*
 * Register the header watch and transition to REGISTERED, recording
 * the linger id as the watch cookie.  Takes watch_mutex.
 */
static int rbd_register_watch(struct rbd_device *rbd_dev)
{
	int ret;

	mutex_lock(&rbd_dev->watch_mutex);
	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
	ret = __rbd_register_watch(rbd_dev);
	if (ret)
		goto out;

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;

out:
	mutex_unlock(&rbd_dev->watch_mutex);
	return ret;
}
3898
/* synchronously cancel all exclusive-lock and watch work items */
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
	cancel_work_sync(&rbd_dev->acquired_lock_work);
	cancel_work_sync(&rbd_dev->released_lock_work);
	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
	cancel_work_sync(&rbd_dev->unlock_work);
}
3909
/*
 * Tear down the header watch: cancel outstanding work, unregister if
 * registered, and flush any in-flight notify callbacks.
 */
static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	/* no request should still be waiting for the lock at this point */
	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
	cancel_tasks_sync(rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
		__rbd_unregister_watch(rbd_dev);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	mutex_unlock(&rbd_dev->watch_mutex);

	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
3182 3923
3924static void rbd_reregister_watch(struct work_struct *work)
3925{
3926 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3927 struct rbd_device, watch_dwork);
3928 bool was_lock_owner = false;
3929 int ret;
3930
3931 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3932
3933 down_write(&rbd_dev->lock_rwsem);
3934 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3935 was_lock_owner = rbd_release_lock(rbd_dev);
3936
3937 mutex_lock(&rbd_dev->watch_mutex);
3938 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
3939 goto fail_unlock;
3940
3941 ret = __rbd_register_watch(rbd_dev);
3942 if (ret) {
3943 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3944 if (ret != -EBLACKLISTED)
3945 queue_delayed_work(rbd_dev->task_wq,
3946 &rbd_dev->watch_dwork,
3947 RBD_RETRY_DELAY);
3948 goto fail_unlock;
3949 }
3950
3951 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3952 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3953 mutex_unlock(&rbd_dev->watch_mutex);
3954
3955 ret = rbd_dev_refresh(rbd_dev);
3956 if (ret)
3957 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3958
3959 if (was_lock_owner) {
3960 ret = rbd_try_lock(rbd_dev);
3961 if (ret)
3962 rbd_warn(rbd_dev, "reregisteration lock failed: %d",
3963 ret);
3964 }
3965
3966 up_write(&rbd_dev->lock_rwsem);
3967 wake_requests(rbd_dev, true);
3968 return;
3969
3970fail_unlock:
3971 mutex_unlock(&rbd_dev->watch_mutex);
3972 up_write(&rbd_dev->lock_rwsem);
3973}
3974
3183/* 3975/*
3184 * Synchronous osd object method call. Returns the number of bytes 3976 * Synchronous osd object method call. Returns the number of bytes
3185 * returned in the outbound buffer, or a negative error code. 3977 * returned in the outbound buffer, or a negative error code.
@@ -3193,7 +3985,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3193 void *inbound, 3985 void *inbound,
3194 size_t inbound_size) 3986 size_t inbound_size)
3195{ 3987{
3196 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3197 struct rbd_obj_request *obj_request; 3988 struct rbd_obj_request *obj_request;
3198 struct page **pages; 3989 struct page **pages;
3199 u32 page_count; 3990 u32 page_count;
@@ -3242,11 +4033,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3242 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, 4033 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3243 obj_request->pages, inbound_size, 4034 obj_request->pages, inbound_size,
3244 0, false, false); 4035 0, false, false);
3245 rbd_osd_req_format_read(obj_request);
3246 4036
3247 ret = rbd_obj_request_submit(osdc, obj_request); 4037 rbd_obj_request_submit(obj_request);
3248 if (ret)
3249 goto out;
3250 ret = rbd_obj_request_wait(obj_request); 4038 ret = rbd_obj_request_wait(obj_request);
3251 if (ret) 4039 if (ret)
3252 goto out; 4040 goto out;
@@ -3267,6 +4055,29 @@ out:
3267 return ret; 4055 return ret;
3268} 4056}
3269 4057
/*
 * Block until this device owns the exclusive lock, kicking lock_dwork
 * on each iteration to (re)start acquisition.  lock_rwsem is dropped
 * while sleeping and re-taken before rechecking the state.
 *
 * lock_rwsem must be held for read
 */
static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
{
	DEFINE_WAIT(wait);

	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
					  TASK_UNINTERRUPTIBLE);
		up_read(&rbd_dev->lock_rwsem);
		schedule();
		down_read(&rbd_dev->lock_rwsem);
	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
	finish_wait(&rbd_dev->lock_waitq, &wait);
}
4080
3270static void rbd_queue_workfn(struct work_struct *work) 4081static void rbd_queue_workfn(struct work_struct *work)
3271{ 4082{
3272 struct request *rq = blk_mq_rq_from_pdu(work); 4083 struct request *rq = blk_mq_rq_from_pdu(work);
@@ -3277,6 +4088,7 @@ static void rbd_queue_workfn(struct work_struct *work)
3277 u64 length = blk_rq_bytes(rq); 4088 u64 length = blk_rq_bytes(rq);
3278 enum obj_operation_type op_type; 4089 enum obj_operation_type op_type;
3279 u64 mapping_size; 4090 u64 mapping_size;
4091 bool must_be_locked;
3280 int result; 4092 int result;
3281 4093
3282 if (rq->cmd_type != REQ_TYPE_FS) { 4094 if (rq->cmd_type != REQ_TYPE_FS) {
@@ -3338,6 +4150,10 @@ static void rbd_queue_workfn(struct work_struct *work)
3338 if (op_type != OBJ_OP_READ) { 4150 if (op_type != OBJ_OP_READ) {
3339 snapc = rbd_dev->header.snapc; 4151 snapc = rbd_dev->header.snapc;
3340 ceph_get_snap_context(snapc); 4152 ceph_get_snap_context(snapc);
4153 must_be_locked = rbd_is_lock_supported(rbd_dev);
4154 } else {
4155 must_be_locked = rbd_dev->opts->lock_on_read &&
4156 rbd_is_lock_supported(rbd_dev);
3341 } 4157 }
3342 up_read(&rbd_dev->header_rwsem); 4158 up_read(&rbd_dev->header_rwsem);
3343 4159
@@ -3348,11 +4164,17 @@ static void rbd_queue_workfn(struct work_struct *work)
3348 goto err_rq; 4164 goto err_rq;
3349 } 4165 }
3350 4166
4167 if (must_be_locked) {
4168 down_read(&rbd_dev->lock_rwsem);
4169 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4170 rbd_wait_state_locked(rbd_dev);
4171 }
4172
3351 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, 4173 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3352 snapc); 4174 snapc);
3353 if (!img_request) { 4175 if (!img_request) {
3354 result = -ENOMEM; 4176 result = -ENOMEM;
3355 goto err_rq; 4177 goto err_unlock;
3356 } 4178 }
3357 img_request->rq = rq; 4179 img_request->rq = rq;
3358 snapc = NULL; /* img_request consumes a ref */ 4180 snapc = NULL; /* img_request consumes a ref */
@@ -3370,10 +4192,15 @@ static void rbd_queue_workfn(struct work_struct *work)
3370 if (result) 4192 if (result)
3371 goto err_img_request; 4193 goto err_img_request;
3372 4194
4195 if (must_be_locked)
4196 up_read(&rbd_dev->lock_rwsem);
3373 return; 4197 return;
3374 4198
3375err_img_request: 4199err_img_request:
3376 rbd_img_request_put(img_request); 4200 rbd_img_request_put(img_request);
4201err_unlock:
4202 if (must_be_locked)
4203 up_read(&rbd_dev->lock_rwsem);
3377err_rq: 4204err_rq:
3378 if (result) 4205 if (result)
3379 rbd_warn(rbd_dev, "%s %llx at %llx result %d", 4206 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -3415,7 +4242,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3415 u64 offset, u64 length, void *buf) 4242 u64 offset, u64 length, void *buf)
3416 4243
3417{ 4244{
3418 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3419 struct rbd_obj_request *obj_request; 4245 struct rbd_obj_request *obj_request;
3420 struct page **pages = NULL; 4246 struct page **pages = NULL;
3421 u32 page_count; 4247 u32 page_count;
@@ -3448,11 +4274,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3448 obj_request->length, 4274 obj_request->length,
3449 obj_request->offset & ~PAGE_MASK, 4275 obj_request->offset & ~PAGE_MASK,
3450 false, false); 4276 false, false);
3451 rbd_osd_req_format_read(obj_request);
3452 4277
3453 ret = rbd_obj_request_submit(osdc, obj_request); 4278 rbd_obj_request_submit(obj_request);
3454 if (ret)
3455 goto out;
3456 ret = rbd_obj_request_wait(obj_request); 4279 ret = rbd_obj_request_wait(obj_request);
3457 if (ret) 4280 if (ret)
3458 goto out; 4281 goto out;
@@ -3751,13 +4574,40 @@ static ssize_t rbd_minor_show(struct device *dev,
3751 return sprintf(buf, "%d\n", rbd_dev->minor); 4574 return sprintf(buf, "%d\n", rbd_dev->minor);
3752} 4575}
3753 4576
4577static ssize_t rbd_client_addr_show(struct device *dev,
4578 struct device_attribute *attr, char *buf)
4579{
4580 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4581 struct ceph_entity_addr *client_addr =
4582 ceph_client_addr(rbd_dev->rbd_client->client);
4583
4584 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4585 le32_to_cpu(client_addr->nonce));
4586}
4587
3754static ssize_t rbd_client_id_show(struct device *dev, 4588static ssize_t rbd_client_id_show(struct device *dev,
3755 struct device_attribute *attr, char *buf) 4589 struct device_attribute *attr, char *buf)
3756{ 4590{
3757 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4591 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3758 4592
3759 return sprintf(buf, "client%lld\n", 4593 return sprintf(buf, "client%lld\n",
3760 ceph_client_id(rbd_dev->rbd_client->client)); 4594 ceph_client_gid(rbd_dev->rbd_client->client));
4595}
4596
4597static ssize_t rbd_cluster_fsid_show(struct device *dev,
4598 struct device_attribute *attr, char *buf)
4599{
4600 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4601
4602 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4603}
4604
4605static ssize_t rbd_config_info_show(struct device *dev,
4606 struct device_attribute *attr, char *buf)
4607{
4608 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4609
4610 return sprintf(buf, "%s\n", rbd_dev->config_info);
3761} 4611}
3762 4612
3763static ssize_t rbd_pool_show(struct device *dev, 4613static ssize_t rbd_pool_show(struct device *dev,
@@ -3809,6 +4659,14 @@ static ssize_t rbd_snap_show(struct device *dev,
3809 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 4659 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3810} 4660}
3811 4661
4662static ssize_t rbd_snap_id_show(struct device *dev,
4663 struct device_attribute *attr, char *buf)
4664{
4665 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4666
4667 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4668}
4669
3812/* 4670/*
3813 * For a v2 image, shows the chain of parent images, separated by empty 4671 * For a v2 image, shows the chain of parent images, separated by empty
3814 * lines. For v1 images or if there is no parent, shows "(no parent 4672 * lines. For v1 images or if there is no parent, shows "(no parent
@@ -3861,13 +4719,17 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3861static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 4719static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3862static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 4720static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3863static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); 4721static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4722static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
3864static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 4723static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4724static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4725static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
3865static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 4726static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3866static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 4727static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3867static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 4728static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3868static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 4729static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3869static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 4730static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3870static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 4731static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4732static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
3871static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 4733static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3872 4734
3873static struct attribute *rbd_attrs[] = { 4735static struct attribute *rbd_attrs[] = {
@@ -3875,12 +4737,16 @@ static struct attribute *rbd_attrs[] = {
3875 &dev_attr_features.attr, 4737 &dev_attr_features.attr,
3876 &dev_attr_major.attr, 4738 &dev_attr_major.attr,
3877 &dev_attr_minor.attr, 4739 &dev_attr_minor.attr,
4740 &dev_attr_client_addr.attr,
3878 &dev_attr_client_id.attr, 4741 &dev_attr_client_id.attr,
4742 &dev_attr_cluster_fsid.attr,
4743 &dev_attr_config_info.attr,
3879 &dev_attr_pool.attr, 4744 &dev_attr_pool.attr,
3880 &dev_attr_pool_id.attr, 4745 &dev_attr_pool_id.attr,
3881 &dev_attr_name.attr, 4746 &dev_attr_name.attr,
3882 &dev_attr_image_id.attr, 4747 &dev_attr_image_id.attr,
3883 &dev_attr_current_snap.attr, 4748 &dev_attr_current_snap.attr,
4749 &dev_attr_snap_id.attr,
3884 &dev_attr_parent.attr, 4750 &dev_attr_parent.attr,
3885 &dev_attr_refresh.attr, 4751 &dev_attr_refresh.attr,
3886 NULL 4752 NULL
@@ -3943,18 +4809,32 @@ static void rbd_spec_free(struct kref *kref)
3943 kfree(spec); 4809 kfree(spec);
3944} 4810}
3945 4811
3946static void rbd_dev_release(struct device *dev) 4812static void rbd_dev_free(struct rbd_device *rbd_dev)
3947{ 4813{
3948 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4814 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
3949 bool need_put = !!rbd_dev->opts; 4815 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
3950 4816
3951 ceph_oid_destroy(&rbd_dev->header_oid); 4817 ceph_oid_destroy(&rbd_dev->header_oid);
3952 ceph_oloc_destroy(&rbd_dev->header_oloc); 4818 ceph_oloc_destroy(&rbd_dev->header_oloc);
4819 kfree(rbd_dev->config_info);
3953 4820
3954 rbd_put_client(rbd_dev->rbd_client); 4821 rbd_put_client(rbd_dev->rbd_client);
3955 rbd_spec_put(rbd_dev->spec); 4822 rbd_spec_put(rbd_dev->spec);
3956 kfree(rbd_dev->opts); 4823 kfree(rbd_dev->opts);
3957 kfree(rbd_dev); 4824 kfree(rbd_dev);
4825}
4826
4827static void rbd_dev_release(struct device *dev)
4828{
4829 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4830 bool need_put = !!rbd_dev->opts;
4831
4832 if (need_put) {
4833 destroy_workqueue(rbd_dev->task_wq);
4834 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4835 }
4836
4837 rbd_dev_free(rbd_dev);
3958 4838
3959 /* 4839 /*
3960 * This is racy, but way better than putting module outside of 4840 * This is racy, but way better than putting module outside of
@@ -3965,25 +4845,34 @@ static void rbd_dev_release(struct device *dev)
3965 module_put(THIS_MODULE); 4845 module_put(THIS_MODULE);
3966} 4846}
3967 4847
3968static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 4848static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
3969 struct rbd_spec *spec, 4849 struct rbd_spec *spec)
3970 struct rbd_options *opts)
3971{ 4850{
3972 struct rbd_device *rbd_dev; 4851 struct rbd_device *rbd_dev;
3973 4852
3974 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 4853 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3975 if (!rbd_dev) 4854 if (!rbd_dev)
3976 return NULL; 4855 return NULL;
3977 4856
3978 spin_lock_init(&rbd_dev->lock); 4857 spin_lock_init(&rbd_dev->lock);
3979 rbd_dev->flags = 0;
3980 atomic_set(&rbd_dev->parent_ref, 0);
3981 INIT_LIST_HEAD(&rbd_dev->node); 4858 INIT_LIST_HEAD(&rbd_dev->node);
3982 init_rwsem(&rbd_dev->header_rwsem); 4859 init_rwsem(&rbd_dev->header_rwsem);
3983 4860
3984 ceph_oid_init(&rbd_dev->header_oid); 4861 ceph_oid_init(&rbd_dev->header_oid);
3985 ceph_oloc_init(&rbd_dev->header_oloc); 4862 ceph_oloc_init(&rbd_dev->header_oloc);
3986 4863
4864 mutex_init(&rbd_dev->watch_mutex);
4865 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4866 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4867
4868 init_rwsem(&rbd_dev->lock_rwsem);
4869 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4870 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4871 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4872 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4873 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4874 init_waitqueue_head(&rbd_dev->lock_waitq);
4875
3987 rbd_dev->dev.bus = &rbd_bus_type; 4876 rbd_dev->dev.bus = &rbd_bus_type;
3988 rbd_dev->dev.type = &rbd_device_type; 4877 rbd_dev->dev.type = &rbd_device_type;
3989 rbd_dev->dev.parent = &rbd_root_dev; 4878 rbd_dev->dev.parent = &rbd_root_dev;
@@ -3991,9 +4880,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3991 4880
3992 rbd_dev->rbd_client = rbdc; 4881 rbd_dev->rbd_client = rbdc;
3993 rbd_dev->spec = spec; 4882 rbd_dev->spec = spec;
3994 rbd_dev->opts = opts;
3995
3996 /* Initialize the layout used for all rbd requests */
3997 4883
3998 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; 4884 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
3999 rbd_dev->layout.stripe_count = 1; 4885 rbd_dev->layout.stripe_count = 1;
@@ -4001,15 +4887,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4001 rbd_dev->layout.pool_id = spec->pool_id; 4887 rbd_dev->layout.pool_id = spec->pool_id;
4002 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); 4888 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
4003 4889
4004 /* 4890 return rbd_dev;
4005 * If this is a mapping rbd_dev (as opposed to a parent one), 4891}
4006 * pin our module. We have a ref from do_rbd_add(), so use
4007 * __module_get().
4008 */
4009 if (rbd_dev->opts)
4010 __module_get(THIS_MODULE);
4011 4892
4893/*
4894 * Create a mapping rbd_dev.
4895 */
4896static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4897 struct rbd_spec *spec,
4898 struct rbd_options *opts)
4899{
4900 struct rbd_device *rbd_dev;
4901
4902 rbd_dev = __rbd_dev_create(rbdc, spec);
4903 if (!rbd_dev)
4904 return NULL;
4905
4906 rbd_dev->opts = opts;
4907
4908 /* get an id and fill in device name */
4909 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4910 minor_to_rbd_dev_id(1 << MINORBITS),
4911 GFP_KERNEL);
4912 if (rbd_dev->dev_id < 0)
4913 goto fail_rbd_dev;
4914
4915 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4916 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4917 rbd_dev->name);
4918 if (!rbd_dev->task_wq)
4919 goto fail_dev_id;
4920
4921 /* we have a ref from do_rbd_add() */
4922 __module_get(THIS_MODULE);
4923
4924 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4012 return rbd_dev; 4925 return rbd_dev;
4926
4927fail_dev_id:
4928 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4929fail_rbd_dev:
4930 rbd_dev_free(rbd_dev);
4931 return NULL;
4013} 4932}
4014 4933
4015static void rbd_dev_destroy(struct rbd_device *rbd_dev) 4934static void rbd_dev_destroy(struct rbd_device *rbd_dev)
@@ -4645,46 +5564,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4645} 5564}
4646 5565
4647/* 5566/*
4648 * Get a unique rbd identifier for the given new rbd_dev, and add
4649 * the rbd_dev to the global list.
4650 */
4651static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4652{
4653 int new_dev_id;
4654
4655 new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4656 0, minor_to_rbd_dev_id(1 << MINORBITS),
4657 GFP_KERNEL);
4658 if (new_dev_id < 0)
4659 return new_dev_id;
4660
4661 rbd_dev->dev_id = new_dev_id;
4662
4663 spin_lock(&rbd_dev_list_lock);
4664 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4665 spin_unlock(&rbd_dev_list_lock);
4666
4667 dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4668
4669 return 0;
4670}
4671
4672/*
4673 * Remove an rbd_dev from the global list, and record that its
4674 * identifier is no longer in use.
4675 */
4676static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4677{
4678 spin_lock(&rbd_dev_list_lock);
4679 list_del_init(&rbd_dev->node);
4680 spin_unlock(&rbd_dev_list_lock);
4681
4682 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4683
4684 dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4685}
4686
4687/*
4688 * Skips over white space at *buf, and updates *buf to point to the 5567 * Skips over white space at *buf, and updates *buf to point to the
4689 * first found non-space character (if any). Returns the length of 5568 * first found non-space character (if any). Returns the length of
4690 * the token (string of non-white space characters) found. Note 5569 * the token (string of non-white space characters) found. Note
@@ -4859,6 +5738,7 @@ static int rbd_add_parse_args(const char *buf,
4859 5738
4860 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 5739 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4861 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; 5740 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5741 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
4862 5742
4863 copts = ceph_parse_options(options, mon_addrs, 5743 copts = ceph_parse_options(options, mon_addrs,
4864 mon_addrs + mon_addrs_size - 1, 5744 mon_addrs + mon_addrs_size - 1,
@@ -5076,8 +5956,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5076 goto out_err; 5956 goto out_err;
5077 } 5957 }
5078 5958
5079 parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec, 5959 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5080 NULL);
5081 if (!parent) { 5960 if (!parent) {
5082 ret = -ENOMEM; 5961 ret = -ENOMEM;
5083 goto out_err; 5962 goto out_err;
@@ -5112,22 +5991,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5112{ 5991{
5113 int ret; 5992 int ret;
5114 5993
5115 /* Get an id and fill in device name. */
5116
5117 ret = rbd_dev_id_get(rbd_dev);
5118 if (ret)
5119 goto err_out_unlock;
5120
5121 BUILD_BUG_ON(DEV_NAME_LEN
5122 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5123 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5124
5125 /* Record our major and minor device numbers. */ 5994 /* Record our major and minor device numbers. */
5126 5995
5127 if (!single_major) { 5996 if (!single_major) {
5128 ret = register_blkdev(0, rbd_dev->name); 5997 ret = register_blkdev(0, rbd_dev->name);
5129 if (ret < 0) 5998 if (ret < 0)
5130 goto err_out_id; 5999 goto err_out_unlock;
5131 6000
5132 rbd_dev->major = ret; 6001 rbd_dev->major = ret;
5133 rbd_dev->minor = 0; 6002 rbd_dev->minor = 0;
@@ -5159,9 +6028,14 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5159 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 6028 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5160 up_write(&rbd_dev->header_rwsem); 6029 up_write(&rbd_dev->header_rwsem);
5161 6030
6031 spin_lock(&rbd_dev_list_lock);
6032 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6033 spin_unlock(&rbd_dev_list_lock);
6034
5162 add_disk(rbd_dev->disk); 6035 add_disk(rbd_dev->disk);
5163 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 6036 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5164 (unsigned long long) rbd_dev->mapping.size); 6037 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6038 rbd_dev->header.features);
5165 6039
5166 return ret; 6040 return ret;
5167 6041
@@ -5172,8 +6046,6 @@ err_out_disk:
5172err_out_blkdev: 6046err_out_blkdev:
5173 if (!single_major) 6047 if (!single_major)
5174 unregister_blkdev(rbd_dev->major, rbd_dev->name); 6048 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5175err_out_id:
5176 rbd_dev_id_put(rbd_dev);
5177err_out_unlock: 6049err_out_unlock:
5178 up_write(&rbd_dev->header_rwsem); 6050 up_write(&rbd_dev->header_rwsem);
5179 return ret; 6051 return ret;
@@ -5234,7 +6106,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5234 goto err_out_format; 6106 goto err_out_format;
5235 6107
5236 if (!depth) { 6108 if (!depth) {
5237 ret = rbd_dev_header_watch_sync(rbd_dev); 6109 ret = rbd_register_watch(rbd_dev);
5238 if (ret) { 6110 if (ret) {
5239 if (ret == -ENOENT) 6111 if (ret == -ENOENT)
5240 pr_info("image %s/%s does not exist\n", 6112 pr_info("image %s/%s does not exist\n",
@@ -5293,7 +6165,7 @@ err_out_probe:
5293 rbd_dev_unprobe(rbd_dev); 6165 rbd_dev_unprobe(rbd_dev);
5294err_out_watch: 6166err_out_watch:
5295 if (!depth) 6167 if (!depth)
5296 rbd_dev_header_unwatch_sync(rbd_dev); 6168 rbd_unregister_watch(rbd_dev);
5297err_out_format: 6169err_out_format:
5298 rbd_dev->image_format = 0; 6170 rbd_dev->image_format = 0;
5299 kfree(rbd_dev->spec->image_id); 6171 kfree(rbd_dev->spec->image_id);
@@ -5345,10 +6217,18 @@ static ssize_t do_rbd_add(struct bus_type *bus,
5345 spec = NULL; /* rbd_dev now owns this */ 6217 spec = NULL; /* rbd_dev now owns this */
5346 rbd_opts = NULL; /* rbd_dev now owns this */ 6218 rbd_opts = NULL; /* rbd_dev now owns this */
5347 6219
6220 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6221 if (!rbd_dev->config_info) {
6222 rc = -ENOMEM;
6223 goto err_out_rbd_dev;
6224 }
6225
5348 down_write(&rbd_dev->header_rwsem); 6226 down_write(&rbd_dev->header_rwsem);
5349 rc = rbd_dev_image_probe(rbd_dev, 0); 6227 rc = rbd_dev_image_probe(rbd_dev, 0);
5350 if (rc < 0) 6228 if (rc < 0) {
6229 up_write(&rbd_dev->header_rwsem);
5351 goto err_out_rbd_dev; 6230 goto err_out_rbd_dev;
6231 }
5352 6232
5353 /* If we are mapping a snapshot it must be marked read-only */ 6233 /* If we are mapping a snapshot it must be marked read-only */
5354 6234
@@ -5360,11 +6240,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
5360 rc = rbd_dev_device_setup(rbd_dev); 6240 rc = rbd_dev_device_setup(rbd_dev);
5361 if (rc) { 6241 if (rc) {
5362 /* 6242 /*
5363 * rbd_dev_header_unwatch_sync() can't be moved into 6243 * rbd_unregister_watch() can't be moved into
5364 * rbd_dev_image_release() without refactoring, see 6244 * rbd_dev_image_release() without refactoring, see
5365 * commit 1f3ef78861ac. 6245 * commit 1f3ef78861ac.
5366 */ 6246 */
5367 rbd_dev_header_unwatch_sync(rbd_dev); 6247 rbd_unregister_watch(rbd_dev);
5368 rbd_dev_image_release(rbd_dev); 6248 rbd_dev_image_release(rbd_dev);
5369 goto out; 6249 goto out;
5370 } 6250 }
@@ -5375,7 +6255,6 @@ out:
5375 return rc; 6255 return rc;
5376 6256
5377err_out_rbd_dev: 6257err_out_rbd_dev:
5378 up_write(&rbd_dev->header_rwsem);
5379 rbd_dev_destroy(rbd_dev); 6258 rbd_dev_destroy(rbd_dev);
5380err_out_client: 6259err_out_client:
5381 rbd_put_client(rbdc); 6260 rbd_put_client(rbdc);
@@ -5405,12 +6284,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
5405static void rbd_dev_device_release(struct rbd_device *rbd_dev) 6284static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5406{ 6285{
5407 rbd_free_disk(rbd_dev); 6286 rbd_free_disk(rbd_dev);
6287
6288 spin_lock(&rbd_dev_list_lock);
6289 list_del_init(&rbd_dev->node);
6290 spin_unlock(&rbd_dev_list_lock);
6291
5408 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 6292 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5409 device_del(&rbd_dev->dev); 6293 device_del(&rbd_dev->dev);
5410 rbd_dev_mapping_clear(rbd_dev); 6294 rbd_dev_mapping_clear(rbd_dev);
5411 if (!single_major) 6295 if (!single_major)
5412 unregister_blkdev(rbd_dev->major, rbd_dev->name); 6296 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5413 rbd_dev_id_put(rbd_dev);
5414} 6297}
5415 6298
5416static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) 6299static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
@@ -5446,18 +6329,26 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
5446 struct rbd_device *rbd_dev = NULL; 6329 struct rbd_device *rbd_dev = NULL;
5447 struct list_head *tmp; 6330 struct list_head *tmp;
5448 int dev_id; 6331 int dev_id;
5449 unsigned long ul; 6332 char opt_buf[6];
5450 bool already = false; 6333 bool already = false;
6334 bool force = false;
5451 int ret; 6335 int ret;
5452 6336
5453 ret = kstrtoul(buf, 10, &ul); 6337 dev_id = -1;
5454 if (ret) 6338 opt_buf[0] = '\0';
5455 return ret; 6339 sscanf(buf, "%d %5s", &dev_id, opt_buf);
5456 6340 if (dev_id < 0) {
5457 /* convert to int; abort if we lost anything in the conversion */ 6341 pr_err("dev_id out of range\n");
5458 dev_id = (int)ul;
5459 if (dev_id != ul)
5460 return -EINVAL; 6342 return -EINVAL;
6343 }
6344 if (opt_buf[0] != '\0') {
6345 if (!strcmp(opt_buf, "force")) {
6346 force = true;
6347 } else {
6348 pr_err("bad remove option at '%s'\n", opt_buf);
6349 return -EINVAL;
6350 }
6351 }
5461 6352
5462 ret = -ENOENT; 6353 ret = -ENOENT;
5463 spin_lock(&rbd_dev_list_lock); 6354 spin_lock(&rbd_dev_list_lock);
@@ -5470,7 +6361,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
5470 } 6361 }
5471 if (!ret) { 6362 if (!ret) {
5472 spin_lock_irq(&rbd_dev->lock); 6363 spin_lock_irq(&rbd_dev->lock);
5473 if (rbd_dev->open_count) 6364 if (rbd_dev->open_count && !force)
5474 ret = -EBUSY; 6365 ret = -EBUSY;
5475 else 6366 else
5476 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, 6367 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
@@ -5481,7 +6372,20 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
5481 if (ret < 0 || already) 6372 if (ret < 0 || already)
5482 return ret; 6373 return ret;
5483 6374
5484 rbd_dev_header_unwatch_sync(rbd_dev); 6375 if (force) {
6376 /*
6377 * Prevent new IO from being queued and wait for existing
6378 * IO to complete/fail.
6379 */
6380 blk_mq_freeze_queue(rbd_dev->disk->queue);
6381 blk_set_queue_dying(rbd_dev->disk->queue);
6382 }
6383
6384 down_write(&rbd_dev->lock_rwsem);
6385 if (__rbd_is_lock_owner(rbd_dev))
6386 rbd_unlock(rbd_dev);
6387 up_write(&rbd_dev->lock_rwsem);
6388 rbd_unregister_watch(rbd_dev);
5485 6389
5486 /* 6390 /*
5487 * Don't free anything from rbd_dev->disk until after all 6391 * Don't free anything from rbd_dev->disk until after all
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 49d77cbcf8bd..94f367db27b0 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -28,6 +28,17 @@
28#define RBD_DATA_PREFIX "rbd_data." 28#define RBD_DATA_PREFIX "rbd_data."
29#define RBD_ID_PREFIX "rbd_id." 29#define RBD_ID_PREFIX "rbd_id."
30 30
31#define RBD_LOCK_NAME "rbd_lock"
32#define RBD_LOCK_TAG "internal"
33#define RBD_LOCK_COOKIE_PREFIX "auto"
34
35enum rbd_notify_op {
36 RBD_NOTIFY_OP_ACQUIRED_LOCK = 0,
37 RBD_NOTIFY_OP_RELEASED_LOCK = 1,
38 RBD_NOTIFY_OP_REQUEST_LOCK = 2,
39 RBD_NOTIFY_OP_HEADER_UPDATE = 3,
40};
41
31/* 42/*
32 * For format version 1, rbd image 'foo' consists of objects 43 * For format version 1, rbd image 'foo' consists of objects
33 * foo.rbd - image metadata 44 * foo.rbd - image metadata
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index d5b6f959a3c3..ef3ebd780aff 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,9 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
175 175
176static int ceph_releasepage(struct page *page, gfp_t g) 176static int ceph_releasepage(struct page *page, gfp_t g)
177{ 177{
178 dout("%p releasepage %p idx %lu\n", page->mapping->host, 178 dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
179 page, page->index); 179 page, page->index, PageDirty(page) ? "" : "not ");
180 WARN_ON(PageDirty(page));
181 180
182 /* Can we release the page from the cache? */ 181 /* Can we release the page from the cache? */
183 if (!ceph_release_fscache_page(page, g)) 182 if (!ceph_release_fscache_page(page, g))
@@ -298,14 +297,6 @@ unlock:
298 kfree(osd_data->pages); 297 kfree(osd_data->pages);
299} 298}
300 299
301static void ceph_unlock_page_vector(struct page **pages, int num_pages)
302{
303 int i;
304
305 for (i = 0; i < num_pages; i++)
306 unlock_page(pages[i]);
307}
308
309/* 300/*
310 * start an async read(ahead) operation. return nr_pages we submitted 301 * start an async read(ahead) operation. return nr_pages we submitted
311 * a read for on success, or negative error code. 302 * a read for on success, or negative error code.
@@ -370,6 +361,10 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
370 dout("start_read %p add_to_page_cache failed %p\n", 361 dout("start_read %p add_to_page_cache failed %p\n",
371 inode, page); 362 inode, page);
372 nr_pages = i; 363 nr_pages = i;
364 if (nr_pages > 0) {
365 len = nr_pages << PAGE_SHIFT;
366 break;
367 }
373 goto out_pages; 368 goto out_pages;
374 } 369 }
375 pages[i] = page; 370 pages[i] = page;
@@ -386,8 +381,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
386 return nr_pages; 381 return nr_pages;
387 382
388out_pages: 383out_pages:
389 ceph_unlock_page_vector(pages, nr_pages); 384 for (i = 0; i < nr_pages; ++i) {
390 ceph_release_page_vector(pages, nr_pages); 385 ceph_fscache_readpage_cancel(inode, pages[i]);
386 unlock_page(pages[i]);
387 }
388 ceph_put_page_vector(pages, nr_pages, false);
391out: 389out:
392 ceph_osdc_put_request(req); 390 ceph_osdc_put_request(req);
393 return ret; 391 return ret;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0f5375d8e030..395c7fcb1cea 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -902,10 +902,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
902 return ret; 902 return ret;
903 903
904 if (write) { 904 if (write) {
905 ret = invalidate_inode_pages2_range(inode->i_mapping, 905 int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
906 pos >> PAGE_SHIFT, 906 pos >> PAGE_SHIFT,
907 (pos + count) >> PAGE_SHIFT); 907 (pos + count) >> PAGE_SHIFT);
908 if (ret < 0) 908 if (ret2 < 0)
909 dout("invalidate_inode_pages2_range returned %d\n", ret); 909 dout("invalidate_inode_pages2_range returned %d\n", ret);
910 910
911 flags = CEPH_OSD_FLAG_ORDERSNAP | 911 flags = CEPH_OSD_FLAG_ORDERSNAP |
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index a2cb0c254060..6806dbeaee19 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -210,8 +210,8 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
210 if (!(fl->fl_flags & FL_FLOCK)) 210 if (!(fl->fl_flags & FL_FLOCK))
211 return -ENOLCK; 211 return -ENOLCK;
212 /* No mandatory locks */ 212 /* No mandatory locks */
213 if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) 213 if (fl->fl_type & LOCK_MAND)
214 return -ENOLCK; 214 return -EOPNOTSUPP;
215 215
216 dout("ceph_flock, fl_file: %p", fl->fl_file); 216 dout("ceph_flock, fl_file: %p", fl->fl_file);
217 217
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index f72d4ae303b2..815acd1a56d4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -370,6 +370,7 @@ const char *ceph_session_state_name(int s)
370 case CEPH_MDS_SESSION_CLOSING: return "closing"; 370 case CEPH_MDS_SESSION_CLOSING: return "closing";
371 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 371 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
372 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 372 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
373 case CEPH_MDS_SESSION_REJECTED: return "rejected";
373 default: return "???"; 374 default: return "???";
374 } 375 }
375} 376}
@@ -1150,8 +1151,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1150 while (!list_empty(&ci->i_cap_flush_list)) { 1151 while (!list_empty(&ci->i_cap_flush_list)) {
1151 cf = list_first_entry(&ci->i_cap_flush_list, 1152 cf = list_first_entry(&ci->i_cap_flush_list,
1152 struct ceph_cap_flush, i_list); 1153 struct ceph_cap_flush, i_list);
1153 list_del(&cf->i_list); 1154 list_move(&cf->i_list, &to_remove);
1154 list_add(&cf->i_list, &to_remove);
1155 } 1155 }
1156 1156
1157 spin_lock(&mdsc->cap_dirty_lock); 1157 spin_lock(&mdsc->cap_dirty_lock);
@@ -1378,7 +1378,7 @@ static int request_close_session(struct ceph_mds_client *mdsc,
1378 if (!msg) 1378 if (!msg)
1379 return -ENOMEM; 1379 return -ENOMEM;
1380 ceph_con_send(&session->s_con, msg); 1380 ceph_con_send(&session->s_con, msg);
1381 return 0; 1381 return 1;
1382} 1382}
1383 1383
1384/* 1384/*
@@ -2131,6 +2131,10 @@ static int __do_request(struct ceph_mds_client *mdsc,
2131 ceph_session_state_name(session->s_state)); 2131 ceph_session_state_name(session->s_state));
2132 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2132 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2133 session->s_state != CEPH_MDS_SESSION_HUNG) { 2133 session->s_state != CEPH_MDS_SESSION_HUNG) {
2134 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2135 err = -EACCES;
2136 goto out_session;
2137 }
2134 if (session->s_state == CEPH_MDS_SESSION_NEW || 2138 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2135 session->s_state == CEPH_MDS_SESSION_CLOSING) 2139 session->s_state == CEPH_MDS_SESSION_CLOSING)
2136 __open_session(mdsc, session); 2140 __open_session(mdsc, session);
@@ -2652,6 +2656,15 @@ static void handle_session(struct ceph_mds_session *session,
2652 wake_up_session_caps(session, 0); 2656 wake_up_session_caps(session, 0);
2653 break; 2657 break;
2654 2658
2659 case CEPH_SESSION_REJECT:
2660 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
2661 pr_info("mds%d rejected session\n", session->s_mds);
2662 session->s_state = CEPH_MDS_SESSION_REJECTED;
2663 cleanup_session_requests(mdsc, session);
2664 remove_session_caps(session);
2665 wake = 2; /* for good measure */
2666 break;
2667
2655 default: 2668 default:
2656 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2669 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2657 WARN_ON(1); 2670 WARN_ON(1);
@@ -3557,11 +3570,11 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3557/* 3570/*
3558 * true if all sessions are closed, or we force unmount 3571 * true if all sessions are closed, or we force unmount
3559 */ 3572 */
3560static bool done_closing_sessions(struct ceph_mds_client *mdsc) 3573static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
3561{ 3574{
3562 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3575 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3563 return true; 3576 return true;
3564 return atomic_read(&mdsc->num_sessions) == 0; 3577 return atomic_read(&mdsc->num_sessions) <= skipped;
3565} 3578}
3566 3579
3567/* 3580/*
@@ -3572,6 +3585,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3572 struct ceph_options *opts = mdsc->fsc->client->options; 3585 struct ceph_options *opts = mdsc->fsc->client->options;
3573 struct ceph_mds_session *session; 3586 struct ceph_mds_session *session;
3574 int i; 3587 int i;
3588 int skipped = 0;
3575 3589
3576 dout("close_sessions\n"); 3590 dout("close_sessions\n");
3577 3591
@@ -3583,7 +3597,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3583 continue; 3597 continue;
3584 mutex_unlock(&mdsc->mutex); 3598 mutex_unlock(&mdsc->mutex);
3585 mutex_lock(&session->s_mutex); 3599 mutex_lock(&session->s_mutex);
3586 __close_session(mdsc, session); 3600 if (__close_session(mdsc, session) <= 0)
3601 skipped++;
3587 mutex_unlock(&session->s_mutex); 3602 mutex_unlock(&session->s_mutex);
3588 ceph_put_mds_session(session); 3603 ceph_put_mds_session(session);
3589 mutex_lock(&mdsc->mutex); 3604 mutex_lock(&mdsc->mutex);
@@ -3591,7 +3606,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3591 mutex_unlock(&mdsc->mutex); 3606 mutex_unlock(&mdsc->mutex);
3592 3607
3593 dout("waiting for sessions to close\n"); 3608 dout("waiting for sessions to close\n");
3594 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3609 wait_event_timeout(mdsc->session_close_wq,
3610 done_closing_sessions(mdsc, skipped),
3595 ceph_timeout_jiffies(opts->mount_timeout)); 3611 ceph_timeout_jiffies(opts->mount_timeout));
3596 3612
3597 /* tear down remaining sessions */ 3613 /* tear down remaining sessions */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 6b3679737d4a..3c6f77b7bb02 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -121,6 +121,7 @@ enum {
121 CEPH_MDS_SESSION_CLOSING = 5, 121 CEPH_MDS_SESSION_CLOSING = 5,
122 CEPH_MDS_SESSION_RESTARTING = 6, 122 CEPH_MDS_SESSION_RESTARTING = 6,
123 CEPH_MDS_SESSION_RECONNECTING = 7, 123 CEPH_MDS_SESSION_RECONNECTING = 7,
124 CEPH_MDS_SESSION_REJECTED = 8,
124}; 125};
125 126
126struct ceph_mds_session { 127struct ceph_mds_session {
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index 89e6bc321df3..913dea163d5c 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -43,6 +43,8 @@ const char *ceph_session_op_name(int op)
43 case CEPH_SESSION_RECALL_STATE: return "recall_state"; 43 case CEPH_SESSION_RECALL_STATE: return "recall_state";
44 case CEPH_SESSION_FLUSHMSG: return "flushmsg"; 44 case CEPH_SESSION_FLUSHMSG: return "flushmsg";
45 case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack"; 45 case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
46 case CEPH_SESSION_FORCE_RO: return "force_ro";
47 case CEPH_SESSION_REJECT: return "reject";
46 } 48 }
47 return "???"; 49 return "???";
48} 50}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index e247f6f0feb7..a29ffce98187 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -396,10 +396,12 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
396 */ 396 */
397 dev_name_end = strchr(dev_name, '/'); 397 dev_name_end = strchr(dev_name, '/');
398 if (dev_name_end) { 398 if (dev_name_end) {
399 fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL); 399 if (strlen(dev_name_end) > 1) {
400 if (!fsopt->server_path) { 400 fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
401 err = -ENOMEM; 401 if (!fsopt->server_path) {
402 goto out; 402 err = -ENOMEM;
403 goto out;
404 }
403 } 405 }
404 } else { 406 } else {
405 dev_name_end = dev_name + strlen(dev_name); 407 dev_name_end = dev_name + strlen(dev_name);
@@ -788,15 +790,10 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
788 struct inode *inode = req->r_target_inode; 790 struct inode *inode = req->r_target_inode;
789 req->r_target_inode = NULL; 791 req->r_target_inode = NULL;
790 dout("open_root_inode success\n"); 792 dout("open_root_inode success\n");
791 if (ceph_ino(inode) == CEPH_INO_ROOT && 793 root = d_make_root(inode);
792 fsc->sb->s_root == NULL) { 794 if (!root) {
793 root = d_make_root(inode); 795 root = ERR_PTR(-ENOMEM);
794 if (!root) { 796 goto out;
795 root = ERR_PTR(-ENOMEM);
796 goto out;
797 }
798 } else {
799 root = d_obtain_root(inode);
800 } 797 }
801 ceph_init_dentry(root); 798 ceph_init_dentry(root);
802 dout("open_root_inode success, root dentry is %p\n", root); 799 dout("open_root_inode success, root dentry is %p\n", root);
@@ -825,17 +822,24 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
825 mutex_lock(&fsc->client->mount_mutex); 822 mutex_lock(&fsc->client->mount_mutex);
826 823
827 if (!fsc->sb->s_root) { 824 if (!fsc->sb->s_root) {
825 const char *path;
828 err = __ceph_open_session(fsc->client, started); 826 err = __ceph_open_session(fsc->client, started);
829 if (err < 0) 827 if (err < 0)
830 goto out; 828 goto out;
831 829
832 dout("mount opening root\n"); 830 if (!fsc->mount_options->server_path) {
833 root = open_root_dentry(fsc, "", started); 831 path = "";
832 dout("mount opening path \\t\n");
833 } else {
834 path = fsc->mount_options->server_path + 1;
835 dout("mount opening path %s\n", path);
836 }
837 root = open_root_dentry(fsc, path, started);
834 if (IS_ERR(root)) { 838 if (IS_ERR(root)) {
835 err = PTR_ERR(root); 839 err = PTR_ERR(root);
836 goto out; 840 goto out;
837 } 841 }
838 fsc->sb->s_root = root; 842 fsc->sb->s_root = dget(root);
839 first = 1; 843 first = 1;
840 844
841 err = ceph_fs_debugfs_init(fsc); 845 err = ceph_fs_debugfs_init(fsc);
@@ -843,19 +847,6 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
843 goto fail; 847 goto fail;
844 } 848 }
845 849
846 if (!fsc->mount_options->server_path) {
847 root = fsc->sb->s_root;
848 dget(root);
849 } else {
850 const char *path = fsc->mount_options->server_path + 1;
851 dout("mount opening path %s\n", path);
852 root = open_root_dentry(fsc, path, started);
853 if (IS_ERR(root)) {
854 err = PTR_ERR(root);
855 goto fail;
856 }
857 }
858
859 fsc->mount_state = CEPH_MOUNT_MOUNTED; 850 fsc->mount_state = CEPH_MOUNT_MOUNTED;
860 dout("mount success\n"); 851 dout("mount success\n");
861 mutex_unlock(&fsc->client->mount_mutex); 852 mutex_unlock(&fsc->client->mount_mutex);
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 1563265d2097..374bb1c4ef52 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -104,7 +104,7 @@ extern int ceph_auth_build_hello(struct ceph_auth_client *ac,
104extern int ceph_handle_auth_reply(struct ceph_auth_client *ac, 104extern int ceph_handle_auth_reply(struct ceph_auth_client *ac,
105 void *buf, size_t len, 105 void *buf, size_t len,
106 void *reply_buf, size_t reply_len); 106 void *reply_buf, size_t reply_len);
107extern int ceph_entity_name_encode(const char *name, void **p, void *end); 107int ceph_auth_entity_name_encode(const char *name, void **p, void *end);
108 108
109extern int ceph_build_auth(struct ceph_auth_client *ac, 109extern int ceph_build_auth(struct ceph_auth_client *ac,
110 void *msg_buf, size_t msg_len); 110 void *msg_buf, size_t msg_len);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 7868d602c0a0..f96de8de4fa7 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -138,6 +138,9 @@ struct ceph_dir_layout {
138#define CEPH_MSG_POOLOP_REPLY 48 138#define CEPH_MSG_POOLOP_REPLY 48
139#define CEPH_MSG_POOLOP 49 139#define CEPH_MSG_POOLOP 49
140 140
141/* mon commands */
142#define CEPH_MSG_MON_COMMAND 50
143#define CEPH_MSG_MON_COMMAND_ACK 51
141 144
142/* osd */ 145/* osd */
143#define CEPH_MSG_OSD_MAP 41 146#define CEPH_MSG_OSD_MAP 41
@@ -176,6 +179,14 @@ struct ceph_mon_statfs_reply {
176 struct ceph_statfs st; 179 struct ceph_statfs st;
177} __attribute__ ((packed)); 180} __attribute__ ((packed));
178 181
182struct ceph_mon_command {
183 struct ceph_mon_request_header monhdr;
184 struct ceph_fsid fsid;
185 __le32 num_strs; /* always 1 */
186 __le32 str_len;
187 char str[];
188} __attribute__ ((packed));
189
179struct ceph_osd_getmap { 190struct ceph_osd_getmap {
180 struct ceph_mon_request_header monhdr; 191 struct ceph_mon_request_header monhdr;
181 struct ceph_fsid fsid; 192 struct ceph_fsid fsid;
@@ -270,6 +281,7 @@ enum {
270 CEPH_SESSION_FLUSHMSG, 281 CEPH_SESSION_FLUSHMSG,
271 CEPH_SESSION_FLUSHMSG_ACK, 282 CEPH_SESSION_FLUSHMSG_ACK,
272 CEPH_SESSION_FORCE_RO, 283 CEPH_SESSION_FORCE_RO,
284 CEPH_SESSION_REJECT,
273}; 285};
274 286
275extern const char *ceph_session_op_name(int op); 287extern const char *ceph_session_op_name(int op);
diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
new file mode 100644
index 000000000000..84884d8d4710
--- /dev/null
+++ b/include/linux/ceph/cls_lock_client.h
@@ -0,0 +1,49 @@
1#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H
2#define _LINUX_CEPH_CLS_LOCK_CLIENT_H
3
4#include <linux/ceph/osd_client.h>
5
6enum ceph_cls_lock_type {
7 CEPH_CLS_LOCK_NONE = 0,
8 CEPH_CLS_LOCK_EXCLUSIVE = 1,
9 CEPH_CLS_LOCK_SHARED = 2,
10};
11
12struct ceph_locker_id {
13 struct ceph_entity_name name; /* locker's client name */
14 char *cookie; /* locker's cookie */
15};
16
17struct ceph_locker_info {
18 struct ceph_entity_addr addr; /* locker's address */
19};
20
21struct ceph_locker {
22 struct ceph_locker_id id;
23 struct ceph_locker_info info;
24};
25
26int ceph_cls_lock(struct ceph_osd_client *osdc,
27 struct ceph_object_id *oid,
28 struct ceph_object_locator *oloc,
29 char *lock_name, u8 type, char *cookie,
30 char *tag, char *desc, u8 flags);
31int ceph_cls_unlock(struct ceph_osd_client *osdc,
32 struct ceph_object_id *oid,
33 struct ceph_object_locator *oloc,
34 char *lock_name, char *cookie);
35int ceph_cls_break_lock(struct ceph_osd_client *osdc,
36 struct ceph_object_id *oid,
37 struct ceph_object_locator *oloc,
38 char *lock_name, char *cookie,
39 struct ceph_entity_name *locker);
40
41void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
42
43int ceph_cls_lock_info(struct ceph_osd_client *osdc,
44 struct ceph_object_id *oid,
45 struct ceph_object_locator *oloc,
46 char *lock_name, u8 *type, char **tag,
47 struct ceph_locker **lockers, u32 *num_lockers);
48
49#endif
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 83fc1fff7061..1816c5e26581 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -264,7 +264,8 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
264 void *private, 264 void *private,
265 u64 supported_features, 265 u64 supported_features,
266 u64 required_features); 266 u64 required_features);
267extern u64 ceph_client_id(struct ceph_client *client); 267struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
268u64 ceph_client_gid(struct ceph_client *client);
268extern void ceph_destroy_client(struct ceph_client *client); 269extern void ceph_destroy_client(struct ceph_client *client);
269extern int __ceph_open_session(struct ceph_client *client, 270extern int __ceph_open_session(struct ceph_client *client,
270 unsigned long started); 271 unsigned long started);
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 24d704d1ea5c..d5a3ecea578d 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -141,6 +141,9 @@ int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
141int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, 141int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
142 ceph_monc_callback_t cb, u64 private_data); 142 ceph_monc_callback_t cb, u64 private_data);
143 143
144int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
145 struct ceph_entity_addr *client_addr);
146
144extern int ceph_monc_open_session(struct ceph_mon_client *monc); 147extern int ceph_monc_open_session(struct ceph_mon_client *monc);
145 148
146extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); 149extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 858932304260..96337b15a60d 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -121,6 +121,9 @@ struct ceph_osd_req_op {
121 struct ceph_osd_data response_data; 121 struct ceph_osd_data response_data;
122 } notify; 122 } notify;
123 struct { 123 struct {
124 struct ceph_osd_data response_data;
125 } list_watchers;
126 struct {
124 u64 expected_object_size; 127 u64 expected_object_size;
125 u64 expected_write_size; 128 u64 expected_write_size;
126 } alloc_hint; 129 } alloc_hint;
@@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
249 size_t *preply_len; 252 size_t *preply_len;
250}; 253};
251 254
255struct ceph_watch_item {
256 struct ceph_entity_name name;
257 u64 cookie;
258 struct ceph_entity_addr addr;
259};
260
252struct ceph_osd_client { 261struct ceph_osd_client {
253 struct ceph_client *client; 262 struct ceph_client *client;
254 263
@@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
346 struct page **pages, u64 length, 355 struct page **pages, u64 length,
347 u32 alignment, bool pages_from_pool, 356 u32 alignment, bool pages_from_pool,
348 bool own_pages); 357 bool own_pages);
349
350extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, 358extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
351 unsigned int which, u16 opcode, 359 unsigned int which, u16 opcode,
352 const char *class, const char *method); 360 const char *class, const char *method);
@@ -389,6 +397,14 @@ extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
389extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc); 397extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc);
390void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc); 398void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc);
391 399
400int ceph_osdc_call(struct ceph_osd_client *osdc,
401 struct ceph_object_id *oid,
402 struct ceph_object_locator *oloc,
403 const char *class, const char *method,
404 unsigned int flags,
405 struct page *req_page, size_t req_len,
406 struct page *resp_page, size_t *resp_len);
407
392extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, 408extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
393 struct ceph_vino vino, 409 struct ceph_vino vino,
394 struct ceph_file_layout *layout, 410 struct ceph_file_layout *layout,
@@ -434,5 +450,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
434 size_t *preply_len); 450 size_t *preply_len);
435int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 451int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
436 struct ceph_osd_linger_request *lreq); 452 struct ceph_osd_linger_request *lreq);
453int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
454 struct ceph_object_id *oid,
455 struct ceph_object_locator *oloc,
456 struct ceph_watch_item **watchers,
457 u32 *num_watchers);
437#endif 458#endif
438 459
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 84cbed630c4b..6a5180903e7b 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o
5 5
6libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ 6libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
7 mon_client.o \ 7 mon_client.o \
8 cls_lock_client.o \
8 osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ 9 osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
9 debugfs.o \ 10 debugfs.o \
10 auth.o auth_none.o \ 11 auth.o auth_none.o \
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index 2bc5965fdd1e..c822b3ae1bd3 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -82,7 +82,10 @@ void ceph_auth_reset(struct ceph_auth_client *ac)
82 mutex_unlock(&ac->mutex); 82 mutex_unlock(&ac->mutex);
83} 83}
84 84
85int ceph_entity_name_encode(const char *name, void **p, void *end) 85/*
86 * EntityName, not to be confused with entity_name_t
87 */
88int ceph_auth_entity_name_encode(const char *name, void **p, void *end)
86{ 89{
87 int len = strlen(name); 90 int len = strlen(name);
88 91
@@ -111,7 +114,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
111 monhdr->session_mon = cpu_to_le16(-1); 114 monhdr->session_mon = cpu_to_le16(-1);
112 monhdr->session_mon_tid = 0; 115 monhdr->session_mon_tid = 0;
113 116
114 ceph_encode_32(&p, 0); /* no protocol, yet */ 117 ceph_encode_32(&p, CEPH_AUTH_UNKNOWN); /* no protocol, yet */
115 118
116 lenp = p; 119 lenp = p;
117 p += sizeof(u32); 120 p += sizeof(u32);
@@ -124,7 +127,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
124 for (i = 0; i < num; i++) 127 for (i = 0; i < num; i++)
125 ceph_encode_32(&p, supported_protocols[i]); 128 ceph_encode_32(&p, supported_protocols[i]);
126 129
127 ret = ceph_entity_name_encode(ac->name, &p, end); 130 ret = ceph_auth_entity_name_encode(ac->name, &p, end);
128 if (ret < 0) 131 if (ret < 0)
129 goto out; 132 goto out;
130 ceph_decode_need(&p, end, sizeof(u64), bad); 133 ceph_decode_need(&p, end, sizeof(u64), bad);
@@ -259,9 +262,7 @@ int ceph_build_auth(struct ceph_auth_client *ac,
259 int ret = 0; 262 int ret = 0;
260 263
261 mutex_lock(&ac->mutex); 264 mutex_lock(&ac->mutex);
262 if (!ac->protocol) 265 if (ac->ops->should_authenticate(ac))
263 ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
264 else if (ac->ops->should_authenticate(ac))
265 ret = ceph_build_auth_request(ac, msg_buf, msg_len); 266 ret = ceph_build_auth_request(ac, msg_buf, msg_len);
266 mutex_unlock(&ac->mutex); 267 mutex_unlock(&ac->mutex);
267 return ret; 268 return ret;
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 5f836f02ae36..df45e467c81f 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -46,7 +46,7 @@ static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac,
46 int ret; 46 int ret;
47 47
48 ceph_encode_8_safe(&p, end, 1, e_range); 48 ceph_encode_8_safe(&p, end, 1, e_range);
49 ret = ceph_entity_name_encode(ac->name, &p, end); 49 ret = ceph_auth_entity_name_encode(ac->name, &p, end);
50 if (ret < 0) 50 if (ret < 0)
51 return ret; 51 return ret;
52 52
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bddfcf6f09c2..464e88599b9d 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -566,11 +566,17 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
566} 566}
567EXPORT_SYMBOL(ceph_print_client_options); 567EXPORT_SYMBOL(ceph_print_client_options);
568 568
569u64 ceph_client_id(struct ceph_client *client) 569struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client)
570{
571 return &client->msgr.inst.addr;
572}
573EXPORT_SYMBOL(ceph_client_addr);
574
575u64 ceph_client_gid(struct ceph_client *client)
570{ 576{
571 return client->monc.auth->global_id; 577 return client->monc.auth->global_id;
572} 578}
573EXPORT_SYMBOL(ceph_client_id); 579EXPORT_SYMBOL(ceph_client_gid);
574 580
575/* 581/*
576 * create a fresh client instance 582 * create a fresh client instance
@@ -685,7 +691,8 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
685 return client->auth_err; 691 return client->auth_err;
686 } 692 }
687 693
688 pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid); 694 pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
695 &client->fsid);
689 ceph_debugfs_client_init(client); 696 ceph_debugfs_client_init(client);
690 697
691 return 0; 698 return 0;
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3773a4fa11e3..19b7d8aa915c 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type)
15 default: return "unknown"; 15 default: return "unknown";
16 } 16 }
17} 17}
18EXPORT_SYMBOL(ceph_entity_type_name);
18 19
19const char *ceph_osd_op_name(int op) 20const char *ceph_osd_op_name(int op)
20{ 21{
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
new file mode 100644
index 000000000000..50f040fdb2a9
--- /dev/null
+++ b/net/ceph/cls_lock_client.c
@@ -0,0 +1,325 @@
1#include <linux/ceph/ceph_debug.h>
2
3#include <linux/types.h>
4#include <linux/slab.h>
5
6#include <linux/ceph/cls_lock_client.h>
7#include <linux/ceph/decode.h>
8
9/**
10 * ceph_cls_lock - grab rados lock for object
11 * @oid, @oloc: object to lock
12 * @lock_name: the name of the lock
13 * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
14 * @cookie: user-defined identifier for this instance of the lock
15 * @tag: user-defined tag
16 * @desc: user-defined lock description
17 * @flags: lock flags
18 *
19 * All operations on the same lock should use the same tag.
20 */
21int ceph_cls_lock(struct ceph_osd_client *osdc,
22 struct ceph_object_id *oid,
23 struct ceph_object_locator *oloc,
24 char *lock_name, u8 type, char *cookie,
25 char *tag, char *desc, u8 flags)
26{
27 int lock_op_buf_size;
28 int name_len = strlen(lock_name);
29 int cookie_len = strlen(cookie);
30 int tag_len = strlen(tag);
31 int desc_len = strlen(desc);
32 void *p, *end;
33 struct page *lock_op_page;
34 struct timespec mtime;
35 int ret;
36
37 lock_op_buf_size = name_len + sizeof(__le32) +
38 cookie_len + sizeof(__le32) +
39 tag_len + sizeof(__le32) +
40 desc_len + sizeof(__le32) +
41 sizeof(struct ceph_timespec) +
42 /* flag and type */
43 sizeof(u8) + sizeof(u8) +
44 CEPH_ENCODING_START_BLK_LEN;
45 if (lock_op_buf_size > PAGE_SIZE)
46 return -E2BIG;
47
48 lock_op_page = alloc_page(GFP_NOIO);
49 if (!lock_op_page)
50 return -ENOMEM;
51
52 p = page_address(lock_op_page);
53 end = p + lock_op_buf_size;
54
55 /* encode cls_lock_lock_op struct */
56 ceph_start_encoding(&p, 1, 1,
57 lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
58 ceph_encode_string(&p, end, lock_name, name_len);
59 ceph_encode_8(&p, type);
60 ceph_encode_string(&p, end, cookie, cookie_len);
61 ceph_encode_string(&p, end, tag, tag_len);
62 ceph_encode_string(&p, end, desc, desc_len);
63 /* only support infinite duration */
64 memset(&mtime, 0, sizeof(mtime));
65 ceph_encode_timespec(p, &mtime);
66 p += sizeof(struct ceph_timespec);
67 ceph_encode_8(&p, flags);
68
69 dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
70 __func__, lock_name, type, cookie, tag, desc, flags);
71 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
72 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
73 lock_op_page, lock_op_buf_size, NULL, NULL);
74
75 dout("%s: status %d\n", __func__, ret);
76 __free_page(lock_op_page);
77 return ret;
78}
79EXPORT_SYMBOL(ceph_cls_lock);
80
81/**
82 * ceph_cls_unlock - release rados lock for object
83 * @oid, @oloc: object to lock
84 * @lock_name: the name of the lock
85 * @cookie: user-defined identifier for this instance of the lock
86 */
87int ceph_cls_unlock(struct ceph_osd_client *osdc,
88 struct ceph_object_id *oid,
89 struct ceph_object_locator *oloc,
90 char *lock_name, char *cookie)
91{
92 int unlock_op_buf_size;
93 int name_len = strlen(lock_name);
94 int cookie_len = strlen(cookie);
95 void *p, *end;
96 struct page *unlock_op_page;
97 int ret;
98
99 unlock_op_buf_size = name_len + sizeof(__le32) +
100 cookie_len + sizeof(__le32) +
101 CEPH_ENCODING_START_BLK_LEN;
102 if (unlock_op_buf_size > PAGE_SIZE)
103 return -E2BIG;
104
105 unlock_op_page = alloc_page(GFP_NOIO);
106 if (!unlock_op_page)
107 return -ENOMEM;
108
109 p = page_address(unlock_op_page);
110 end = p + unlock_op_buf_size;
111
112 /* encode cls_lock_unlock_op struct */
113 ceph_start_encoding(&p, 1, 1,
114 unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
115 ceph_encode_string(&p, end, lock_name, name_len);
116 ceph_encode_string(&p, end, cookie, cookie_len);
117
118 dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
119 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
120 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
121 unlock_op_page, unlock_op_buf_size, NULL, NULL);
122
123 dout("%s: status %d\n", __func__, ret);
124 __free_page(unlock_op_page);
125 return ret;
126}
127EXPORT_SYMBOL(ceph_cls_unlock);
128
129/**
130 * ceph_cls_break_lock - release rados lock for object for specified client
131 * @oid, @oloc: object to lock
132 * @lock_name: the name of the lock
133 * @cookie: user-defined identifier for this instance of the lock
134 * @locker: current lock owner
135 */
136int ceph_cls_break_lock(struct ceph_osd_client *osdc,
137 struct ceph_object_id *oid,
138 struct ceph_object_locator *oloc,
139 char *lock_name, char *cookie,
140 struct ceph_entity_name *locker)
141{
142 int break_op_buf_size;
143 int name_len = strlen(lock_name);
144 int cookie_len = strlen(cookie);
145 struct page *break_op_page;
146 void *p, *end;
147 int ret;
148
149 break_op_buf_size = name_len + sizeof(__le32) +
150 cookie_len + sizeof(__le32) +
151 sizeof(u8) + sizeof(__le64) +
152 CEPH_ENCODING_START_BLK_LEN;
153 if (break_op_buf_size > PAGE_SIZE)
154 return -E2BIG;
155
156 break_op_page = alloc_page(GFP_NOIO);
157 if (!break_op_page)
158 return -ENOMEM;
159
160 p = page_address(break_op_page);
161 end = p + break_op_buf_size;
162
163 /* encode cls_lock_break_op struct */
164 ceph_start_encoding(&p, 1, 1,
165 break_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
166 ceph_encode_string(&p, end, lock_name, name_len);
167 ceph_encode_copy(&p, locker, sizeof(*locker));
168 ceph_encode_string(&p, end, cookie, cookie_len);
169
170 dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
171 cookie, ENTITY_NAME(*locker));
172 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
173 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
174 break_op_page, break_op_buf_size, NULL, NULL);
175
176 dout("%s: status %d\n", __func__, ret);
177 __free_page(break_op_page);
178 return ret;
179}
180EXPORT_SYMBOL(ceph_cls_break_lock);
181
182void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
183{
184 int i;
185
186 for (i = 0; i < num_lockers; i++)
187 kfree(lockers[i].id.cookie);
188 kfree(lockers);
189}
190EXPORT_SYMBOL(ceph_free_lockers);
191
192static int decode_locker(void **p, void *end, struct ceph_locker *locker)
193{
194 u8 struct_v;
195 u32 len;
196 char *s;
197 int ret;
198
199 ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len);
200 if (ret)
201 return ret;
202
203 ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name));
204 s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
205 if (IS_ERR(s))
206 return PTR_ERR(s);
207
208 locker->id.cookie = s;
209
210 ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len);
211 if (ret)
212 return ret;
213
214 *p += sizeof(struct ceph_timespec); /* skip expiration */
215 ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
216 ceph_decode_addr(&locker->info.addr);
217 len = ceph_decode_32(p);
218 *p += len; /* skip description */
219
220 dout("%s %s%llu cookie %s addr %s\n", __func__,
221 ENTITY_NAME(locker->id.name), locker->id.cookie,
222 ceph_pr_addr(&locker->info.addr.in_addr));
223 return 0;
224}
225
226static int decode_lockers(void **p, void *end, u8 *type, char **tag,
227 struct ceph_locker **lockers, u32 *num_lockers)
228{
229 u8 struct_v;
230 u32 struct_len;
231 char *s;
232 int i;
233 int ret;
234
235 ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply",
236 &struct_v, &struct_len);
237 if (ret)
238 return ret;
239
240 *num_lockers = ceph_decode_32(p);
241 *lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO);
242 if (!*lockers)
243 return -ENOMEM;
244
245 for (i = 0; i < *num_lockers; i++) {
246 ret = decode_locker(p, end, *lockers + i);
247 if (ret)
248 goto err_free_lockers;
249 }
250
251 *type = ceph_decode_8(p);
252 s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
253 if (IS_ERR(s)) {
254 ret = PTR_ERR(s);
255 goto err_free_lockers;
256 }
257
258 *tag = s;
259 return 0;
260
261err_free_lockers:
262 ceph_free_lockers(*lockers, *num_lockers);
263 return ret;
264}
265
266/*
267 * On success, the caller is responsible for:
268 *
269 * kfree(tag);
270 * ceph_free_lockers(lockers, num_lockers);
271 */
272int ceph_cls_lock_info(struct ceph_osd_client *osdc,
273 struct ceph_object_id *oid,
274 struct ceph_object_locator *oloc,
275 char *lock_name, u8 *type, char **tag,
276 struct ceph_locker **lockers, u32 *num_lockers)
277{
278 int get_info_op_buf_size;
279 int name_len = strlen(lock_name);
280 struct page *get_info_op_page, *reply_page;
281 size_t reply_len;
282 void *p, *end;
283 int ret;
284
285 get_info_op_buf_size = name_len + sizeof(__le32) +
286 CEPH_ENCODING_START_BLK_LEN;
287 if (get_info_op_buf_size > PAGE_SIZE)
288 return -E2BIG;
289
290 get_info_op_page = alloc_page(GFP_NOIO);
291 if (!get_info_op_page)
292 return -ENOMEM;
293
294 reply_page = alloc_page(GFP_NOIO);
295 if (!reply_page) {
296 __free_page(get_info_op_page);
297 return -ENOMEM;
298 }
299
300 p = page_address(get_info_op_page);
301 end = p + get_info_op_buf_size;
302
303 /* encode cls_lock_get_info_op struct */
304 ceph_start_encoding(&p, 1, 1,
305 get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
306 ceph_encode_string(&p, end, lock_name, name_len);
307
308 dout("%s lock_name %s\n", __func__, lock_name);
309 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
310 CEPH_OSD_FLAG_READ, get_info_op_page,
311 get_info_op_buf_size, reply_page, &reply_len);
312
313 dout("%s: status %d\n", __func__, ret);
314 if (ret >= 0) {
315 p = page_address(reply_page);
316 end = p + reply_len;
317
318 ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
319 }
320
321 __free_page(get_info_op_page);
322 __free_page(reply_page);
323 return ret;
324}
325EXPORT_SYMBOL(ceph_cls_lock_info);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 5fcfb98f309e..a421e905331a 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -245,7 +245,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
245/* compute 2^44*log2(input+1) */ 245/* compute 2^44*log2(input+1) */
246static __u64 crush_ln(unsigned int xin) 246static __u64 crush_ln(unsigned int xin)
247{ 247{
248 unsigned int x = xin, x1; 248 unsigned int x = xin;
249 int iexpon, index1, index2; 249 int iexpon, index1, index2;
250 __u64 RH, LH, LL, xl64, result; 250 __u64 RH, LH, LL, xl64, result;
251 251
@@ -253,9 +253,15 @@ static __u64 crush_ln(unsigned int xin)
253 253
254 /* normalize input */ 254 /* normalize input */
255 iexpon = 15; 255 iexpon = 15;
256 while (!(x & 0x18000)) { 256
257 x <<= 1; 257 /*
258 iexpon--; 258 * figure out number of bits we need to shift and
259 * do it in one step instead of iteratively
260 */
261 if (!(x & 0x18000)) {
262 int bits = __builtin_clz(x & 0x1FFFF) - 16;
263 x <<= bits;
264 iexpon = 15 - bits;
259 } 265 }
260 266
261 index1 = (x >> 8) << 1; 267 index1 = (x >> 8) << 1;
@@ -267,12 +273,11 @@ static __u64 crush_ln(unsigned int xin)
267 /* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */ 273 /* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */
268 xl64 = (__s64)x * RH; 274 xl64 = (__s64)x * RH;
269 xl64 >>= 48; 275 xl64 >>= 48;
270 x1 = xl64;
271 276
272 result = iexpon; 277 result = iexpon;
273 result <<= (12 + 32); 278 result <<= (12 + 32);
274 279
275 index2 = x1 & 0xff; 280 index2 = xl64 & 0xff;
276 /* LL ~ 2^48*log2(1.0+index2/2^15) */ 281 /* LL ~ 2^48*log2(1.0+index2/2^15) */
277 LL = __LL_tbl[index2]; 282 LL = __LL_tbl[index2];
278 283
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index ef34a02719d7..a8effc8b7280 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -835,6 +835,83 @@ int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
835} 835}
836EXPORT_SYMBOL(ceph_monc_get_version_async); 836EXPORT_SYMBOL(ceph_monc_get_version_async);
837 837
838static void handle_command_ack(struct ceph_mon_client *monc,
839 struct ceph_msg *msg)
840{
841 struct ceph_mon_generic_request *req;
842 void *p = msg->front.iov_base;
843 void *const end = p + msg->front_alloc_len;
844 u64 tid = le64_to_cpu(msg->hdr.tid);
845
846 dout("%s msg %p tid %llu\n", __func__, msg, tid);
847
848 ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
849 sizeof(u32), bad);
850 p += sizeof(struct ceph_mon_request_header);
851
852 mutex_lock(&monc->mutex);
853 req = lookup_generic_request(&monc->generic_request_tree, tid);
854 if (!req) {
855 mutex_unlock(&monc->mutex);
856 return;
857 }
858
859 req->result = ceph_decode_32(&p);
860 __finish_generic_request(req);
861 mutex_unlock(&monc->mutex);
862
863 complete_generic_request(req);
864 return;
865
866bad:
867 pr_err("corrupt mon_command ack, tid %llu\n", tid);
868 ceph_msg_dump(msg);
869}
870
871int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
872 struct ceph_entity_addr *client_addr)
873{
874 struct ceph_mon_generic_request *req;
875 struct ceph_mon_command *h;
876 int ret = -ENOMEM;
877 int len;
878
879 req = alloc_generic_request(monc, GFP_NOIO);
880 if (!req)
881 goto out;
882
883 req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
884 if (!req->request)
885 goto out;
886
887 req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
888 true);
889 if (!req->reply)
890 goto out;
891
892 mutex_lock(&monc->mutex);
893 register_generic_request(req);
894 h = req->request->front.iov_base;
895 h->monhdr.have_version = 0;
896 h->monhdr.session_mon = cpu_to_le16(-1);
897 h->monhdr.session_mon_tid = 0;
898 h->fsid = monc->monmap->fsid;
899 h->num_strs = cpu_to_le32(1);
900 len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
901 \"blacklistop\": \"add\", \
902 \"addr\": \"%pISpc/%u\" }",
903 &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
904 h->str_len = cpu_to_le32(len);
905 send_generic_request(monc, req);
906 mutex_unlock(&monc->mutex);
907
908 ret = wait_generic_request(req);
909out:
910 put_generic_request(req);
911 return ret;
912}
913EXPORT_SYMBOL(ceph_monc_blacklist_add);
914
838/* 915/*
839 * Resend pending generic requests. 916 * Resend pending generic requests.
840 */ 917 */
@@ -1139,6 +1216,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1139 handle_get_version_reply(monc, msg); 1216 handle_get_version_reply(monc, msg);
1140 break; 1217 break;
1141 1218
1219 case CEPH_MSG_MON_COMMAND_ACK:
1220 handle_command_ack(monc, msg);
1221 break;
1222
1142 case CEPH_MSG_MON_MAP: 1223 case CEPH_MSG_MON_MAP:
1143 ceph_monc_handle_map(monc, msg); 1224 ceph_monc_handle_map(monc, msg);
1144 break; 1225 break;
@@ -1178,6 +1259,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1178 m = ceph_msg_get(monc->m_subscribe_ack); 1259 m = ceph_msg_get(monc->m_subscribe_ack);
1179 break; 1260 break;
1180 case CEPH_MSG_STATFS_REPLY: 1261 case CEPH_MSG_STATFS_REPLY:
1262 case CEPH_MSG_MON_COMMAND_ACK:
1181 return get_generic_reply(con, hdr, skip); 1263 return get_generic_reply(con, hdr, skip);
1182 case CEPH_MSG_AUTH_REPLY: 1264 case CEPH_MSG_AUTH_REPLY:
1183 m = ceph_msg_get(monc->m_auth_reply); 1265 m = ceph_msg_get(monc->m_auth_reply);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a97e7b506612..d9bf7a1d0a58 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
338 ceph_osd_data_release(&op->notify.request_data); 338 ceph_osd_data_release(&op->notify.request_data);
339 ceph_osd_data_release(&op->notify.response_data); 339 ceph_osd_data_release(&op->notify.response_data);
340 break; 340 break;
341 case CEPH_OSD_OP_LIST_WATCHERS:
342 ceph_osd_data_release(&op->list_watchers.response_data);
343 break;
341 default: 344 default:
342 break; 345 break;
343 } 346 }
@@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
863 case CEPH_OSD_OP_NOTIFY: 866 case CEPH_OSD_OP_NOTIFY:
864 dst->notify.cookie = cpu_to_le64(src->notify.cookie); 867 dst->notify.cookie = cpu_to_le64(src->notify.cookie);
865 break; 868 break;
869 case CEPH_OSD_OP_LIST_WATCHERS:
870 break;
866 case CEPH_OSD_OP_SETALLOCHINT: 871 case CEPH_OSD_OP_SETALLOCHINT:
867 dst->alloc_hint.expected_object_size = 872 dst->alloc_hint.expected_object_size =
868 cpu_to_le64(src->alloc_hint.expected_object_size); 873 cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
1445 ceph_osdc_msg_data_add(req->r_reply, 1450 ceph_osdc_msg_data_add(req->r_reply,
1446 &op->extent.osd_data); 1451 &op->extent.osd_data);
1447 break; 1452 break;
1453 case CEPH_OSD_OP_LIST_WATCHERS:
1454 ceph_osdc_msg_data_add(req->r_reply,
1455 &op->list_watchers.response_data);
1456 break;
1448 1457
1449 /* both */ 1458 /* both */
1450 case CEPH_OSD_OP_CALL: 1459 case CEPH_OSD_OP_CALL:
@@ -3891,12 +3900,121 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
3891 return ret; 3900 return ret;
3892} 3901}
3893 3902
3903static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
3904{
3905 u8 struct_v;
3906 u32 struct_len;
3907 int ret;
3908
3909 ret = ceph_start_decoding(p, end, 2, "watch_item_t",
3910 &struct_v, &struct_len);
3911 if (ret)
3912 return ret;
3913
3914 ceph_decode_copy(p, &item->name, sizeof(item->name));
3915 item->cookie = ceph_decode_64(p);
3916 *p += 4; /* skip timeout_seconds */
3917 if (struct_v >= 2) {
3918 ceph_decode_copy(p, &item->addr, sizeof(item->addr));
3919 ceph_decode_addr(&item->addr);
3920 }
3921
3922 dout("%s %s%llu cookie %llu addr %s\n", __func__,
3923 ENTITY_NAME(item->name), item->cookie,
3924 ceph_pr_addr(&item->addr.in_addr));
3925 return 0;
3926}
3927
3928static int decode_watchers(void **p, void *end,
3929 struct ceph_watch_item **watchers,
3930 u32 *num_watchers)
3931{
3932 u8 struct_v;
3933 u32 struct_len;
3934 int i;
3935 int ret;
3936
3937 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
3938 &struct_v, &struct_len);
3939 if (ret)
3940 return ret;
3941
3942 *num_watchers = ceph_decode_32(p);
3943 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
3944 if (!*watchers)
3945 return -ENOMEM;
3946
3947 for (i = 0; i < *num_watchers; i++) {
3948 ret = decode_watcher(p, end, *watchers + i);
3949 if (ret) {
3950 kfree(*watchers);
3951 return ret;
3952 }
3953 }
3954
3955 return 0;
3956}
3957
/*
 * List the watchers of an object: issues a CEPH_OSD_OP_LIST_WATCHERS
 * read against @oid/@oloc and decodes the reply into a freshly
 * allocated array.
 *
 * On success, the caller is responsible for:
 *
 *     kfree(watchers);
 *
 * Returns 0 on success or a negative errno.
 */
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
			    struct ceph_object_id *oid,
			    struct ceph_object_locator *oloc,
			    struct ceph_watch_item **watchers,
			    u32 *num_watchers)
{
	struct ceph_osd_request *req;
	struct page **pages;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = CEPH_OSD_FLAG_READ;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		goto out_put_req;

	/* single page to receive the encoded watcher list */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto out_put_req;
	}

	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
	/* own_pages=true: the page vector now belongs to the request and
	 * is freed via osd_req_op_data_release() when the request is put,
	 * including on the error paths below */
	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
						 response_data),
				 pages, PAGE_SIZE, 0, false, true);

	/* NOTE(review): start_request's return value is ignored here --
	 * presumably any submission failure surfaces through
	 * ceph_osdc_wait_request(); confirm against osd_client internals */
	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req);
	if (ret >= 0) {
		void *p = page_address(pages[0]);
		/* outdata_len is the number of reply bytes the OSD wrote */
		void *const end = p + req->r_ops[0].outdata_len;

		ret = decode_watchers(&p, end, watchers, num_watchers);
	}

out_put_req:
	ceph_osdc_put_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_list_watchers);
4010
3894/* 4011/*
3895 * Call all pending notify callbacks - for use after a watch is 4012 * Call all pending notify callbacks - for use after a watch is
3896 * unregistered, to make sure no more callbacks for it will be invoked 4013 * unregistered, to make sure no more callbacks for it will be invoked
3897 */ 4014 */
3898void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 4015void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
3899{ 4016{
4017 dout("%s osdc %p\n", __func__, osdc);
3900 flush_workqueue(osdc->notify_wq); 4018 flush_workqueue(osdc->notify_wq);
3901} 4019}
3902EXPORT_SYMBOL(ceph_osdc_flush_notifies); 4020EXPORT_SYMBOL(ceph_osdc_flush_notifies);
@@ -3910,6 +4028,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
3910EXPORT_SYMBOL(ceph_osdc_maybe_request_map); 4028EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
3911 4029
3912/* 4030/*
4031 * Execute an OSD class method on an object.
4032 *
4033 * @flags: CEPH_OSD_FLAG_*
4034 * @resp_len: out param for reply length
4035 */
4036int ceph_osdc_call(struct ceph_osd_client *osdc,
4037 struct ceph_object_id *oid,
4038 struct ceph_object_locator *oloc,
4039 const char *class, const char *method,
4040 unsigned int flags,
4041 struct page *req_page, size_t req_len,
4042 struct page *resp_page, size_t *resp_len)
4043{
4044 struct ceph_osd_request *req;
4045 int ret;
4046
4047 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4048 if (!req)
4049 return -ENOMEM;
4050
4051 ceph_oid_copy(&req->r_base_oid, oid);
4052 ceph_oloc_copy(&req->r_base_oloc, oloc);
4053 req->r_flags = flags;
4054
4055 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4056 if (ret)
4057 goto out_put_req;
4058
4059 osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4060 if (req_page)
4061 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
4062 0, false, false);
4063 if (resp_page)
4064 osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4065 PAGE_SIZE, 0, false, false);
4066
4067 ceph_osdc_start_request(osdc, req, false);
4068 ret = ceph_osdc_wait_request(osdc, req);
4069 if (ret >= 0) {
4070 ret = req->r_ops[0].rval;
4071 if (resp_page)
4072 *resp_len = req->r_ops[0].outdata_len;
4073 }
4074
4075out_put_req:
4076 ceph_osdc_put_request(req);
4077 return ret;
4078}
4079EXPORT_SYMBOL(ceph_osdc_call);
4080
4081/*
3913 * init, shutdown 4082 * init, shutdown
3914 */ 4083 */
3915int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 4084int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)