about summary refs log tree commit diff stats
path: root/drivers/block
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2013-05-06 16:11:19 -0400
committer Linus Torvalds <torvalds@linux-foundation.org> 2013-05-06 16:11:19 -0400
commit 91f8575685e35f3bd021286bc82d26397458f5a9 (patch)
tree 09de8d889758a12071adb9427ed741e27c907aa6 /drivers/block
parent 2e378f3eebd28feefbb1f9953834a5a19482f053 (diff)
parent b5b09be30cf99f9c699e825629f02e3bce555d44 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Alex Elder: "This is a big pull. Most of it is culmination of Alex's work to implement RBD image layering, which is now complete (yay!). There is also some work from Yan to fix i_mutex behavior surrounding writes in cephfs, a sync write fix, a fix for RBD images that get resized while they are mapped, and a few patches from me that resolve annoying auth warnings and fix several bugs in the ceph auth code." * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits) rbd: fix image request leak on parent read libceph: use slab cache for osd client requests libceph: allocate ceph message data with a slab allocator libceph: allocate ceph messages with a slab allocator rbd: allocate image object names with a slab allocator rbd: allocate object requests with a slab allocator rbd: allocate name separate from obj_request rbd: allocate image requests with a slab allocator rbd: use binary search for snapshot lookup rbd: clear EXISTS flag if mapped snapshot disappears rbd: kill off the snapshot list rbd: define rbd_snap_size() and rbd_snap_features() rbd: use snap_id not index to look up snap info rbd: look up snapshot name in names buffer rbd: drop obj_request->version rbd: drop rbd_obj_method_sync() version parameter rbd: more version parameter removal rbd: get rid of some version parameters rbd: stop tracking header object version rbd: snap names are pointer to constant data ...
Diffstat (limited to 'drivers/block')
-rw-r--r-- drivers/block/rbd.c 2858
1 files changed, 1834 insertions, 1024 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b7b7a88d9f68..c2ca1818f335 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1,3 +1,4 @@
1
1/* 2/*
2 rbd.c -- Export ceph rados objects as a Linux block device 3 rbd.c -- Export ceph rados objects as a Linux block device
3 4
@@ -32,12 +33,14 @@
32#include <linux/ceph/mon_client.h> 33#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h> 34#include <linux/ceph/decode.h>
34#include <linux/parser.h> 35#include <linux/parser.h>
36#include <linux/bsearch.h>
35 37
36#include <linux/kernel.h> 38#include <linux/kernel.h>
37#include <linux/device.h> 39#include <linux/device.h>
38#include <linux/module.h> 40#include <linux/module.h>
39#include <linux/fs.h> 41#include <linux/fs.h>
40#include <linux/blkdev.h> 42#include <linux/blkdev.h>
43#include <linux/slab.h>
41 44
42#include "rbd_types.h" 45#include "rbd_types.h"
43 46
@@ -52,13 +55,6 @@
52#define SECTOR_SHIFT 9 55#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT) 56#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54 57
55/* It might be useful to have these defined elsewhere */
56
57#define U8_MAX ((u8) (~0U))
58#define U16_MAX ((u16) (~0U))
59#define U32_MAX ((u32) (~0U))
60#define U64_MAX ((u64) (~0ULL))
61
62#define RBD_DRV_NAME "rbd" 58#define RBD_DRV_NAME "rbd"
63#define RBD_DRV_NAME_LONG "rbd (rados block device)" 59#define RBD_DRV_NAME_LONG "rbd (rados block device)"
64 60
@@ -72,6 +68,8 @@
72 68
73#define RBD_SNAP_HEAD_NAME "-" 69#define RBD_SNAP_HEAD_NAME "-"
74 70
71#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
72
75/* This allows a single page to hold an image name sent by OSD */ 73/* This allows a single page to hold an image name sent by OSD */
76#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1) 74#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
77#define RBD_IMAGE_ID_LEN_MAX 64 75#define RBD_IMAGE_ID_LEN_MAX 64
@@ -80,11 +78,14 @@
80 78
81/* Feature bits */ 79/* Feature bits */
82 80
83#define RBD_FEATURE_LAYERING 1 81#define RBD_FEATURE_LAYERING (1<<0)
82#define RBD_FEATURE_STRIPINGV2 (1<<1)
83#define RBD_FEATURES_ALL \
84 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
84 85
85/* Features supported by this (client software) implementation. */ 86/* Features supported by this (client software) implementation. */
86 87
87#define RBD_FEATURES_ALL (0) 88#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
88 89
89/* 90/*
90 * An RBD device name will be "rbd#", where the "rbd" comes from 91 * An RBD device name will be "rbd#", where the "rbd" comes from
@@ -112,7 +113,8 @@ struct rbd_image_header {
112 char *snap_names; 113 char *snap_names;
113 u64 *snap_sizes; 114 u64 *snap_sizes;
114 115
115 u64 obj_version; 116 u64 stripe_unit;
117 u64 stripe_count;
116}; 118};
117 119
118/* 120/*
@@ -142,13 +144,13 @@ struct rbd_image_header {
142 */ 144 */
143struct rbd_spec { 145struct rbd_spec {
144 u64 pool_id; 146 u64 pool_id;
145 char *pool_name; 147 const char *pool_name;
146 148
147 char *image_id; 149 const char *image_id;
148 char *image_name; 150 const char *image_name;
149 151
150 u64 snap_id; 152 u64 snap_id;
151 char *snap_name; 153 const char *snap_name;
152 154
153 struct kref kref; 155 struct kref kref;
154}; 156};
@@ -174,13 +176,44 @@ enum obj_request_type {
174 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES 176 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175}; 177};
176 178
179enum obj_req_flags {
180 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
181 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
182 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
183 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
184};
185
177struct rbd_obj_request { 186struct rbd_obj_request {
178 const char *object_name; 187 const char *object_name;
179 u64 offset; /* object start byte */ 188 u64 offset; /* object start byte */
180 u64 length; /* bytes from offset */ 189 u64 length; /* bytes from offset */
190 unsigned long flags;
181 191
182 struct rbd_img_request *img_request; 192 /*
183 struct list_head links; /* img_request->obj_requests */ 193 * An object request associated with an image will have its
194 * img_data flag set; a standalone object request will not.
195 *
196 * A standalone object request will have which == BAD_WHICH
197 * and a null obj_request pointer.
198 *
199 * An object request initiated in support of a layered image
200 * object (to check for its existence before a write) will
201 * have which == BAD_WHICH and a non-null obj_request pointer.
202 *
203 * Finally, an object request for rbd image data will have
204 * which != BAD_WHICH, and will have a non-null img_request
205 * pointer. The value of which will be in the range
206 * 0..(img_request->obj_request_count-1).
207 */
208 union {
209 struct rbd_obj_request *obj_request; /* STAT op */
210 struct {
211 struct rbd_img_request *img_request;
212 u64 img_offset;
213 /* links for img_request->obj_requests list */
214 struct list_head links;
215 };
216 };
184 u32 which; /* posn image request list */ 217 u32 which; /* posn image request list */
185 218
186 enum obj_request_type type; 219 enum obj_request_type type;
@@ -191,13 +224,12 @@ struct rbd_obj_request {
191 u32 page_count; 224 u32 page_count;
192 }; 225 };
193 }; 226 };
227 struct page **copyup_pages;
194 228
195 struct ceph_osd_request *osd_req; 229 struct ceph_osd_request *osd_req;
196 230
197 u64 xferred; /* bytes transferred */ 231 u64 xferred; /* bytes transferred */
198 u64 version;
199 int result; 232 int result;
200 atomic_t done;
201 233
202 rbd_obj_callback_t callback; 234 rbd_obj_callback_t callback;
203 struct completion completion; 235 struct completion completion;
@@ -205,19 +237,31 @@ struct rbd_obj_request {
205 struct kref kref; 237 struct kref kref;
206}; 238};
207 239
240enum img_req_flags {
241 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
242 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
243 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
244};
245
208struct rbd_img_request { 246struct rbd_img_request {
209 struct request *rq;
210 struct rbd_device *rbd_dev; 247 struct rbd_device *rbd_dev;
211 u64 offset; /* starting image byte offset */ 248 u64 offset; /* starting image byte offset */
212 u64 length; /* byte count from offset */ 249 u64 length; /* byte count from offset */
213 bool write_request; /* false for read */ 250 unsigned long flags;
214 union { 251 union {
252 u64 snap_id; /* for reads */
215 struct ceph_snap_context *snapc; /* for writes */ 253 struct ceph_snap_context *snapc; /* for writes */
216 u64 snap_id; /* for reads */
217 }; 254 };
255 union {
256 struct request *rq; /* block request */
257 struct rbd_obj_request *obj_request; /* obj req initiator */
258 };
259 struct page **copyup_pages;
218 spinlock_t completion_lock;/* protects next_completion */ 260 spinlock_t completion_lock;/* protects next_completion */
219 u32 next_completion; 261 u32 next_completion;
220 rbd_img_callback_t callback; 262 rbd_img_callback_t callback;
263 u64 xferred;/* aggregate bytes transferred */
264 int result; /* first nonzero obj_request result */
221 265
222 u32 obj_request_count; 266 u32 obj_request_count;
223 struct list_head obj_requests; /* rbd_obj_request structs */ 267 struct list_head obj_requests; /* rbd_obj_request structs */
@@ -232,15 +276,6 @@ struct rbd_img_request {
232#define for_each_obj_request_safe(ireq, oreq, n) \ 276#define for_each_obj_request_safe(ireq, oreq, n) \
233 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) 277 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
234 278
235struct rbd_snap {
236 struct device dev;
237 const char *name;
238 u64 size;
239 struct list_head node;
240 u64 id;
241 u64 features;
242};
243
244struct rbd_mapping { 279struct rbd_mapping {
245 u64 size; 280 u64 size;
246 u64 features; 281 u64 features;
@@ -276,6 +311,7 @@ struct rbd_device {
276 311
277 struct rbd_spec *parent_spec; 312 struct rbd_spec *parent_spec;
278 u64 parent_overlap; 313 u64 parent_overlap;
314 struct rbd_device *parent;
279 315
280 /* protects updating the header */ 316 /* protects updating the header */
281 struct rw_semaphore header_rwsem; 317 struct rw_semaphore header_rwsem;
@@ -284,9 +320,6 @@ struct rbd_device {
284 320
285 struct list_head node; 321 struct list_head node;
286 322
287 /* list of snapshots */
288 struct list_head snaps;
289
290 /* sysfs related */ 323 /* sysfs related */
291 struct device dev; 324 struct device dev;
292 unsigned long open_count; /* protected by lock */ 325 unsigned long open_count; /* protected by lock */
@@ -312,16 +345,21 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
312static LIST_HEAD(rbd_client_list); /* clients */ 345static LIST_HEAD(rbd_client_list); /* clients */
313static DEFINE_SPINLOCK(rbd_client_list_lock); 346static DEFINE_SPINLOCK(rbd_client_list_lock);
314 347
315static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); 348/* Slab caches for frequently-allocated structures */
316static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 349
350static struct kmem_cache *rbd_img_request_cache;
351static struct kmem_cache *rbd_obj_request_cache;
352static struct kmem_cache *rbd_segment_name_cache;
317 353
318static void rbd_dev_release(struct device *dev); 354static int rbd_img_request_submit(struct rbd_img_request *img_request);
319static void rbd_remove_snap_dev(struct rbd_snap *snap); 355
356static void rbd_dev_device_release(struct device *dev);
320 357
321static ssize_t rbd_add(struct bus_type *bus, const char *buf, 358static ssize_t rbd_add(struct bus_type *bus, const char *buf,
322 size_t count); 359 size_t count);
323static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 360static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
324 size_t count); 361 size_t count);
362static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
325 363
326static struct bus_attribute rbd_bus_attrs[] = { 364static struct bus_attribute rbd_bus_attrs[] = {
327 __ATTR(add, S_IWUSR, NULL, rbd_add), 365 __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -383,8 +421,19 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
383# define rbd_assert(expr) ((void) 0) 421# define rbd_assert(expr) ((void) 0)
384#endif /* !RBD_DEBUG */ 422#endif /* !RBD_DEBUG */
385 423
386static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); 424static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
387static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); 425static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
426static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
427
428static int rbd_dev_refresh(struct rbd_device *rbd_dev);
429static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
430static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
431 u64 snap_id);
432static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
433 u8 *order, u64 *snap_size);
434static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
435 u64 *snap_features);
436static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
388 437
389static int rbd_open(struct block_device *bdev, fmode_t mode) 438static int rbd_open(struct block_device *bdev, fmode_t mode)
390{ 439{
@@ -484,6 +533,13 @@ out_opt:
484 return ERR_PTR(ret); 533 return ERR_PTR(ret);
485} 534}
486 535
536static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
537{
538 kref_get(&rbdc->kref);
539
540 return rbdc;
541}
542
487/* 543/*
488 * Find a ceph client with specific addr and configuration. If 544 * Find a ceph client with specific addr and configuration. If
489 * found, bump its reference count. 545 * found, bump its reference count.
@@ -499,7 +555,8 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
499 spin_lock(&rbd_client_list_lock); 555 spin_lock(&rbd_client_list_lock);
500 list_for_each_entry(client_node, &rbd_client_list, node) { 556 list_for_each_entry(client_node, &rbd_client_list, node) {
501 if (!ceph_compare_options(ceph_opts, client_node->client)) { 557 if (!ceph_compare_options(ceph_opts, client_node->client)) {
502 kref_get(&client_node->kref); 558 __rbd_get_client(client_node);
559
503 found = true; 560 found = true;
504 break; 561 break;
505 } 562 }
@@ -722,7 +779,6 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
722 header->snap_sizes[i] = 779 header->snap_sizes[i] =
723 le64_to_cpu(ondisk->snaps[i].image_size); 780 le64_to_cpu(ondisk->snaps[i].image_size);
724 } else { 781 } else {
725 WARN_ON(ondisk->snap_names_len);
726 header->snap_names = NULL; 782 header->snap_names = NULL;
727 header->snap_sizes = NULL; 783 header->snap_sizes = NULL;
728 } 784 }
@@ -735,18 +791,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
735 /* Allocate and fill in the snapshot context */ 791 /* Allocate and fill in the snapshot context */
736 792
737 header->image_size = le64_to_cpu(ondisk->image_size); 793 header->image_size = le64_to_cpu(ondisk->image_size);
738 size = sizeof (struct ceph_snap_context); 794
739 size += snap_count * sizeof (header->snapc->snaps[0]); 795 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
740 header->snapc = kzalloc(size, GFP_KERNEL);
741 if (!header->snapc) 796 if (!header->snapc)
742 goto out_err; 797 goto out_err;
743
744 atomic_set(&header->snapc->nref, 1);
745 header->snapc->seq = le64_to_cpu(ondisk->snap_seq); 798 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
746 header->snapc->num_snaps = snap_count;
747 for (i = 0; i < snap_count; i++) 799 for (i = 0; i < snap_count; i++)
748 header->snapc->snaps[i] = 800 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
749 le64_to_cpu(ondisk->snaps[i].id);
750 801
751 return 0; 802 return 0;
752 803
@@ -761,70 +812,174 @@ out_err:
761 return -ENOMEM; 812 return -ENOMEM;
762} 813}
763 814
764static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 815static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
765{ 816{
766 struct rbd_snap *snap; 817 const char *snap_name;
767 818
819 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
820
821 /* Skip over names until we find the one we are looking for */
822
823 snap_name = rbd_dev->header.snap_names;
824 while (which--)
825 snap_name += strlen(snap_name) + 1;
826
827 return kstrdup(snap_name, GFP_KERNEL);
828}
829
830/*
831 * Snapshot id comparison function for use with qsort()/bsearch().
832 * Note that result is for snapshots in *descending* order.
833 */
834static int snapid_compare_reverse(const void *s1, const void *s2)
835{
836 u64 snap_id1 = *(u64 *)s1;
837 u64 snap_id2 = *(u64 *)s2;
838
839 if (snap_id1 < snap_id2)
840 return 1;
841 return snap_id1 == snap_id2 ? 0 : -1;
842}
843
844/*
845 * Search a snapshot context to see if the given snapshot id is
846 * present.
847 *
848 * Returns the position of the snapshot id in the array if it's found,
849 * or BAD_SNAP_INDEX otherwise.
850 *
851 * Note: The snapshot array is in kept sorted (by the osd) in
852 * reverse order, highest snapshot id first.
853 */
854static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
855{
856 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
857 u64 *found;
858
859 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
860 sizeof (snap_id), snapid_compare_reverse);
861
862 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
863}
864
865static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
866 u64 snap_id)
867{
868 u32 which;
869
870 which = rbd_dev_snap_index(rbd_dev, snap_id);
871 if (which == BAD_SNAP_INDEX)
872 return NULL;
873
874 return _rbd_dev_v1_snap_name(rbd_dev, which);
875}
876
877static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
878{
768 if (snap_id == CEPH_NOSNAP) 879 if (snap_id == CEPH_NOSNAP)
769 return RBD_SNAP_HEAD_NAME; 880 return RBD_SNAP_HEAD_NAME;
770 881
771 list_for_each_entry(snap, &rbd_dev->snaps, node) 882 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
772 if (snap_id == snap->id) 883 if (rbd_dev->image_format == 1)
773 return snap->name; 884 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
774 885
775 return NULL; 886 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
776} 887}
777 888
778static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 889static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
890 u64 *snap_size)
779{ 891{
892 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
893 if (snap_id == CEPH_NOSNAP) {
894 *snap_size = rbd_dev->header.image_size;
895 } else if (rbd_dev->image_format == 1) {
896 u32 which;
780 897
781 struct rbd_snap *snap; 898 which = rbd_dev_snap_index(rbd_dev, snap_id);
899 if (which == BAD_SNAP_INDEX)
900 return -ENOENT;
782 901
783 list_for_each_entry(snap, &rbd_dev->snaps, node) { 902 *snap_size = rbd_dev->header.snap_sizes[which];
784 if (!strcmp(snap_name, snap->name)) { 903 } else {
785 rbd_dev->spec->snap_id = snap->id; 904 u64 size = 0;
786 rbd_dev->mapping.size = snap->size; 905 int ret;
787 rbd_dev->mapping.features = snap->features;
788 906
789 return 0; 907 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
790 } 908 if (ret)
909 return ret;
910
911 *snap_size = size;
791 } 912 }
913 return 0;
914}
915
916static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
917 u64 *snap_features)
918{
919 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
920 if (snap_id == CEPH_NOSNAP) {
921 *snap_features = rbd_dev->header.features;
922 } else if (rbd_dev->image_format == 1) {
923 *snap_features = 0; /* No features for format 1 */
924 } else {
925 u64 features = 0;
926 int ret;
927
928 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
929 if (ret)
930 return ret;
792 931
793 return -ENOENT; 932 *snap_features = features;
933 }
934 return 0;
794} 935}
795 936
796static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) 937static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
797{ 938{
939 const char *snap_name = rbd_dev->spec->snap_name;
940 u64 snap_id;
941 u64 size = 0;
942 u64 features = 0;
798 int ret; 943 int ret;
799 944
800 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, 945 if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
801 sizeof (RBD_SNAP_HEAD_NAME))) { 946 snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
802 rbd_dev->spec->snap_id = CEPH_NOSNAP; 947 if (snap_id == CEPH_NOSNAP)
803 rbd_dev->mapping.size = rbd_dev->header.image_size; 948 return -ENOENT;
804 rbd_dev->mapping.features = rbd_dev->header.features;
805 ret = 0;
806 } else { 949 } else {
807 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 950 snap_id = CEPH_NOSNAP;
808 if (ret < 0)
809 goto done;
810 rbd_dev->mapping.read_only = true;
811 } 951 }
812 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
813 952
814done: 953 ret = rbd_snap_size(rbd_dev, snap_id, &size);
815 return ret; 954 if (ret)
955 return ret;
956 ret = rbd_snap_features(rbd_dev, snap_id, &features);
957 if (ret)
958 return ret;
959
960 rbd_dev->mapping.size = size;
961 rbd_dev->mapping.features = features;
962
963 /* If we are mapping a snapshot it must be marked read-only */
964
965 if (snap_id != CEPH_NOSNAP)
966 rbd_dev->mapping.read_only = true;
967
968 return 0;
816} 969}
817 970
818static void rbd_header_free(struct rbd_image_header *header) 971static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
819{ 972{
820 kfree(header->object_prefix); 973 rbd_dev->mapping.size = 0;
821 header->object_prefix = NULL; 974 rbd_dev->mapping.features = 0;
822 kfree(header->snap_sizes); 975 rbd_dev->mapping.read_only = true;
823 header->snap_sizes = NULL; 976}
824 kfree(header->snap_names); 977
825 header->snap_names = NULL; 978static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
826 ceph_put_snap_context(header->snapc); 979{
827 header->snapc = NULL; 980 rbd_dev->mapping.size = 0;
981 rbd_dev->mapping.features = 0;
982 rbd_dev->mapping.read_only = true;
828} 983}
829 984
830static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 985static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
@@ -833,7 +988,7 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
833 u64 segment; 988 u64 segment;
834 int ret; 989 int ret;
835 990
836 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); 991 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
837 if (!name) 992 if (!name)
838 return NULL; 993 return NULL;
839 segment = offset >> rbd_dev->header.obj_order; 994 segment = offset >> rbd_dev->header.obj_order;
@@ -849,6 +1004,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
849 return name; 1004 return name;
850} 1005}
851 1006
1007static void rbd_segment_name_free(const char *name)
1008{
1009 /* The explicit cast here is needed to drop the const qualifier */
1010
1011 kmem_cache_free(rbd_segment_name_cache, (void *)name);
1012}
1013
852static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 1014static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
853{ 1015{
854 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 1016 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -921,6 +1083,37 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
921} 1083}
922 1084
923/* 1085/*
1086 * similar to zero_bio_chain(), zeros data defined by a page array,
1087 * starting at the given byte offset from the start of the array and
1088 * continuing up to the given end offset. The pages array is
1089 * assumed to be big enough to hold all bytes up to the end.
1090 */
1091static void zero_pages(struct page **pages, u64 offset, u64 end)
1092{
1093 struct page **page = &pages[offset >> PAGE_SHIFT];
1094
1095 rbd_assert(end > offset);
1096 rbd_assert(end - offset <= (u64)SIZE_MAX);
1097 while (offset < end) {
1098 size_t page_offset;
1099 size_t length;
1100 unsigned long flags;
1101 void *kaddr;
1102
1103 page_offset = (size_t)(offset & ~PAGE_MASK);
1104 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1105 local_irq_save(flags);
1106 kaddr = kmap_atomic(*page);
1107 memset(kaddr + page_offset, 0, length);
1108 kunmap_atomic(kaddr);
1109 local_irq_restore(flags);
1110
1111 offset += length;
1112 page++;
1113 }
1114}
1115
1116/*
924 * Clone a portion of a bio, starting at the given byte offset 1117 * Clone a portion of a bio, starting at the given byte offset
925 * and continuing for the number of bytes indicated. 1118 * and continuing for the number of bytes indicated.
926 */ 1119 */
@@ -1064,6 +1257,77 @@ out_err:
1064 return NULL; 1257 return NULL;
1065} 1258}
1066 1259
1260/*
1261 * The default/initial value for all object request flags is 0. For
1262 * each flag, once its value is set to 1 it is never reset to 0
1263 * again.
1264 */
1265static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1266{
1267 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1268 struct rbd_device *rbd_dev;
1269
1270 rbd_dev = obj_request->img_request->rbd_dev;
1271 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1272 obj_request);
1273 }
1274}
1275
1276static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1277{
1278 smp_mb();
1279 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1280}
1281
1282static void obj_request_done_set(struct rbd_obj_request *obj_request)
1283{
1284 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1285 struct rbd_device *rbd_dev = NULL;
1286
1287 if (obj_request_img_data_test(obj_request))
1288 rbd_dev = obj_request->img_request->rbd_dev;
1289 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1290 obj_request);
1291 }
1292}
1293
1294static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1295{
1296 smp_mb();
1297 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1298}
1299
1300/*
1301 * This sets the KNOWN flag after (possibly) setting the EXISTS
1302 * flag. The latter is set based on the "exists" value provided.
1303 *
1304 * Note that for our purposes once an object exists it never goes
1305 * away again. It's possible that the response from two existence
1306 * checks are separated by the creation of the target object, and
1307 * the first ("doesn't exist") response arrives *after* the second
1308 * ("does exist"). In that case we ignore the second one.
1309 */
1310static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1311 bool exists)
1312{
1313 if (exists)
1314 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1315 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1316 smp_mb();
1317}
1318
1319static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1320{
1321 smp_mb();
1322 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1323}
1324
1325static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1326{
1327 smp_mb();
1328 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1329}
1330
1067static void rbd_obj_request_get(struct rbd_obj_request *obj_request) 1331static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1068{ 1332{
1069 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1333 dout("%s: obj %p (was %d)\n", __func__, obj_request,
@@ -1101,9 +1365,11 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1101{ 1365{
1102 rbd_assert(obj_request->img_request == NULL); 1366 rbd_assert(obj_request->img_request == NULL);
1103 1367
1104 rbd_obj_request_get(obj_request); 1368 /* Image request now owns object's original reference */
1105 obj_request->img_request = img_request; 1369 obj_request->img_request = img_request;
1106 obj_request->which = img_request->obj_request_count; 1370 obj_request->which = img_request->obj_request_count;
1371 rbd_assert(!obj_request_img_data_test(obj_request));
1372 obj_request_img_data_set(obj_request);
1107 rbd_assert(obj_request->which != BAD_WHICH); 1373 rbd_assert(obj_request->which != BAD_WHICH);
1108 img_request->obj_request_count++; 1374 img_request->obj_request_count++;
1109 list_add_tail(&obj_request->links, &img_request->obj_requests); 1375 list_add_tail(&obj_request->links, &img_request->obj_requests);
@@ -1123,6 +1389,7 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1123 img_request->obj_request_count--; 1389 img_request->obj_request_count--;
1124 rbd_assert(obj_request->which == img_request->obj_request_count); 1390 rbd_assert(obj_request->which == img_request->obj_request_count);
1125 obj_request->which = BAD_WHICH; 1391 obj_request->which = BAD_WHICH;
1392 rbd_assert(obj_request_img_data_test(obj_request));
1126 rbd_assert(obj_request->img_request == img_request); 1393 rbd_assert(obj_request->img_request == img_request);
1127 obj_request->img_request = NULL; 1394 obj_request->img_request = NULL;
1128 obj_request->callback = NULL; 1395 obj_request->callback = NULL;
@@ -1141,76 +1408,6 @@ static bool obj_request_type_valid(enum obj_request_type type)
1141 } 1408 }
1142} 1409}
1143 1410
1144static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1145{
1146 struct ceph_osd_req_op *op;
1147 va_list args;
1148 size_t size;
1149
1150 op = kzalloc(sizeof (*op), GFP_NOIO);
1151 if (!op)
1152 return NULL;
1153 op->op = opcode;
1154 va_start(args, opcode);
1155 switch (opcode) {
1156 case CEPH_OSD_OP_READ:
1157 case CEPH_OSD_OP_WRITE:
1158 /* rbd_osd_req_op_create(READ, offset, length) */
1159 /* rbd_osd_req_op_create(WRITE, offset, length) */
1160 op->extent.offset = va_arg(args, u64);
1161 op->extent.length = va_arg(args, u64);
1162 if (opcode == CEPH_OSD_OP_WRITE)
1163 op->payload_len = op->extent.length;
1164 break;
1165 case CEPH_OSD_OP_STAT:
1166 break;
1167 case CEPH_OSD_OP_CALL:
1168 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1169 op->cls.class_name = va_arg(args, char *);
1170 size = strlen(op->cls.class_name);
1171 rbd_assert(size <= (size_t) U8_MAX);
1172 op->cls.class_len = size;
1173 op->payload_len = size;
1174
1175 op->cls.method_name = va_arg(args, char *);
1176 size = strlen(op->cls.method_name);
1177 rbd_assert(size <= (size_t) U8_MAX);
1178 op->cls.method_len = size;
1179 op->payload_len += size;
1180
1181 op->cls.argc = 0;
1182 op->cls.indata = va_arg(args, void *);
1183 size = va_arg(args, size_t);
1184 rbd_assert(size <= (size_t) U32_MAX);
1185 op->cls.indata_len = (u32) size;
1186 op->payload_len += size;
1187 break;
1188 case CEPH_OSD_OP_NOTIFY_ACK:
1189 case CEPH_OSD_OP_WATCH:
1190 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1191 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1192 op->watch.cookie = va_arg(args, u64);
1193 op->watch.ver = va_arg(args, u64);
1194 op->watch.ver = cpu_to_le64(op->watch.ver);
1195 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1196 op->watch.flag = (u8) 1;
1197 break;
1198 default:
1199 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1200 kfree(op);
1201 op = NULL;
1202 break;
1203 }
1204 va_end(args);
1205
1206 return op;
1207}
1208
1209static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
1210{
1211 kfree(op);
1212}
1213
1214static int rbd_obj_request_submit(struct ceph_osd_client *osdc, 1411static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1215 struct rbd_obj_request *obj_request) 1412 struct rbd_obj_request *obj_request)
1216{ 1413{
@@ -1221,7 +1418,24 @@ static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1221 1418
1222static void rbd_img_request_complete(struct rbd_img_request *img_request) 1419static void rbd_img_request_complete(struct rbd_img_request *img_request)
1223{ 1420{
1421
1224 dout("%s: img %p\n", __func__, img_request); 1422 dout("%s: img %p\n", __func__, img_request);
1423
1424 /*
1425 * If no error occurred, compute the aggregate transfer
1426 * count for the image request. We could instead use
1427 * atomic64_cmpxchg() to update it as each object request
1428 * completes; not clear which way is better off hand.
1429 */
1430 if (!img_request->result) {
1431 struct rbd_obj_request *obj_request;
1432 u64 xferred = 0;
1433
1434 for_each_obj_request(img_request, obj_request)
1435 xferred += obj_request->xferred;
1436 img_request->xferred = xferred;
1437 }
1438
1225 if (img_request->callback) 1439 if (img_request->callback)
1226 img_request->callback(img_request); 1440 img_request->callback(img_request);
1227 else 1441 else
@@ -1237,39 +1451,56 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1237 return wait_for_completion_interruptible(&obj_request->completion); 1451 return wait_for_completion_interruptible(&obj_request->completion);
1238} 1452}
1239 1453
1240static void obj_request_done_init(struct rbd_obj_request *obj_request) 1454/*
1455 * The default/initial value for all image request flags is 0. Each
1456 * is conditionally set to 1 at image request initialization time
1457 * and currently never change thereafter.
1458 */
1459static void img_request_write_set(struct rbd_img_request *img_request)
1241{ 1460{
1242 atomic_set(&obj_request->done, 0); 1461 set_bit(IMG_REQ_WRITE, &img_request->flags);
1243 smp_wmb(); 1462 smp_mb();
1244} 1463}
1245 1464
1246static void obj_request_done_set(struct rbd_obj_request *obj_request) 1465static bool img_request_write_test(struct rbd_img_request *img_request)
1247{ 1466{
1248 int done; 1467 smp_mb();
1468 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1469}
1249 1470
1250 done = atomic_inc_return(&obj_request->done); 1471static void img_request_child_set(struct rbd_img_request *img_request)
1251 if (done > 1) { 1472{
1252 struct rbd_img_request *img_request = obj_request->img_request; 1473 set_bit(IMG_REQ_CHILD, &img_request->flags);
1253 struct rbd_device *rbd_dev; 1474 smp_mb();
1475}
1254 1476
1255 rbd_dev = img_request ? img_request->rbd_dev : NULL; 1477static bool img_request_child_test(struct rbd_img_request *img_request)
1256 rbd_warn(rbd_dev, "obj_request %p was already done\n", 1478{
1257 obj_request); 1479 smp_mb();
1258 } 1480 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1259} 1481}
1260 1482
1261static bool obj_request_done_test(struct rbd_obj_request *obj_request) 1483static void img_request_layered_set(struct rbd_img_request *img_request)
1484{
1485 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1486 smp_mb();
1487}
1488
1489static bool img_request_layered_test(struct rbd_img_request *img_request)
1262{ 1490{
1263 smp_mb(); 1491 smp_mb();
1264 return atomic_read(&obj_request->done) != 0; 1492 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1265} 1493}
1266 1494
1267static void 1495static void
1268rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) 1496rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1269{ 1497{
1498 u64 xferred = obj_request->xferred;
1499 u64 length = obj_request->length;
1500
1270 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1501 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1271 obj_request, obj_request->img_request, obj_request->result, 1502 obj_request, obj_request->img_request, obj_request->result,
1272 obj_request->xferred, obj_request->length); 1503 xferred, length);
1273 /* 1504 /*
1274 * ENOENT means a hole in the image. We zero-fill the 1505 * ENOENT means a hole in the image. We zero-fill the
1275 * entire length of the request. A short read also implies 1506 * entire length of the request. A short read also implies
@@ -1277,15 +1508,20 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1277 * update the xferred count to indicate the whole request 1508 * update the xferred count to indicate the whole request
1278 * was satisfied. 1509 * was satisfied.
1279 */ 1510 */
1280 BUG_ON(obj_request->type != OBJ_REQUEST_BIO); 1511 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1281 if (obj_request->result == -ENOENT) { 1512 if (obj_request->result == -ENOENT) {
1282 zero_bio_chain(obj_request->bio_list, 0); 1513 if (obj_request->type == OBJ_REQUEST_BIO)
1514 zero_bio_chain(obj_request->bio_list, 0);
1515 else
1516 zero_pages(obj_request->pages, 0, length);
1283 obj_request->result = 0; 1517 obj_request->result = 0;
1284 obj_request->xferred = obj_request->length; 1518 obj_request->xferred = length;
1285 } else if (obj_request->xferred < obj_request->length && 1519 } else if (xferred < length && !obj_request->result) {
1286 !obj_request->result) { 1520 if (obj_request->type == OBJ_REQUEST_BIO)
1287 zero_bio_chain(obj_request->bio_list, obj_request->xferred); 1521 zero_bio_chain(obj_request->bio_list, xferred);
1288 obj_request->xferred = obj_request->length; 1522 else
1523 zero_pages(obj_request->pages, xferred, length);
1524 obj_request->xferred = length;
1289 } 1525 }
1290 obj_request_done_set(obj_request); 1526 obj_request_done_set(obj_request);
1291} 1527}
@@ -1308,9 +1544,23 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1308 1544
1309static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1545static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1310{ 1546{
1311 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, 1547 struct rbd_img_request *img_request = NULL;
1312 obj_request->result, obj_request->xferred, obj_request->length); 1548 struct rbd_device *rbd_dev = NULL;
1313 if (obj_request->img_request) 1549 bool layered = false;
1550
1551 if (obj_request_img_data_test(obj_request)) {
1552 img_request = obj_request->img_request;
1553 layered = img_request && img_request_layered_test(img_request);
1554 rbd_dev = img_request->rbd_dev;
1555 }
1556
1557 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1558 obj_request, img_request, obj_request->result,
1559 obj_request->xferred, obj_request->length);
1560 if (layered && obj_request->result == -ENOENT &&
1561 obj_request->img_offset < rbd_dev->parent_overlap)
1562 rbd_img_parent_read(obj_request);
1563 else if (img_request)
1314 rbd_img_obj_request_read_callback(obj_request); 1564 rbd_img_obj_request_read_callback(obj_request);
1315 else 1565 else
1316 obj_request_done_set(obj_request); 1566 obj_request_done_set(obj_request);
@@ -1321,9 +1571,8 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1321 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1571 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1322 obj_request->result, obj_request->length); 1572 obj_request->result, obj_request->length);
1323 /* 1573 /*
1324 * There is no such thing as a successful short write. 1574 * There is no such thing as a successful short write. Set
1325 * Our xferred value is the number of bytes transferred 1575 * it to our originally-requested length.
1326 * back. Set it to our originally-requested length.
1327 */ 1576 */
1328 obj_request->xferred = obj_request->length; 1577 obj_request->xferred = obj_request->length;
1329 obj_request_done_set(obj_request); 1578 obj_request_done_set(obj_request);
@@ -1347,22 +1596,25 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1347 1596
1348 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); 1597 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1349 rbd_assert(osd_req == obj_request->osd_req); 1598 rbd_assert(osd_req == obj_request->osd_req);
1350 rbd_assert(!!obj_request->img_request ^ 1599 if (obj_request_img_data_test(obj_request)) {
1351 (obj_request->which == BAD_WHICH)); 1600 rbd_assert(obj_request->img_request);
1601 rbd_assert(obj_request->which != BAD_WHICH);
1602 } else {
1603 rbd_assert(obj_request->which == BAD_WHICH);
1604 }
1352 1605
1353 if (osd_req->r_result < 0) 1606 if (osd_req->r_result < 0)
1354 obj_request->result = osd_req->r_result; 1607 obj_request->result = osd_req->r_result;
1355 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1356 1608
1357 WARN_ON(osd_req->r_num_ops != 1); /* For now */ 1609 BUG_ON(osd_req->r_num_ops > 2);
1358 1610
1359 /* 1611 /*
1360 * We support a 64-bit length, but ultimately it has to be 1612 * We support a 64-bit length, but ultimately it has to be
1361 * passed to blk_end_request(), which takes an unsigned int. 1613 * passed to blk_end_request(), which takes an unsigned int.
1362 */ 1614 */
1363 obj_request->xferred = osd_req->r_reply_op_len[0]; 1615 obj_request->xferred = osd_req->r_reply_op_len[0];
1364 rbd_assert(obj_request->xferred < (u64) UINT_MAX); 1616 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1365 opcode = osd_req->r_request_ops[0].op; 1617 opcode = osd_req->r_ops[0].op;
1366 switch (opcode) { 1618 switch (opcode) {
1367 case CEPH_OSD_OP_READ: 1619 case CEPH_OSD_OP_READ:
1368 rbd_osd_read_callback(obj_request); 1620 rbd_osd_read_callback(obj_request);
@@ -1388,28 +1640,49 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1388 rbd_obj_request_complete(obj_request); 1640 rbd_obj_request_complete(obj_request);
1389} 1641}
1390 1642
1643static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1644{
1645 struct rbd_img_request *img_request = obj_request->img_request;
1646 struct ceph_osd_request *osd_req = obj_request->osd_req;
1647 u64 snap_id;
1648
1649 rbd_assert(osd_req != NULL);
1650
1651 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1652 ceph_osdc_build_request(osd_req, obj_request->offset,
1653 NULL, snap_id, NULL);
1654}
1655
1656static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1657{
1658 struct rbd_img_request *img_request = obj_request->img_request;
1659 struct ceph_osd_request *osd_req = obj_request->osd_req;
1660 struct ceph_snap_context *snapc;
1661 struct timespec mtime = CURRENT_TIME;
1662
1663 rbd_assert(osd_req != NULL);
1664
1665 snapc = img_request ? img_request->snapc : NULL;
1666 ceph_osdc_build_request(osd_req, obj_request->offset,
1667 snapc, CEPH_NOSNAP, &mtime);
1668}
1669
1391static struct ceph_osd_request *rbd_osd_req_create( 1670static struct ceph_osd_request *rbd_osd_req_create(
1392 struct rbd_device *rbd_dev, 1671 struct rbd_device *rbd_dev,
1393 bool write_request, 1672 bool write_request,
1394 struct rbd_obj_request *obj_request, 1673 struct rbd_obj_request *obj_request)
1395 struct ceph_osd_req_op *op)
1396{ 1674{
1397 struct rbd_img_request *img_request = obj_request->img_request;
1398 struct ceph_snap_context *snapc = NULL; 1675 struct ceph_snap_context *snapc = NULL;
1399 struct ceph_osd_client *osdc; 1676 struct ceph_osd_client *osdc;
1400 struct ceph_osd_request *osd_req; 1677 struct ceph_osd_request *osd_req;
1401 struct timespec now;
1402 struct timespec *mtime;
1403 u64 snap_id = CEPH_NOSNAP;
1404 u64 offset = obj_request->offset;
1405 u64 length = obj_request->length;
1406 1678
1407 if (img_request) { 1679 if (obj_request_img_data_test(obj_request)) {
1408 rbd_assert(img_request->write_request == write_request); 1680 struct rbd_img_request *img_request = obj_request->img_request;
1409 if (img_request->write_request) 1681
1682 rbd_assert(write_request ==
1683 img_request_write_test(img_request));
1684 if (write_request)
1410 snapc = img_request->snapc; 1685 snapc = img_request->snapc;
1411 else
1412 snap_id = img_request->snap_id;
1413 } 1686 }
1414 1687
1415 /* Allocate and initialize the request, for the single op */ 1688 /* Allocate and initialize the request, for the single op */
@@ -1419,31 +1692,10 @@ static struct ceph_osd_request *rbd_osd_req_create(
1419 if (!osd_req) 1692 if (!osd_req)
1420 return NULL; /* ENOMEM */ 1693 return NULL; /* ENOMEM */
1421 1694
1422 rbd_assert(obj_request_type_valid(obj_request->type)); 1695 if (write_request)
1423 switch (obj_request->type) {
1424 case OBJ_REQUEST_NODATA:
1425 break; /* Nothing to do */
1426 case OBJ_REQUEST_BIO:
1427 rbd_assert(obj_request->bio_list != NULL);
1428 osd_req->r_bio = obj_request->bio_list;
1429 break;
1430 case OBJ_REQUEST_PAGES:
1431 osd_req->r_pages = obj_request->pages;
1432 osd_req->r_num_pages = obj_request->page_count;
1433 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1434 break;
1435 }
1436
1437 if (write_request) {
1438 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1696 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1439 now = CURRENT_TIME; 1697 else
1440 mtime = &now;
1441 } else {
1442 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1698 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1443 mtime = NULL; /* not needed for reads */
1444 offset = 0; /* These are not used... */
1445 length = 0; /* ...for osd read requests */
1446 }
1447 1699
1448 osd_req->r_callback = rbd_osd_req_callback; 1700 osd_req->r_callback = rbd_osd_req_callback;
1449 osd_req->r_priv = obj_request; 1701 osd_req->r_priv = obj_request;
@@ -1454,14 +1706,51 @@ static struct ceph_osd_request *rbd_osd_req_create(
1454 1706
1455 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1707 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1456 1708
1457 /* osd_req will get its own reference to snapc (if non-null) */ 1709 return osd_req;
1710}
1458 1711
1459 ceph_osdc_build_request(osd_req, offset, length, 1, op, 1712/*
1460 snapc, snap_id, mtime); 1713 * Create a copyup osd request based on the information in the
1714 * object request supplied. A copyup request has two osd ops,
1715 * a copyup method call, and a "normal" write request.
1716 */
1717static struct ceph_osd_request *
1718rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1719{
1720 struct rbd_img_request *img_request;
1721 struct ceph_snap_context *snapc;
1722 struct rbd_device *rbd_dev;
1723 struct ceph_osd_client *osdc;
1724 struct ceph_osd_request *osd_req;
1725
1726 rbd_assert(obj_request_img_data_test(obj_request));
1727 img_request = obj_request->img_request;
1728 rbd_assert(img_request);
1729 rbd_assert(img_request_write_test(img_request));
1730
1731 /* Allocate and initialize the request, for the two ops */
1732
1733 snapc = img_request->snapc;
1734 rbd_dev = img_request->rbd_dev;
1735 osdc = &rbd_dev->rbd_client->client->osdc;
1736 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1737 if (!osd_req)
1738 return NULL; /* ENOMEM */
1739
1740 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1741 osd_req->r_callback = rbd_osd_req_callback;
1742 osd_req->r_priv = obj_request;
1743
1744 osd_req->r_oid_len = strlen(obj_request->object_name);
1745 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1746 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1747
1748 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1461 1749
1462 return osd_req; 1750 return osd_req;
1463} 1751}
1464 1752
1753
1465static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1754static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1466{ 1755{
1467 ceph_osdc_put_request(osd_req); 1756 ceph_osdc_put_request(osd_req);
@@ -1480,18 +1769,23 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1480 rbd_assert(obj_request_type_valid(type)); 1769 rbd_assert(obj_request_type_valid(type));
1481 1770
1482 size = strlen(object_name) + 1; 1771 size = strlen(object_name) + 1;
1483 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL); 1772 name = kmalloc(size, GFP_KERNEL);
1484 if (!obj_request) 1773 if (!name)
1485 return NULL; 1774 return NULL;
1486 1775
1487 name = (char *)(obj_request + 1); 1776 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1777 if (!obj_request) {
1778 kfree(name);
1779 return NULL;
1780 }
1781
1488 obj_request->object_name = memcpy(name, object_name, size); 1782 obj_request->object_name = memcpy(name, object_name, size);
1489 obj_request->offset = offset; 1783 obj_request->offset = offset;
1490 obj_request->length = length; 1784 obj_request->length = length;
1785 obj_request->flags = 0;
1491 obj_request->which = BAD_WHICH; 1786 obj_request->which = BAD_WHICH;
1492 obj_request->type = type; 1787 obj_request->type = type;
1493 INIT_LIST_HEAD(&obj_request->links); 1788 INIT_LIST_HEAD(&obj_request->links);
1494 obj_request_done_init(obj_request);
1495 init_completion(&obj_request->completion); 1789 init_completion(&obj_request->completion);
1496 kref_init(&obj_request->kref); 1790 kref_init(&obj_request->kref);
1497 1791
@@ -1530,7 +1824,9 @@ static void rbd_obj_request_destroy(struct kref *kref)
1530 break; 1824 break;
1531 } 1825 }
1532 1826
1533 kfree(obj_request); 1827 kfree(obj_request->object_name);
1828 obj_request->object_name = NULL;
1829 kmem_cache_free(rbd_obj_request_cache, obj_request);
1534} 1830}
1535 1831
1536/* 1832/*
@@ -1541,37 +1837,40 @@ static void rbd_obj_request_destroy(struct kref *kref)
1541static struct rbd_img_request *rbd_img_request_create( 1837static struct rbd_img_request *rbd_img_request_create(
1542 struct rbd_device *rbd_dev, 1838 struct rbd_device *rbd_dev,
1543 u64 offset, u64 length, 1839 u64 offset, u64 length,
1544 bool write_request) 1840 bool write_request,
1841 bool child_request)
1545{ 1842{
1546 struct rbd_img_request *img_request; 1843 struct rbd_img_request *img_request;
1547 struct ceph_snap_context *snapc = NULL;
1548 1844
1549 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC); 1845 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1550 if (!img_request) 1846 if (!img_request)
1551 return NULL; 1847 return NULL;
1552 1848
1553 if (write_request) { 1849 if (write_request) {
1554 down_read(&rbd_dev->header_rwsem); 1850 down_read(&rbd_dev->header_rwsem);
1555 snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1851 ceph_get_snap_context(rbd_dev->header.snapc);
1556 up_read(&rbd_dev->header_rwsem); 1852 up_read(&rbd_dev->header_rwsem);
1557 if (WARN_ON(!snapc)) {
1558 kfree(img_request);
1559 return NULL; /* Shouldn't happen */
1560 }
1561 } 1853 }
1562 1854
1563 img_request->rq = NULL; 1855 img_request->rq = NULL;
1564 img_request->rbd_dev = rbd_dev; 1856 img_request->rbd_dev = rbd_dev;
1565 img_request->offset = offset; 1857 img_request->offset = offset;
1566 img_request->length = length; 1858 img_request->length = length;
1567 img_request->write_request = write_request; 1859 img_request->flags = 0;
1568 if (write_request) 1860 if (write_request) {
1569 img_request->snapc = snapc; 1861 img_request_write_set(img_request);
1570 else 1862 img_request->snapc = rbd_dev->header.snapc;
1863 } else {
1571 img_request->snap_id = rbd_dev->spec->snap_id; 1864 img_request->snap_id = rbd_dev->spec->snap_id;
1865 }
1866 if (child_request)
1867 img_request_child_set(img_request);
1868 if (rbd_dev->parent_spec)
1869 img_request_layered_set(img_request);
1572 spin_lock_init(&img_request->completion_lock); 1870 spin_lock_init(&img_request->completion_lock);
1573 img_request->next_completion = 0; 1871 img_request->next_completion = 0;
1574 img_request->callback = NULL; 1872 img_request->callback = NULL;
1873 img_request->result = 0;
1575 img_request->obj_request_count = 0; 1874 img_request->obj_request_count = 0;
1576 INIT_LIST_HEAD(&img_request->obj_requests); 1875 INIT_LIST_HEAD(&img_request->obj_requests);
1577 kref_init(&img_request->kref); 1876 kref_init(&img_request->kref);
@@ -1600,78 +1899,204 @@ static void rbd_img_request_destroy(struct kref *kref)
1600 rbd_img_obj_request_del(img_request, obj_request); 1899 rbd_img_obj_request_del(img_request, obj_request);
1601 rbd_assert(img_request->obj_request_count == 0); 1900 rbd_assert(img_request->obj_request_count == 0);
1602 1901
1603 if (img_request->write_request) 1902 if (img_request_write_test(img_request))
1604 ceph_put_snap_context(img_request->snapc); 1903 ceph_put_snap_context(img_request->snapc);
1605 1904
1606 kfree(img_request); 1905 if (img_request_child_test(img_request))
1906 rbd_obj_request_put(img_request->obj_request);
1907
1908 kmem_cache_free(rbd_img_request_cache, img_request);
1909}
1910
1911static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1912{
1913 struct rbd_img_request *img_request;
1914 unsigned int xferred;
1915 int result;
1916 bool more;
1917
1918 rbd_assert(obj_request_img_data_test(obj_request));
1919 img_request = obj_request->img_request;
1920
1921 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1922 xferred = (unsigned int)obj_request->xferred;
1923 result = obj_request->result;
1924 if (result) {
1925 struct rbd_device *rbd_dev = img_request->rbd_dev;
1926
1927 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1928 img_request_write_test(img_request) ? "write" : "read",
1929 obj_request->length, obj_request->img_offset,
1930 obj_request->offset);
1931 rbd_warn(rbd_dev, " result %d xferred %x\n",
1932 result, xferred);
1933 if (!img_request->result)
1934 img_request->result = result;
1935 }
1936
1937 /* Image object requests don't own their page array */
1938
1939 if (obj_request->type == OBJ_REQUEST_PAGES) {
1940 obj_request->pages = NULL;
1941 obj_request->page_count = 0;
1942 }
1943
1944 if (img_request_child_test(img_request)) {
1945 rbd_assert(img_request->obj_request != NULL);
1946 more = obj_request->which < img_request->obj_request_count - 1;
1947 } else {
1948 rbd_assert(img_request->rq != NULL);
1949 more = blk_end_request(img_request->rq, result, xferred);
1950 }
1951
1952 return more;
1607} 1953}
1608 1954
1609static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, 1955static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1610 struct bio *bio_list) 1956{
1957 struct rbd_img_request *img_request;
1958 u32 which = obj_request->which;
1959 bool more = true;
1960
1961 rbd_assert(obj_request_img_data_test(obj_request));
1962 img_request = obj_request->img_request;
1963
1964 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1965 rbd_assert(img_request != NULL);
1966 rbd_assert(img_request->obj_request_count > 0);
1967 rbd_assert(which != BAD_WHICH);
1968 rbd_assert(which < img_request->obj_request_count);
1969 rbd_assert(which >= img_request->next_completion);
1970
1971 spin_lock_irq(&img_request->completion_lock);
1972 if (which != img_request->next_completion)
1973 goto out;
1974
1975 for_each_obj_request_from(img_request, obj_request) {
1976 rbd_assert(more);
1977 rbd_assert(which < img_request->obj_request_count);
1978
1979 if (!obj_request_done_test(obj_request))
1980 break;
1981 more = rbd_img_obj_end_request(obj_request);
1982 which++;
1983 }
1984
1985 rbd_assert(more ^ (which == img_request->obj_request_count));
1986 img_request->next_completion = which;
1987out:
1988 spin_unlock_irq(&img_request->completion_lock);
1989
1990 if (!more)
1991 rbd_img_request_complete(img_request);
1992}
1993
1994/*
1995 * Split up an image request into one or more object requests, each
1996 * to a different object. The "type" parameter indicates whether
1997 * "data_desc" is the pointer to the head of a list of bio
1998 * structures, or the base of a page array. In either case this
1999 * function assumes data_desc describes memory sufficient to hold
2000 * all data described by the image request.
2001 */
2002static int rbd_img_request_fill(struct rbd_img_request *img_request,
2003 enum obj_request_type type,
2004 void *data_desc)
1611{ 2005{
1612 struct rbd_device *rbd_dev = img_request->rbd_dev; 2006 struct rbd_device *rbd_dev = img_request->rbd_dev;
1613 struct rbd_obj_request *obj_request = NULL; 2007 struct rbd_obj_request *obj_request = NULL;
1614 struct rbd_obj_request *next_obj_request; 2008 struct rbd_obj_request *next_obj_request;
1615 unsigned int bio_offset; 2009 bool write_request = img_request_write_test(img_request);
1616 u64 image_offset; 2010 struct bio *bio_list;
2011 unsigned int bio_offset = 0;
2012 struct page **pages;
2013 u64 img_offset;
1617 u64 resid; 2014 u64 resid;
1618 u16 opcode; 2015 u16 opcode;
1619 2016
1620 dout("%s: img %p bio %p\n", __func__, img_request, bio_list); 2017 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2018 (int)type, data_desc);
1621 2019
1622 opcode = img_request->write_request ? CEPH_OSD_OP_WRITE 2020 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1623 : CEPH_OSD_OP_READ; 2021 img_offset = img_request->offset;
1624 bio_offset = 0;
1625 image_offset = img_request->offset;
1626 rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1627 resid = img_request->length; 2022 resid = img_request->length;
1628 rbd_assert(resid > 0); 2023 rbd_assert(resid > 0);
2024
2025 if (type == OBJ_REQUEST_BIO) {
2026 bio_list = data_desc;
2027 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2028 } else {
2029 rbd_assert(type == OBJ_REQUEST_PAGES);
2030 pages = data_desc;
2031 }
2032
1629 while (resid) { 2033 while (resid) {
2034 struct ceph_osd_request *osd_req;
1630 const char *object_name; 2035 const char *object_name;
1631 unsigned int clone_size;
1632 struct ceph_osd_req_op *op;
1633 u64 offset; 2036 u64 offset;
1634 u64 length; 2037 u64 length;
1635 2038
1636 object_name = rbd_segment_name(rbd_dev, image_offset); 2039 object_name = rbd_segment_name(rbd_dev, img_offset);
1637 if (!object_name) 2040 if (!object_name)
1638 goto out_unwind; 2041 goto out_unwind;
1639 offset = rbd_segment_offset(rbd_dev, image_offset); 2042 offset = rbd_segment_offset(rbd_dev, img_offset);
1640 length = rbd_segment_length(rbd_dev, image_offset, resid); 2043 length = rbd_segment_length(rbd_dev, img_offset, resid);
1641 obj_request = rbd_obj_request_create(object_name, 2044 obj_request = rbd_obj_request_create(object_name,
1642 offset, length, 2045 offset, length, type);
1643 OBJ_REQUEST_BIO); 2046 /* object request has its own copy of the object name */
1644 kfree(object_name); /* object request has its own copy */ 2047 rbd_segment_name_free(object_name);
1645 if (!obj_request) 2048 if (!obj_request)
1646 goto out_unwind; 2049 goto out_unwind;
1647 2050
1648 rbd_assert(length <= (u64) UINT_MAX); 2051 if (type == OBJ_REQUEST_BIO) {
1649 clone_size = (unsigned int) length; 2052 unsigned int clone_size;
1650 obj_request->bio_list = bio_chain_clone_range(&bio_list, 2053
1651 &bio_offset, clone_size, 2054 rbd_assert(length <= (u64)UINT_MAX);
1652 GFP_ATOMIC); 2055 clone_size = (unsigned int)length;
1653 if (!obj_request->bio_list) 2056 obj_request->bio_list =
1654 goto out_partial; 2057 bio_chain_clone_range(&bio_list,
2058 &bio_offset,
2059 clone_size,
2060 GFP_ATOMIC);
2061 if (!obj_request->bio_list)
2062 goto out_partial;
2063 } else {
2064 unsigned int page_count;
2065
2066 obj_request->pages = pages;
2067 page_count = (u32)calc_pages_for(offset, length);
2068 obj_request->page_count = page_count;
2069 if ((offset + length) & ~PAGE_MASK)
2070 page_count--; /* more on last page */
2071 pages += page_count;
2072 }
1655 2073
1656 /* 2074 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1657 * Build up the op to use in building the osd 2075 obj_request);
1658 * request. Note that the contents of the op are 2076 if (!osd_req)
1659 * copied by rbd_osd_req_create().
1660 */
1661 op = rbd_osd_req_op_create(opcode, offset, length);
1662 if (!op)
1663 goto out_partial;
1664 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1665 img_request->write_request,
1666 obj_request, op);
1667 rbd_osd_req_op_destroy(op);
1668 if (!obj_request->osd_req)
1669 goto out_partial; 2077 goto out_partial;
1670 /* status and version are initially zero-filled */ 2078 obj_request->osd_req = osd_req;
2079 obj_request->callback = rbd_img_obj_callback;
1671 2080
2081 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2082 0, 0);
2083 if (type == OBJ_REQUEST_BIO)
2084 osd_req_op_extent_osd_data_bio(osd_req, 0,
2085 obj_request->bio_list, length);
2086 else
2087 osd_req_op_extent_osd_data_pages(osd_req, 0,
2088 obj_request->pages, length,
2089 offset & ~PAGE_MASK, false, false);
2090
2091 if (write_request)
2092 rbd_osd_req_format_write(obj_request);
2093 else
2094 rbd_osd_req_format_read(obj_request);
2095
2096 obj_request->img_offset = img_offset;
1672 rbd_img_obj_request_add(img_request, obj_request); 2097 rbd_img_obj_request_add(img_request, obj_request);
1673 2098
1674 image_offset += length; 2099 img_offset += length;
1675 resid -= length; 2100 resid -= length;
1676 } 2101 }
1677 2102
@@ -1686,61 +2111,389 @@ out_unwind:
1686 return -ENOMEM; 2111 return -ENOMEM;
1687} 2112}
1688 2113
1689static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) 2114static void
2115rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1690{ 2116{
1691 struct rbd_img_request *img_request; 2117 struct rbd_img_request *img_request;
1692 u32 which = obj_request->which; 2118 struct rbd_device *rbd_dev;
1693 bool more = true; 2119 u64 length;
2120 u32 page_count;
1694 2121
2122 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2123 rbd_assert(obj_request_img_data_test(obj_request));
1695 img_request = obj_request->img_request; 2124 img_request = obj_request->img_request;
2125 rbd_assert(img_request);
1696 2126
1697 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 2127 rbd_dev = img_request->rbd_dev;
2128 rbd_assert(rbd_dev);
2129 length = (u64)1 << rbd_dev->header.obj_order;
2130 page_count = (u32)calc_pages_for(0, length);
2131
2132 rbd_assert(obj_request->copyup_pages);
2133 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2134 obj_request->copyup_pages = NULL;
2135
2136 /*
2137 * We want the transfer count to reflect the size of the
2138 * original write request. There is no such thing as a
2139 * successful short write, so if the request was successful
2140 * we can just set it to the originally-requested length.
2141 */
2142 if (!obj_request->result)
2143 obj_request->xferred = obj_request->length;
2144
2145 /* Finish up with the normal image object callback */
2146
2147 rbd_img_obj_callback(obj_request);
2148}
2149
2150static void
2151rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2152{
2153 struct rbd_obj_request *orig_request;
2154 struct ceph_osd_request *osd_req;
2155 struct ceph_osd_client *osdc;
2156 struct rbd_device *rbd_dev;
2157 struct page **pages;
2158 int result;
2159 u64 obj_size;
2160 u64 xferred;
2161
2162 rbd_assert(img_request_child_test(img_request));
2163
2164 /* First get what we need from the image request */
2165
2166 pages = img_request->copyup_pages;
2167 rbd_assert(pages != NULL);
2168 img_request->copyup_pages = NULL;
2169
2170 orig_request = img_request->obj_request;
2171 rbd_assert(orig_request != NULL);
2172 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2173 result = img_request->result;
2174 obj_size = img_request->length;
2175 xferred = img_request->xferred;
2176
2177 rbd_dev = img_request->rbd_dev;
2178 rbd_assert(rbd_dev);
2179 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2180
2181 rbd_img_request_put(img_request);
2182
2183 if (result)
2184 goto out_err;
2185
2186 /* Allocate the new copyup osd request for the original request */
2187
2188 result = -ENOMEM;
2189 rbd_assert(!orig_request->osd_req);
2190 osd_req = rbd_osd_req_create_copyup(orig_request);
2191 if (!osd_req)
2192 goto out_err;
2193 orig_request->osd_req = osd_req;
2194 orig_request->copyup_pages = pages;
2195
2196 /* Initialize the copyup op */
2197
2198 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2199 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2200 false, false);
2201
2202 /* Then the original write request op */
2203
2204 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2205 orig_request->offset,
2206 orig_request->length, 0, 0);
2207 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2208 orig_request->length);
2209
2210 rbd_osd_req_format_write(orig_request);
2211
2212 /* All set, send it off. */
2213
2214 orig_request->callback = rbd_img_obj_copyup_callback;
2215 osdc = &rbd_dev->rbd_client->client->osdc;
2216 result = rbd_obj_request_submit(osdc, orig_request);
2217 if (!result)
2218 return;
2219out_err:
2220 /* Record the error code and complete the request */
2221
2222 orig_request->result = result;
2223 orig_request->xferred = 0;
2224 obj_request_done_set(orig_request);
2225 rbd_obj_request_complete(orig_request);
2226}
2227
2228/*
2229 * Read from the parent image the range of data that covers the
2230 * entire target of the given object request. This is used for
2231 * satisfying a layered image write request when the target of an
2232 * object request from the image request does not exist.
2233 *
2234 * A page array big enough to hold the returned data is allocated
2235 * and supplied to rbd_img_request_fill() as the "data descriptor."
2236 * When the read completes, this page array will be transferred to
2237 * the original object request for the copyup operation.
2238 *
2239 * If an error occurs, record it as the result of the original
2240 * object request and mark it done so it gets completed.
2241 */
2242static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2243{
2244 struct rbd_img_request *img_request = NULL;
2245 struct rbd_img_request *parent_request = NULL;
2246 struct rbd_device *rbd_dev;
2247 u64 img_offset;
2248 u64 length;
2249 struct page **pages = NULL;
2250 u32 page_count;
2251 int result;
2252
2253 rbd_assert(obj_request_img_data_test(obj_request));
2254 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2255
2256 img_request = obj_request->img_request;
1698 rbd_assert(img_request != NULL); 2257 rbd_assert(img_request != NULL);
1699 rbd_assert(img_request->rq != NULL); 2258 rbd_dev = img_request->rbd_dev;
1700 rbd_assert(img_request->obj_request_count > 0); 2259 rbd_assert(rbd_dev->parent != NULL);
1701 rbd_assert(which != BAD_WHICH);
1702 rbd_assert(which < img_request->obj_request_count);
1703 rbd_assert(which >= img_request->next_completion);
1704 2260
1705 spin_lock_irq(&img_request->completion_lock); 2261 /*
1706 if (which != img_request->next_completion) 2262 * First things first. The original osd request is of no
1707 goto out; 2263 * use to use any more, we'll need a new one that can hold
2264 * the two ops in a copyup request. We'll get that later,
2265 * but for now we can release the old one.
2266 */
2267 rbd_osd_req_destroy(obj_request->osd_req);
2268 obj_request->osd_req = NULL;
1708 2269
1709 for_each_obj_request_from(img_request, obj_request) { 2270 /*
1710 unsigned int xferred; 2271 * Determine the byte range covered by the object in the
1711 int result; 2272 * child image to which the original request was to be sent.
2273 */
2274 img_offset = obj_request->img_offset - obj_request->offset;
2275 length = (u64)1 << rbd_dev->header.obj_order;
1712 2276
1713 rbd_assert(more); 2277 /*
1714 rbd_assert(which < img_request->obj_request_count); 2278 * There is no defined parent data beyond the parent
2279 * overlap, so limit what we read at that boundary if
2280 * necessary.
2281 */
2282 if (img_offset + length > rbd_dev->parent_overlap) {
2283 rbd_assert(img_offset < rbd_dev->parent_overlap);
2284 length = rbd_dev->parent_overlap - img_offset;
2285 }
1715 2286
1716 if (!obj_request_done_test(obj_request)) 2287 /*
1717 break; 2288 * Allocate a page array big enough to receive the data read
2289 * from the parent.
2290 */
2291 page_count = (u32)calc_pages_for(0, length);
2292 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2293 if (IS_ERR(pages)) {
2294 result = PTR_ERR(pages);
2295 pages = NULL;
2296 goto out_err;
2297 }
1718 2298
1719 rbd_assert(obj_request->xferred <= (u64) UINT_MAX); 2299 result = -ENOMEM;
1720 xferred = (unsigned int) obj_request->xferred; 2300 parent_request = rbd_img_request_create(rbd_dev->parent,
1721 result = (int) obj_request->result; 2301 img_offset, length,
1722 if (result) 2302 false, true);
1723 rbd_warn(NULL, "obj_request %s result %d xferred %u\n", 2303 if (!parent_request)
1724 img_request->write_request ? "write" : "read", 2304 goto out_err;
1725 result, xferred); 2305 rbd_obj_request_get(obj_request);
2306 parent_request->obj_request = obj_request;
1726 2307
1727 more = blk_end_request(img_request->rq, result, xferred); 2308 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
1728 which++; 2309 if (result)
2310 goto out_err;
2311 parent_request->copyup_pages = pages;
2312
2313 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2314 result = rbd_img_request_submit(parent_request);
2315 if (!result)
2316 return 0;
2317
2318 parent_request->copyup_pages = NULL;
2319 parent_request->obj_request = NULL;
2320 rbd_obj_request_put(obj_request);
2321out_err:
2322 if (pages)
2323 ceph_release_page_vector(pages, page_count);
2324 if (parent_request)
2325 rbd_img_request_put(parent_request);
2326 obj_request->result = result;
2327 obj_request->xferred = 0;
2328 obj_request_done_set(obj_request);
2329
2330 return result;
2331}
2332
2333static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2334{
2335 struct rbd_obj_request *orig_request;
2336 int result;
2337
2338 rbd_assert(!obj_request_img_data_test(obj_request));
2339
2340 /*
2341 * All we need from the object request is the original
2342 * request and the result of the STAT op. Grab those, then
2343 * we're done with the request.
2344 */
2345 orig_request = obj_request->obj_request;
2346 obj_request->obj_request = NULL;
2347 rbd_assert(orig_request);
2348 rbd_assert(orig_request->img_request);
2349
2350 result = obj_request->result;
2351 obj_request->result = 0;
2352
2353 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2354 obj_request, orig_request, result,
2355 obj_request->xferred, obj_request->length);
2356 rbd_obj_request_put(obj_request);
2357
2358 rbd_assert(orig_request);
2359 rbd_assert(orig_request->img_request);
2360
2361 /*
2362 * Our only purpose here is to determine whether the object
2363 * exists, and we don't want to treat the non-existence as
2364 * an error. If something else comes back, transfer the
2365 * error to the original request and complete it now.
2366 */
2367 if (!result) {
2368 obj_request_existence_set(orig_request, true);
2369 } else if (result == -ENOENT) {
2370 obj_request_existence_set(orig_request, false);
2371 } else if (result) {
2372 orig_request->result = result;
2373 goto out;
1729 } 2374 }
1730 2375
1731 rbd_assert(more ^ (which == img_request->obj_request_count)); 2376 /*
1732 img_request->next_completion = which; 2377 * Resubmit the original request now that we have recorded
2378 * whether the target object exists.
2379 */
2380 orig_request->result = rbd_img_obj_request_submit(orig_request);
1733out: 2381out:
1734 spin_unlock_irq(&img_request->completion_lock); 2382 if (orig_request->result)
2383 rbd_obj_request_complete(orig_request);
2384 rbd_obj_request_put(orig_request);
2385}
1735 2386
1736 if (!more) 2387static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
1737 rbd_img_request_complete(img_request); 2388{
2389 struct rbd_obj_request *stat_request;
2390 struct rbd_device *rbd_dev;
2391 struct ceph_osd_client *osdc;
2392 struct page **pages = NULL;
2393 u32 page_count;
2394 size_t size;
2395 int ret;
2396
2397 /*
2398 * The response data for a STAT call consists of:
2399 * le64 length;
2400 * struct {
2401 * le32 tv_sec;
2402 * le32 tv_nsec;
2403 * } mtime;
2404 */
2405 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2406 page_count = (u32)calc_pages_for(0, size);
2407 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2408 if (IS_ERR(pages))
2409 return PTR_ERR(pages);
2410
2411 ret = -ENOMEM;
2412 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2413 OBJ_REQUEST_PAGES);
2414 if (!stat_request)
2415 goto out;
2416
2417 rbd_obj_request_get(obj_request);
2418 stat_request->obj_request = obj_request;
2419 stat_request->pages = pages;
2420 stat_request->page_count = page_count;
2421
2422 rbd_assert(obj_request->img_request);
2423 rbd_dev = obj_request->img_request->rbd_dev;
2424 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2425 stat_request);
2426 if (!stat_request->osd_req)
2427 goto out;
2428 stat_request->callback = rbd_img_obj_exists_callback;
2429
2430 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2431 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2432 false, false);
2433 rbd_osd_req_format_read(stat_request);
2434
2435 osdc = &rbd_dev->rbd_client->client->osdc;
2436 ret = rbd_obj_request_submit(osdc, stat_request);
2437out:
2438 if (ret)
2439 rbd_obj_request_put(obj_request);
2440
2441 return ret;
2442}
2443
2444static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2445{
2446 struct rbd_img_request *img_request;
2447 struct rbd_device *rbd_dev;
2448 bool known;
2449
2450 rbd_assert(obj_request_img_data_test(obj_request));
2451
2452 img_request = obj_request->img_request;
2453 rbd_assert(img_request);
2454 rbd_dev = img_request->rbd_dev;
2455
2456 /*
2457 * Only writes to layered images need special handling.
2458 * Reads and non-layered writes are simple object requests.
2459 * Layered writes that start beyond the end of the overlap
2460 * with the parent have no parent data, so they too are
2461 * simple object requests. Finally, if the target object is
2462 * known to already exist, its parent data has already been
2463 * copied, so a write to the object can also be handled as a
2464 * simple object request.
2465 */
2466 if (!img_request_write_test(img_request) ||
2467 !img_request_layered_test(img_request) ||
2468 rbd_dev->parent_overlap <= obj_request->img_offset ||
2469 ((known = obj_request_known_test(obj_request)) &&
2470 obj_request_exists_test(obj_request))) {
2471
2472 struct rbd_device *rbd_dev;
2473 struct ceph_osd_client *osdc;
2474
2475 rbd_dev = obj_request->img_request->rbd_dev;
2476 osdc = &rbd_dev->rbd_client->client->osdc;
2477
2478 return rbd_obj_request_submit(osdc, obj_request);
2479 }
2480
2481 /*
2482 * It's a layered write. The target object might exist but
2483 * we may not know that yet. If we know it doesn't exist,
2484 * start by reading the data for the full target object from
2485 * the parent so we can use it for a copyup to the target.
2486 */
2487 if (known)
2488 return rbd_img_obj_parent_read_full(obj_request);
2489
2490 /* We don't know whether the target exists. Go find out. */
2491
2492 return rbd_img_obj_exists_submit(obj_request);
1738} 2493}
1739 2494
1740static int rbd_img_request_submit(struct rbd_img_request *img_request) 2495static int rbd_img_request_submit(struct rbd_img_request *img_request)
1741{ 2496{
1742 struct rbd_device *rbd_dev = img_request->rbd_dev;
1743 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1744 struct rbd_obj_request *obj_request; 2497 struct rbd_obj_request *obj_request;
1745 struct rbd_obj_request *next_obj_request; 2498 struct rbd_obj_request *next_obj_request;
1746 2499
@@ -1748,27 +2501,105 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
1748 for_each_obj_request_safe(img_request, obj_request, next_obj_request) { 2501 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1749 int ret; 2502 int ret;
1750 2503
1751 obj_request->callback = rbd_img_obj_callback; 2504 ret = rbd_img_obj_request_submit(obj_request);
1752 ret = rbd_obj_request_submit(osdc, obj_request);
1753 if (ret) 2505 if (ret)
1754 return ret; 2506 return ret;
1755 /*
1756 * The image request has its own reference to each
1757 * of its object requests, so we can safely drop the
1758 * initial one here.
1759 */
1760 rbd_obj_request_put(obj_request);
1761 } 2507 }
1762 2508
1763 return 0; 2509 return 0;
1764} 2510}
1765 2511
1766static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, 2512static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
1767 u64 ver, u64 notify_id)
1768{ 2513{
1769 struct rbd_obj_request *obj_request; 2514 struct rbd_obj_request *obj_request;
1770 struct ceph_osd_req_op *op; 2515 struct rbd_device *rbd_dev;
1771 struct ceph_osd_client *osdc; 2516 u64 obj_end;
2517
2518 rbd_assert(img_request_child_test(img_request));
2519
2520 obj_request = img_request->obj_request;
2521 rbd_assert(obj_request);
2522 rbd_assert(obj_request->img_request);
2523
2524 obj_request->result = img_request->result;
2525 if (obj_request->result)
2526 goto out;
2527
2528 /*
2529 * We need to zero anything beyond the parent overlap
2530 * boundary. Since rbd_img_obj_request_read_callback()
2531 * will zero anything beyond the end of a short read, an
2532 * easy way to do this is to pretend the data from the
2533 * parent came up short--ending at the overlap boundary.
2534 */
2535 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2536 obj_end = obj_request->img_offset + obj_request->length;
2537 rbd_dev = obj_request->img_request->rbd_dev;
2538 if (obj_end > rbd_dev->parent_overlap) {
2539 u64 xferred = 0;
2540
2541 if (obj_request->img_offset < rbd_dev->parent_overlap)
2542 xferred = rbd_dev->parent_overlap -
2543 obj_request->img_offset;
2544
2545 obj_request->xferred = min(img_request->xferred, xferred);
2546 } else {
2547 obj_request->xferred = img_request->xferred;
2548 }
2549out:
2550 rbd_img_request_put(img_request);
2551 rbd_img_obj_request_read_callback(obj_request);
2552 rbd_obj_request_complete(obj_request);
2553}
2554
2555static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2556{
2557 struct rbd_device *rbd_dev;
2558 struct rbd_img_request *img_request;
2559 int result;
2560
2561 rbd_assert(obj_request_img_data_test(obj_request));
2562 rbd_assert(obj_request->img_request != NULL);
2563 rbd_assert(obj_request->result == (s32) -ENOENT);
2564 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2565
2566 rbd_dev = obj_request->img_request->rbd_dev;
2567 rbd_assert(rbd_dev->parent != NULL);
2568 /* rbd_read_finish(obj_request, obj_request->length); */
2569 img_request = rbd_img_request_create(rbd_dev->parent,
2570 obj_request->img_offset,
2571 obj_request->length,
2572 false, true);
2573 result = -ENOMEM;
2574 if (!img_request)
2575 goto out_err;
2576
2577 rbd_obj_request_get(obj_request);
2578 img_request->obj_request = obj_request;
2579
2580 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2581 obj_request->bio_list);
2582 if (result)
2583 goto out_err;
2584
2585 img_request->callback = rbd_img_parent_read_callback;
2586 result = rbd_img_request_submit(img_request);
2587 if (result)
2588 goto out_err;
2589
2590 return;
2591out_err:
2592 if (img_request)
2593 rbd_img_request_put(img_request);
2594 obj_request->result = result;
2595 obj_request->xferred = 0;
2596 obj_request_done_set(obj_request);
2597}
2598
2599static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2600{
2601 struct rbd_obj_request *obj_request;
2602 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1772 int ret; 2603 int ret;
1773 2604
1774 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 2605 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
@@ -1777,17 +2608,15 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1777 return -ENOMEM; 2608 return -ENOMEM;
1778 2609
1779 ret = -ENOMEM; 2610 ret = -ENOMEM;
1780 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver); 2611 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1781 if (!op)
1782 goto out;
1783 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1784 obj_request, op);
1785 rbd_osd_req_op_destroy(op);
1786 if (!obj_request->osd_req) 2612 if (!obj_request->osd_req)
1787 goto out; 2613 goto out;
1788
1789 osdc = &rbd_dev->rbd_client->client->osdc;
1790 obj_request->callback = rbd_obj_request_put; 2614 obj_request->callback = rbd_obj_request_put;
2615
2616 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2617 notify_id, 0, 0);
2618 rbd_osd_req_format_read(obj_request);
2619
1791 ret = rbd_obj_request_submit(osdc, obj_request); 2620 ret = rbd_obj_request_submit(osdc, obj_request);
1792out: 2621out:
1793 if (ret) 2622 if (ret)
@@ -1799,21 +2628,16 @@ out:
1799static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 2628static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1800{ 2629{
1801 struct rbd_device *rbd_dev = (struct rbd_device *)data; 2630 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1802 u64 hver;
1803 int rc;
1804 2631
1805 if (!rbd_dev) 2632 if (!rbd_dev)
1806 return; 2633 return;
1807 2634
1808 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, 2635 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1809 rbd_dev->header_name, (unsigned long long) notify_id, 2636 rbd_dev->header_name, (unsigned long long)notify_id,
1810 (unsigned int) opcode); 2637 (unsigned int)opcode);
1811 rc = rbd_dev_refresh(rbd_dev, &hver); 2638 (void)rbd_dev_refresh(rbd_dev);
1812 if (rc)
1813 rbd_warn(rbd_dev, "got notification but failed to "
1814 " update snaps: %d\n", rc);
1815 2639
1816 rbd_obj_notify_ack(rbd_dev, hver, notify_id); 2640 rbd_obj_notify_ack(rbd_dev, notify_id);
1817} 2641}
1818 2642
1819/* 2643/*
@@ -1824,7 +2648,6 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1824{ 2648{
1825 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2649 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1826 struct rbd_obj_request *obj_request; 2650 struct rbd_obj_request *obj_request;
1827 struct ceph_osd_req_op *op;
1828 int ret; 2651 int ret;
1829 2652
1830 rbd_assert(start ^ !!rbd_dev->watch_event); 2653 rbd_assert(start ^ !!rbd_dev->watch_event);
@@ -1844,14 +2667,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1844 if (!obj_request) 2667 if (!obj_request)
1845 goto out_cancel; 2668 goto out_cancel;
1846 2669
1847 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH, 2670 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1848 rbd_dev->watch_event->cookie,
1849 rbd_dev->header.obj_version, start);
1850 if (!op)
1851 goto out_cancel;
1852 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1853 obj_request, op);
1854 rbd_osd_req_op_destroy(op);
1855 if (!obj_request->osd_req) 2671 if (!obj_request->osd_req)
1856 goto out_cancel; 2672 goto out_cancel;
1857 2673
@@ -1860,6 +2676,11 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1860 else 2676 else
1861 ceph_osdc_unregister_linger_request(osdc, 2677 ceph_osdc_unregister_linger_request(osdc,
1862 rbd_dev->watch_request->osd_req); 2678 rbd_dev->watch_request->osd_req);
2679
2680 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2681 rbd_dev->watch_event->cookie, 0, start);
2682 rbd_osd_req_format_write(obj_request);
2683
1863 ret = rbd_obj_request_submit(osdc, obj_request); 2684 ret = rbd_obj_request_submit(osdc, obj_request);
1864 if (ret) 2685 if (ret)
1865 goto out_cancel; 2686 goto out_cancel;
@@ -1899,40 +2720,38 @@ out_cancel:
1899} 2720}
1900 2721
1901/* 2722/*
1902 * Synchronous osd object method call 2723 * Synchronous osd object method call. Returns the number of bytes
2724 * returned in the inbound buffer, or a negative error code.
1903 */ 2725 */
1904static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 2726static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1905 const char *object_name, 2727 const char *object_name,
1906 const char *class_name, 2728 const char *class_name,
1907 const char *method_name, 2729 const char *method_name,
1908 const char *outbound, 2730 const void *outbound,
1909 size_t outbound_size, 2731 size_t outbound_size,
1910 char *inbound, 2732 void *inbound,
1911 size_t inbound_size, 2733 size_t inbound_size)
1912 u64 *version)
1913{ 2734{
2735 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1914 struct rbd_obj_request *obj_request; 2736 struct rbd_obj_request *obj_request;
1915 struct ceph_osd_client *osdc;
1916 struct ceph_osd_req_op *op;
1917 struct page **pages; 2737 struct page **pages;
1918 u32 page_count; 2738 u32 page_count;
1919 int ret; 2739 int ret;
1920 2740
1921 /* 2741 /*
1922 * Method calls are ultimately read operations but they 2742 * Method calls are ultimately read operations. The result
1923 * don't involve object data (so no offset or length). 2743 * should be placed into the inbound buffer provided. They
1924 * The result should placed into the inbound buffer 2744 * also supply outbound data--parameters for the object
1925 * provided. They also supply outbound data--parameters for 2745 * method. Currently if this is present it will be a
1926 * the object method. Currently if this is present it will 2746 * snapshot id.
1927 * be a snapshot id.
1928 */ 2747 */
1929 page_count = (u32) calc_pages_for(0, inbound_size); 2748 page_count = (u32)calc_pages_for(0, inbound_size);
1930 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2749 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1931 if (IS_ERR(pages)) 2750 if (IS_ERR(pages))
1932 return PTR_ERR(pages); 2751 return PTR_ERR(pages);
1933 2752
1934 ret = -ENOMEM; 2753 ret = -ENOMEM;
1935 obj_request = rbd_obj_request_create(object_name, 0, 0, 2754 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1936 OBJ_REQUEST_PAGES); 2755 OBJ_REQUEST_PAGES);
1937 if (!obj_request) 2756 if (!obj_request)
1938 goto out; 2757 goto out;
@@ -1940,17 +2759,29 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1940 obj_request->pages = pages; 2759 obj_request->pages = pages;
1941 obj_request->page_count = page_count; 2760 obj_request->page_count = page_count;
1942 2761
1943 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name, 2762 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1944 method_name, outbound, outbound_size);
1945 if (!op)
1946 goto out;
1947 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1948 obj_request, op);
1949 rbd_osd_req_op_destroy(op);
1950 if (!obj_request->osd_req) 2763 if (!obj_request->osd_req)
1951 goto out; 2764 goto out;
1952 2765
1953 osdc = &rbd_dev->rbd_client->client->osdc; 2766 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2767 class_name, method_name);
2768 if (outbound_size) {
2769 struct ceph_pagelist *pagelist;
2770
2771 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2772 if (!pagelist)
2773 goto out;
2774
2775 ceph_pagelist_init(pagelist);
2776 ceph_pagelist_append(pagelist, outbound, outbound_size);
2777 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2778 pagelist);
2779 }
2780 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2781 obj_request->pages, inbound_size,
2782 0, false, false);
2783 rbd_osd_req_format_read(obj_request);
2784
1954 ret = rbd_obj_request_submit(osdc, obj_request); 2785 ret = rbd_obj_request_submit(osdc, obj_request);
1955 if (ret) 2786 if (ret)
1956 goto out; 2787 goto out;
@@ -1961,10 +2792,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1961 ret = obj_request->result; 2792 ret = obj_request->result;
1962 if (ret < 0) 2793 if (ret < 0)
1963 goto out; 2794 goto out;
1964 ret = 0; 2795
2796 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2797 ret = (int)obj_request->xferred;
1965 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); 2798 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1966 if (version)
1967 *version = obj_request->version;
1968out: 2799out:
1969 if (obj_request) 2800 if (obj_request)
1970 rbd_obj_request_put(obj_request); 2801 rbd_obj_request_put(obj_request);
@@ -2034,18 +2865,22 @@ static void rbd_request_fn(struct request_queue *q)
2034 } 2865 }
2035 2866
2036 result = -EINVAL; 2867 result = -EINVAL;
2037 if (WARN_ON(offset && length > U64_MAX - offset + 1)) 2868 if (offset && length > U64_MAX - offset + 1) {
2869 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2870 offset, length);
2038 goto end_request; /* Shouldn't happen */ 2871 goto end_request; /* Shouldn't happen */
2872 }
2039 2873
2040 result = -ENOMEM; 2874 result = -ENOMEM;
2041 img_request = rbd_img_request_create(rbd_dev, offset, length, 2875 img_request = rbd_img_request_create(rbd_dev, offset, length,
2042 write_request); 2876 write_request, false);
2043 if (!img_request) 2877 if (!img_request)
2044 goto end_request; 2878 goto end_request;
2045 2879
2046 img_request->rq = rq; 2880 img_request->rq = rq;
2047 2881
2048 result = rbd_img_request_fill_bio(img_request, rq->bio); 2882 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2883 rq->bio);
2049 if (!result) 2884 if (!result)
2050 result = rbd_img_request_submit(img_request); 2885 result = rbd_img_request_submit(img_request);
2051 if (result) 2886 if (result)
@@ -2053,8 +2888,10 @@ static void rbd_request_fn(struct request_queue *q)
2053end_request: 2888end_request:
2054 spin_lock_irq(q->queue_lock); 2889 spin_lock_irq(q->queue_lock);
2055 if (result < 0) { 2890 if (result < 0) {
2056 rbd_warn(rbd_dev, "obj_request %s result %d\n", 2891 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2057 write_request ? "write" : "read", result); 2892 write_request ? "write" : "read",
2893 length, offset, result);
2894
2058 __blk_end_request_all(rq, result); 2895 __blk_end_request_all(rq, result);
2059 } 2896 }
2060 } 2897 }
@@ -2113,22 +2950,22 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
2113 if (!disk) 2950 if (!disk)
2114 return; 2951 return;
2115 2952
2116 if (disk->flags & GENHD_FL_UP) 2953 rbd_dev->disk = NULL;
2954 if (disk->flags & GENHD_FL_UP) {
2117 del_gendisk(disk); 2955 del_gendisk(disk);
2118 if (disk->queue) 2956 if (disk->queue)
2119 blk_cleanup_queue(disk->queue); 2957 blk_cleanup_queue(disk->queue);
2958 }
2120 put_disk(disk); 2959 put_disk(disk);
2121} 2960}
2122 2961
2123static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 2962static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2124 const char *object_name, 2963 const char *object_name,
2125 u64 offset, u64 length, 2964 u64 offset, u64 length, void *buf)
2126 char *buf, u64 *version)
2127 2965
2128{ 2966{
2129 struct ceph_osd_req_op *op; 2967 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2130 struct rbd_obj_request *obj_request; 2968 struct rbd_obj_request *obj_request;
2131 struct ceph_osd_client *osdc;
2132 struct page **pages = NULL; 2969 struct page **pages = NULL;
2133 u32 page_count; 2970 u32 page_count;
2134 size_t size; 2971 size_t size;
@@ -2148,16 +2985,19 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2148 obj_request->pages = pages; 2985 obj_request->pages = pages;
2149 obj_request->page_count = page_count; 2986 obj_request->page_count = page_count;
2150 2987
2151 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); 2988 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2152 if (!op)
2153 goto out;
2154 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2155 obj_request, op);
2156 rbd_osd_req_op_destroy(op);
2157 if (!obj_request->osd_req) 2989 if (!obj_request->osd_req)
2158 goto out; 2990 goto out;
2159 2991
2160 osdc = &rbd_dev->rbd_client->client->osdc; 2992 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2993 offset, length, 0, 0);
2994 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2995 obj_request->pages,
2996 obj_request->length,
2997 obj_request->offset & ~PAGE_MASK,
2998 false, false);
2999 rbd_osd_req_format_read(obj_request);
3000
2161 ret = rbd_obj_request_submit(osdc, obj_request); 3001 ret = rbd_obj_request_submit(osdc, obj_request);
2162 if (ret) 3002 if (ret)
2163 goto out; 3003 goto out;
@@ -2172,10 +3012,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2172 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); 3012 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2173 size = (size_t) obj_request->xferred; 3013 size = (size_t) obj_request->xferred;
2174 ceph_copy_from_page_vector(pages, buf, 0, size); 3014 ceph_copy_from_page_vector(pages, buf, 0, size);
2175 rbd_assert(size <= (size_t) INT_MAX); 3015 rbd_assert(size <= (size_t)INT_MAX);
2176 ret = (int) size; 3016 ret = (int)size;
2177 if (version)
2178 *version = obj_request->version;
2179out: 3017out:
2180 if (obj_request) 3018 if (obj_request)
2181 rbd_obj_request_put(obj_request); 3019 rbd_obj_request_put(obj_request);
@@ -2196,7 +3034,7 @@ out:
2196 * Returns a pointer-coded errno if a failure occurs. 3034 * Returns a pointer-coded errno if a failure occurs.
2197 */ 3035 */
2198static struct rbd_image_header_ondisk * 3036static struct rbd_image_header_ondisk *
2199rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) 3037rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2200{ 3038{
2201 struct rbd_image_header_ondisk *ondisk = NULL; 3039 struct rbd_image_header_ondisk *ondisk = NULL;
2202 u32 snap_count = 0; 3040 u32 snap_count = 0;
@@ -2224,11 +3062,10 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2224 return ERR_PTR(-ENOMEM); 3062 return ERR_PTR(-ENOMEM);
2225 3063
2226 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, 3064 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2227 0, size, 3065 0, size, ondisk);
2228 (char *) ondisk, version);
2229 if (ret < 0) 3066 if (ret < 0)
2230 goto out_err; 3067 goto out_err;
2231 if (WARN_ON((size_t) ret < size)) { 3068 if ((size_t)ret < size) {
2232 ret = -ENXIO; 3069 ret = -ENXIO;
2233 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 3070 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2234 size, ret); 3071 size, ret);
@@ -2260,46 +3097,36 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
2260 struct rbd_image_header *header) 3097 struct rbd_image_header *header)
2261{ 3098{
2262 struct rbd_image_header_ondisk *ondisk; 3099 struct rbd_image_header_ondisk *ondisk;
2263 u64 ver = 0;
2264 int ret; 3100 int ret;
2265 3101
2266 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver); 3102 ondisk = rbd_dev_v1_header_read(rbd_dev);
2267 if (IS_ERR(ondisk)) 3103 if (IS_ERR(ondisk))
2268 return PTR_ERR(ondisk); 3104 return PTR_ERR(ondisk);
2269 ret = rbd_header_from_disk(header, ondisk); 3105 ret = rbd_header_from_disk(header, ondisk);
2270 if (ret >= 0)
2271 header->obj_version = ver;
2272 kfree(ondisk); 3106 kfree(ondisk);
2273 3107
2274 return ret; 3108 return ret;
2275} 3109}
2276 3110
2277static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2278{
2279 struct rbd_snap *snap;
2280 struct rbd_snap *next;
2281
2282 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2283 rbd_remove_snap_dev(snap);
2284}
2285
2286static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 3111static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2287{ 3112{
2288 sector_t size;
2289
2290 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 3113 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2291 return; 3114 return;
2292 3115
2293 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; 3116 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
2294 dout("setting size to %llu sectors", (unsigned long long) size); 3117 sector_t size;
2295 rbd_dev->mapping.size = (u64) size; 3118
2296 set_capacity(rbd_dev->disk, size); 3119 rbd_dev->mapping.size = rbd_dev->header.image_size;
3120 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3121 dout("setting size to %llu sectors", (unsigned long long)size);
3122 set_capacity(rbd_dev->disk, size);
3123 }
2297} 3124}
2298 3125
2299/* 3126/*
2300 * only read the first part of the ondisk header, without the snaps info 3127 * only read the first part of the ondisk header, without the snaps info
2301 */ 3128 */
2302static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) 3129static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
2303{ 3130{
2304 int ret; 3131 int ret;
2305 struct rbd_image_header h; 3132 struct rbd_image_header h;
@@ -2320,37 +3147,61 @@ static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2320 /* osd requests may still refer to snapc */ 3147 /* osd requests may still refer to snapc */
2321 ceph_put_snap_context(rbd_dev->header.snapc); 3148 ceph_put_snap_context(rbd_dev->header.snapc);
2322 3149
2323 if (hver)
2324 *hver = h.obj_version;
2325 rbd_dev->header.obj_version = h.obj_version;
2326 rbd_dev->header.image_size = h.image_size; 3150 rbd_dev->header.image_size = h.image_size;
2327 rbd_dev->header.snapc = h.snapc; 3151 rbd_dev->header.snapc = h.snapc;
2328 rbd_dev->header.snap_names = h.snap_names; 3152 rbd_dev->header.snap_names = h.snap_names;
2329 rbd_dev->header.snap_sizes = h.snap_sizes; 3153 rbd_dev->header.snap_sizes = h.snap_sizes;
2330 /* Free the extra copy of the object prefix */ 3154 /* Free the extra copy of the object prefix */
2331 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix)); 3155 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3156 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
2332 kfree(h.object_prefix); 3157 kfree(h.object_prefix);
2333 3158
2334 ret = rbd_dev_snaps_update(rbd_dev);
2335 if (!ret)
2336 ret = rbd_dev_snaps_register(rbd_dev);
2337
2338 up_write(&rbd_dev->header_rwsem); 3159 up_write(&rbd_dev->header_rwsem);
2339 3160
2340 return ret; 3161 return ret;
2341} 3162}
2342 3163
2343static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) 3164/*
3165 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3166 * has disappeared from the (just updated) snapshot context.
3167 */
3168static void rbd_exists_validate(struct rbd_device *rbd_dev)
3169{
3170 u64 snap_id;
3171
3172 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3173 return;
3174
3175 snap_id = rbd_dev->spec->snap_id;
3176 if (snap_id == CEPH_NOSNAP)
3177 return;
3178
3179 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3180 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3181}
3182
3183static int rbd_dev_refresh(struct rbd_device *rbd_dev)
2344{ 3184{
3185 u64 image_size;
2345 int ret; 3186 int ret;
2346 3187
2347 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 3188 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3189 image_size = rbd_dev->header.image_size;
2348 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 3190 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2349 if (rbd_dev->image_format == 1) 3191 if (rbd_dev->image_format == 1)
2350 ret = rbd_dev_v1_refresh(rbd_dev, hver); 3192 ret = rbd_dev_v1_refresh(rbd_dev);
2351 else 3193 else
2352 ret = rbd_dev_v2_refresh(rbd_dev, hver); 3194 ret = rbd_dev_v2_refresh(rbd_dev);
3195
3196 /* If it's a mapped snapshot, validate its EXISTS flag */
3197
3198 rbd_exists_validate(rbd_dev);
2353 mutex_unlock(&ctl_mutex); 3199 mutex_unlock(&ctl_mutex);
3200 if (ret)
3201 rbd_warn(rbd_dev, "got notification but failed to "
3202 " update snaps: %d\n", ret);
3203 if (image_size != rbd_dev->header.image_size)
3204 revalidate_disk(rbd_dev->disk);
2354 3205
2355 return ret; 3206 return ret;
2356} 3207}
@@ -2394,8 +3245,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
2394 3245
2395 rbd_dev->disk = disk; 3246 rbd_dev->disk = disk;
2396 3247
2397 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2398
2399 return 0; 3248 return 0;
2400out_disk: 3249out_disk:
2401 put_disk(disk); 3250 put_disk(disk);
@@ -2416,13 +3265,9 @@ static ssize_t rbd_size_show(struct device *dev,
2416 struct device_attribute *attr, char *buf) 3265 struct device_attribute *attr, char *buf)
2417{ 3266{
2418 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3267 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2419 sector_t size;
2420 3268
2421 down_read(&rbd_dev->header_rwsem); 3269 return sprintf(buf, "%llu\n",
2422 size = get_capacity(rbd_dev->disk); 3270 (unsigned long long)rbd_dev->mapping.size);
2423 up_read(&rbd_dev->header_rwsem);
2424
2425 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2426} 3271}
2427 3272
2428/* 3273/*
@@ -2435,7 +3280,7 @@ static ssize_t rbd_features_show(struct device *dev,
2435 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3280 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2436 3281
2437 return sprintf(buf, "0x%016llx\n", 3282 return sprintf(buf, "0x%016llx\n",
2438 (unsigned long long) rbd_dev->mapping.features); 3283 (unsigned long long)rbd_dev->mapping.features);
2439} 3284}
2440 3285
2441static ssize_t rbd_major_show(struct device *dev, 3286static ssize_t rbd_major_show(struct device *dev,
@@ -2443,7 +3288,11 @@ static ssize_t rbd_major_show(struct device *dev,
2443{ 3288{
2444 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3289 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2445 3290
2446 return sprintf(buf, "%d\n", rbd_dev->major); 3291 if (rbd_dev->major)
3292 return sprintf(buf, "%d\n", rbd_dev->major);
3293
3294 return sprintf(buf, "(none)\n");
3295
2447} 3296}
2448 3297
2449static ssize_t rbd_client_id_show(struct device *dev, 3298static ssize_t rbd_client_id_show(struct device *dev,
@@ -2469,7 +3318,7 @@ static ssize_t rbd_pool_id_show(struct device *dev,
2469 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3318 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2470 3319
2471 return sprintf(buf, "%llu\n", 3320 return sprintf(buf, "%llu\n",
2472 (unsigned long long) rbd_dev->spec->pool_id); 3321 (unsigned long long) rbd_dev->spec->pool_id);
2473} 3322}
2474 3323
2475static ssize_t rbd_name_show(struct device *dev, 3324static ssize_t rbd_name_show(struct device *dev,
@@ -2555,7 +3404,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
2555 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3404 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2556 int ret; 3405 int ret;
2557 3406
2558 ret = rbd_dev_refresh(rbd_dev, NULL); 3407 ret = rbd_dev_refresh(rbd_dev);
2559 3408
2560 return ret < 0 ? ret : size; 3409 return ret < 0 ? ret : size;
2561} 3410}
@@ -2606,71 +3455,6 @@ static struct device_type rbd_device_type = {
2606 .release = rbd_sysfs_dev_release, 3455 .release = rbd_sysfs_dev_release,
2607}; 3456};
2608 3457
2609
2610/*
2611 sysfs - snapshots
2612*/
2613
2614static ssize_t rbd_snap_size_show(struct device *dev,
2615 struct device_attribute *attr,
2616 char *buf)
2617{
2618 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2619
2620 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2621}
2622
2623static ssize_t rbd_snap_id_show(struct device *dev,
2624 struct device_attribute *attr,
2625 char *buf)
2626{
2627 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2628
2629 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2630}
2631
2632static ssize_t rbd_snap_features_show(struct device *dev,
2633 struct device_attribute *attr,
2634 char *buf)
2635{
2636 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2637
2638 return sprintf(buf, "0x%016llx\n",
2639 (unsigned long long) snap->features);
2640}
2641
2642static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2643static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2644static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2645
2646static struct attribute *rbd_snap_attrs[] = {
2647 &dev_attr_snap_size.attr,
2648 &dev_attr_snap_id.attr,
2649 &dev_attr_snap_features.attr,
2650 NULL,
2651};
2652
2653static struct attribute_group rbd_snap_attr_group = {
2654 .attrs = rbd_snap_attrs,
2655};
2656
2657static void rbd_snap_dev_release(struct device *dev)
2658{
2659 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2660 kfree(snap->name);
2661 kfree(snap);
2662}
2663
2664static const struct attribute_group *rbd_snap_attr_groups[] = {
2665 &rbd_snap_attr_group,
2666 NULL
2667};
2668
2669static struct device_type rbd_snap_device_type = {
2670 .groups = rbd_snap_attr_groups,
2671 .release = rbd_snap_dev_release,
2672};
2673
2674static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 3458static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2675{ 3459{
2676 kref_get(&spec->kref); 3460 kref_get(&spec->kref);
@@ -2694,8 +3478,6 @@ static struct rbd_spec *rbd_spec_alloc(void)
2694 return NULL; 3478 return NULL;
2695 kref_init(&spec->kref); 3479 kref_init(&spec->kref);
2696 3480
2697 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2698
2699 return spec; 3481 return spec;
2700} 3482}
2701 3483
@@ -2722,7 +3504,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2722 spin_lock_init(&rbd_dev->lock); 3504 spin_lock_init(&rbd_dev->lock);
2723 rbd_dev->flags = 0; 3505 rbd_dev->flags = 0;
2724 INIT_LIST_HEAD(&rbd_dev->node); 3506 INIT_LIST_HEAD(&rbd_dev->node);
2725 INIT_LIST_HEAD(&rbd_dev->snaps);
2726 init_rwsem(&rbd_dev->header_rwsem); 3507 init_rwsem(&rbd_dev->header_rwsem);
2727 3508
2728 rbd_dev->spec = spec; 3509 rbd_dev->spec = spec;
@@ -2740,96 +3521,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2740 3521
2741static void rbd_dev_destroy(struct rbd_device *rbd_dev) 3522static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2742{ 3523{
2743 rbd_spec_put(rbd_dev->parent_spec);
2744 kfree(rbd_dev->header_name);
2745 rbd_put_client(rbd_dev->rbd_client); 3524 rbd_put_client(rbd_dev->rbd_client);
2746 rbd_spec_put(rbd_dev->spec); 3525 rbd_spec_put(rbd_dev->spec);
2747 kfree(rbd_dev); 3526 kfree(rbd_dev);
2748} 3527}
2749 3528
2750static bool rbd_snap_registered(struct rbd_snap *snap)
2751{
2752 bool ret = snap->dev.type == &rbd_snap_device_type;
2753 bool reg = device_is_registered(&snap->dev);
2754
2755 rbd_assert(!ret ^ reg);
2756
2757 return ret;
2758}
2759
2760static void rbd_remove_snap_dev(struct rbd_snap *snap)
2761{
2762 list_del(&snap->node);
2763 if (device_is_registered(&snap->dev))
2764 device_unregister(&snap->dev);
2765}
2766
2767static int rbd_register_snap_dev(struct rbd_snap *snap,
2768 struct device *parent)
2769{
2770 struct device *dev = &snap->dev;
2771 int ret;
2772
2773 dev->type = &rbd_snap_device_type;
2774 dev->parent = parent;
2775 dev->release = rbd_snap_dev_release;
2776 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2777 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2778
2779 ret = device_register(dev);
2780
2781 return ret;
2782}
2783
2784static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2785 const char *snap_name,
2786 u64 snap_id, u64 snap_size,
2787 u64 snap_features)
2788{
2789 struct rbd_snap *snap;
2790 int ret;
2791
2792 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2793 if (!snap)
2794 return ERR_PTR(-ENOMEM);
2795
2796 ret = -ENOMEM;
2797 snap->name = kstrdup(snap_name, GFP_KERNEL);
2798 if (!snap->name)
2799 goto err;
2800
2801 snap->id = snap_id;
2802 snap->size = snap_size;
2803 snap->features = snap_features;
2804
2805 return snap;
2806
2807err:
2808 kfree(snap->name);
2809 kfree(snap);
2810
2811 return ERR_PTR(ret);
2812}
2813
2814static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2815 u64 *snap_size, u64 *snap_features)
2816{
2817 char *snap_name;
2818
2819 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2820
2821 *snap_size = rbd_dev->header.snap_sizes[which];
2822 *snap_features = 0; /* No features for v1 */
2823
2824 /* Skip over names until we find the one we are looking for */
2825
2826 snap_name = rbd_dev->header.snap_names;
2827 while (which--)
2828 snap_name += strlen(snap_name) + 1;
2829
2830 return snap_name;
2831}
2832
2833/* 3529/*
2834 * Get the size and object order for an image snapshot, or if 3530 * Get the size and object order for an image snapshot, or if
2835 * snap_id is CEPH_NOSNAP, gets this information for the base 3531 * snap_id is CEPH_NOSNAP, gets this information for the base
@@ -2847,18 +3543,21 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2847 3543
2848 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3544 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2849 "rbd", "get_size", 3545 "rbd", "get_size",
2850 (char *) &snapid, sizeof (snapid), 3546 &snapid, sizeof (snapid),
2851 (char *) &size_buf, sizeof (size_buf), NULL); 3547 &size_buf, sizeof (size_buf));
2852 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3548 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2853 if (ret < 0) 3549 if (ret < 0)
2854 return ret; 3550 return ret;
3551 if (ret < sizeof (size_buf))
3552 return -ERANGE;
2855 3553
2856 *order = size_buf.order; 3554 if (order)
3555 *order = size_buf.order;
2857 *snap_size = le64_to_cpu(size_buf.size); 3556 *snap_size = le64_to_cpu(size_buf.size);
2858 3557
2859 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 3558 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2860 (unsigned long long) snap_id, (unsigned int) *order, 3559 (unsigned long long)snap_id, (unsigned int)*order,
2861 (unsigned long long) *snap_size); 3560 (unsigned long long)*snap_size);
2862 3561
2863 return 0; 3562 return 0;
2864} 3563}
@@ -2881,17 +3580,16 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2881 return -ENOMEM; 3580 return -ENOMEM;
2882 3581
2883 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3582 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2884 "rbd", "get_object_prefix", 3583 "rbd", "get_object_prefix", NULL, 0,
2885 NULL, 0, 3584 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
2886 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2887 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3585 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2888 if (ret < 0) 3586 if (ret < 0)
2889 goto out; 3587 goto out;
2890 3588
2891 p = reply_buf; 3589 p = reply_buf;
2892 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 3590 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2893 p + RBD_OBJ_PREFIX_LEN_MAX, 3591 p + ret, NULL, GFP_NOIO);
2894 NULL, GFP_NOIO); 3592 ret = 0;
2895 3593
2896 if (IS_ERR(rbd_dev->header.object_prefix)) { 3594 if (IS_ERR(rbd_dev->header.object_prefix)) {
2897 ret = PTR_ERR(rbd_dev->header.object_prefix); 3595 ret = PTR_ERR(rbd_dev->header.object_prefix);
@@ -2899,7 +3597,6 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2899 } else { 3597 } else {
2900 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 3598 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2901 } 3599 }
2902
2903out: 3600out:
2904 kfree(reply_buf); 3601 kfree(reply_buf);
2905 3602
@@ -2913,29 +3610,30 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2913 struct { 3610 struct {
2914 __le64 features; 3611 __le64 features;
2915 __le64 incompat; 3612 __le64 incompat;
2916 } features_buf = { 0 }; 3613 } __attribute__ ((packed)) features_buf = { 0 };
2917 u64 incompat; 3614 u64 incompat;
2918 int ret; 3615 int ret;
2919 3616
2920 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3617 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2921 "rbd", "get_features", 3618 "rbd", "get_features",
2922 (char *) &snapid, sizeof (snapid), 3619 &snapid, sizeof (snapid),
2923 (char *) &features_buf, sizeof (features_buf), 3620 &features_buf, sizeof (features_buf));
2924 NULL);
2925 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3621 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2926 if (ret < 0) 3622 if (ret < 0)
2927 return ret; 3623 return ret;
3624 if (ret < sizeof (features_buf))
3625 return -ERANGE;
2928 3626
2929 incompat = le64_to_cpu(features_buf.incompat); 3627 incompat = le64_to_cpu(features_buf.incompat);
2930 if (incompat & ~RBD_FEATURES_ALL) 3628 if (incompat & ~RBD_FEATURES_SUPPORTED)
2931 return -ENXIO; 3629 return -ENXIO;
2932 3630
2933 *snap_features = le64_to_cpu(features_buf.features); 3631 *snap_features = le64_to_cpu(features_buf.features);
2934 3632
2935 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 3633 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2936 (unsigned long long) snap_id, 3634 (unsigned long long)snap_id,
2937 (unsigned long long) *snap_features, 3635 (unsigned long long)*snap_features,
2938 (unsigned long long) le64_to_cpu(features_buf.incompat)); 3636 (unsigned long long)le64_to_cpu(features_buf.incompat));
2939 3637
2940 return 0; 3638 return 0;
2941} 3639}
@@ -2975,15 +3673,15 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2975 snapid = cpu_to_le64(CEPH_NOSNAP); 3673 snapid = cpu_to_le64(CEPH_NOSNAP);
2976 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3674 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2977 "rbd", "get_parent", 3675 "rbd", "get_parent",
2978 (char *) &snapid, sizeof (snapid), 3676 &snapid, sizeof (snapid),
2979 (char *) reply_buf, size, NULL); 3677 reply_buf, size);
2980 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3678 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2981 if (ret < 0) 3679 if (ret < 0)
2982 goto out_err; 3680 goto out_err;
2983 3681
2984 ret = -ERANGE;
2985 p = reply_buf; 3682 p = reply_buf;
2986 end = (char *) reply_buf + size; 3683 end = reply_buf + ret;
3684 ret = -ERANGE;
2987 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 3685 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2988 if (parent_spec->pool_id == CEPH_NOPOOL) 3686 if (parent_spec->pool_id == CEPH_NOPOOL)
2989 goto out; /* No parent? No problem. */ 3687 goto out; /* No parent? No problem. */
@@ -2991,8 +3689,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2991 /* The ceph file layout needs to fit pool id in 32 bits */ 3689 /* The ceph file layout needs to fit pool id in 32 bits */
2992 3690
2993 ret = -EIO; 3691 ret = -EIO;
2994 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX)) 3692 if (parent_spec->pool_id > (u64)U32_MAX) {
2995 goto out; 3693 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3694 (unsigned long long)parent_spec->pool_id, U32_MAX);
3695 goto out_err;
3696 }
2996 3697
2997 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 3698 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2998 if (IS_ERR(image_id)) { 3699 if (IS_ERR(image_id)) {
@@ -3015,6 +3716,56 @@ out_err:
3015 return ret; 3716 return ret;
3016} 3717}
3017 3718
3719static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3720{
3721 struct {
3722 __le64 stripe_unit;
3723 __le64 stripe_count;
3724 } __attribute__ ((packed)) striping_info_buf = { 0 };
3725 size_t size = sizeof (striping_info_buf);
3726 void *p;
3727 u64 obj_size;
3728 u64 stripe_unit;
3729 u64 stripe_count;
3730 int ret;
3731
3732 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3733 "rbd", "get_stripe_unit_count", NULL, 0,
3734 (char *)&striping_info_buf, size);
3735 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3736 if (ret < 0)
3737 return ret;
3738 if (ret < size)
3739 return -ERANGE;
3740
3741 /*
3742 * We don't actually support the "fancy striping" feature
3743 * (STRIPINGV2) yet, but if the striping sizes are the
3744 * defaults the behavior is the same as before. So find
3745 * out, and only fail if the image has non-default values.
3746 */
3747 ret = -EINVAL;
3748 obj_size = (u64)1 << rbd_dev->header.obj_order;
3749 p = &striping_info_buf;
3750 stripe_unit = ceph_decode_64(&p);
3751 if (stripe_unit != obj_size) {
3752 rbd_warn(rbd_dev, "unsupported stripe unit "
3753 "(got %llu want %llu)",
3754 stripe_unit, obj_size);
3755 return -EINVAL;
3756 }
3757 stripe_count = ceph_decode_64(&p);
3758 if (stripe_count != 1) {
3759 rbd_warn(rbd_dev, "unsupported stripe count "
3760 "(got %llu want 1)", stripe_count);
3761 return -EINVAL;
3762 }
3763 rbd_dev->header.stripe_unit = stripe_unit;
3764 rbd_dev->header.stripe_count = stripe_count;
3765
3766 return 0;
3767}
3768
3018static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 3769static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3019{ 3770{
3020 size_t image_id_size; 3771 size_t image_id_size;
@@ -3036,8 +3787,8 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3036 return NULL; 3787 return NULL;
3037 3788
3038 p = image_id; 3789 p = image_id;
3039 end = (char *) image_id + image_id_size; 3790 end = image_id + image_id_size;
3040 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len); 3791 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3041 3792
3042 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 3793 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3043 reply_buf = kmalloc(size, GFP_KERNEL); 3794 reply_buf = kmalloc(size, GFP_KERNEL);
@@ -3047,11 +3798,12 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3047 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, 3798 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3048 "rbd", "dir_get_name", 3799 "rbd", "dir_get_name",
3049 image_id, image_id_size, 3800 image_id, image_id_size,
3050 (char *) reply_buf, size, NULL); 3801 reply_buf, size);
3051 if (ret < 0) 3802 if (ret < 0)
3052 goto out; 3803 goto out;
3053 p = reply_buf; 3804 p = reply_buf;
3054 end = (char *) reply_buf + size; 3805 end = reply_buf + ret;
3806
3055 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 3807 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3056 if (IS_ERR(image_name)) 3808 if (IS_ERR(image_name))
3057 image_name = NULL; 3809 image_name = NULL;
@@ -3064,69 +3816,134 @@ out:
3064 return image_name; 3816 return image_name;
3065} 3817}
3066 3818
3819static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3820{
3821 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3822 const char *snap_name;
3823 u32 which = 0;
3824
3825 /* Skip over names until we find the one we are looking for */
3826
3827 snap_name = rbd_dev->header.snap_names;
3828 while (which < snapc->num_snaps) {
3829 if (!strcmp(name, snap_name))
3830 return snapc->snaps[which];
3831 snap_name += strlen(snap_name) + 1;
3832 which++;
3833 }
3834 return CEPH_NOSNAP;
3835}
3836
3837static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3838{
3839 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3840 u32 which;
3841 bool found = false;
3842 u64 snap_id;
3843
3844 for (which = 0; !found && which < snapc->num_snaps; which++) {
3845 const char *snap_name;
3846
3847 snap_id = snapc->snaps[which];
3848 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3849 if (IS_ERR(snap_name))
3850 break;
3851 found = !strcmp(name, snap_name);
3852 kfree(snap_name);
3853 }
3854 return found ? snap_id : CEPH_NOSNAP;
3855}
3856
3067/* 3857/*
3068 * When a parent image gets probed, we only have the pool, image, 3858 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3069 * and snapshot ids but not the names of any of them. This call 3859 * no snapshot by that name is found, or if an error occurs.
3070 * is made later to fill in those names. It has to be done after
3071 * rbd_dev_snaps_update() has completed because some of the
3072 * information (in particular, snapshot name) is not available
3073 * until then.
3074 */ 3860 */
3075static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) 3861static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3076{ 3862{
3077 struct ceph_osd_client *osdc; 3863 if (rbd_dev->image_format == 1)
3078 const char *name; 3864 return rbd_v1_snap_id_by_name(rbd_dev, name);
3079 void *reply_buf = NULL; 3865
3866 return rbd_v2_snap_id_by_name(rbd_dev, name);
3867}
3868
3869/*
3870 * When an rbd image has a parent image, it is identified by the
3871 * pool, image, and snapshot ids (not names). This function fills
3872 * in the names for those ids. (It's OK if we can't figure out the
3873 * name for an image id, but the pool and snapshot ids should always
3874 * exist and have names.) All names in an rbd spec are dynamically
3875 * allocated.
3876 *
3877 * When an image being mapped (not a parent) is probed, we have the
3878 * pool name and pool id, image name and image id, and the snapshot
3879 * name. The only thing we're missing is the snapshot id.
3880 */
3881static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3882{
3883 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3884 struct rbd_spec *spec = rbd_dev->spec;
3885 const char *pool_name;
3886 const char *image_name;
3887 const char *snap_name;
3080 int ret; 3888 int ret;
3081 3889
3082 if (rbd_dev->spec->pool_name) 3890 /*
3083 return 0; /* Already have the names */ 3891 * An image being mapped will have the pool name (etc.), but
3892 * we need to look up the snapshot id.
3893 */
3894 if (spec->pool_name) {
3895 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3896 u64 snap_id;
3897
3898 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3899 if (snap_id == CEPH_NOSNAP)
3900 return -ENOENT;
3901 spec->snap_id = snap_id;
3902 } else {
3903 spec->snap_id = CEPH_NOSNAP;
3904 }
3084 3905
3085 /* Look up the pool name */ 3906 return 0;
3907 }
3086 3908
3087 osdc = &rbd_dev->rbd_client->client->osdc; 3909 /* Get the pool name; we have to make our own copy of this */
3088 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 3910
3089 if (!name) { 3911 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3090 rbd_warn(rbd_dev, "there is no pool with id %llu", 3912 if (!pool_name) {
3091 rbd_dev->spec->pool_id); /* Really a BUG() */ 3913 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3092 return -EIO; 3914 return -EIO;
3093 } 3915 }
3094 3916 pool_name = kstrdup(pool_name, GFP_KERNEL);
3095 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 3917 if (!pool_name)
3096 if (!rbd_dev->spec->pool_name)
3097 return -ENOMEM; 3918 return -ENOMEM;
3098 3919
3099 /* Fetch the image name; tolerate failure here */ 3920 /* Fetch the image name; tolerate failure here */
3100 3921
3101 name = rbd_dev_image_name(rbd_dev); 3922 image_name = rbd_dev_image_name(rbd_dev);
3102 if (name) 3923 if (!image_name)
3103 rbd_dev->spec->image_name = (char *) name;
3104 else
3105 rbd_warn(rbd_dev, "unable to get image name"); 3924 rbd_warn(rbd_dev, "unable to get image name");
3106 3925
3107 /* Look up the snapshot name. */ 3926 /* Look up the snapshot name, and make a copy */
3108 3927
3109 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 3928 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3110 if (!name) { 3929 if (!snap_name) {
3111 rbd_warn(rbd_dev, "no snapshot with id %llu", 3930 ret = -ENOMEM;
3112 rbd_dev->spec->snap_id); /* Really a BUG() */
3113 ret = -EIO;
3114 goto out_err; 3931 goto out_err;
3115 } 3932 }
3116 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); 3933
3117 if(!rbd_dev->spec->snap_name) 3934 spec->pool_name = pool_name;
3118 goto out_err; 3935 spec->image_name = image_name;
3936 spec->snap_name = snap_name;
3119 3937
3120 return 0; 3938 return 0;
3121out_err: 3939out_err:
3122 kfree(reply_buf); 3940 kfree(image_name);
3123 kfree(rbd_dev->spec->pool_name); 3941 kfree(pool_name);
3124 rbd_dev->spec->pool_name = NULL;
3125 3942
3126 return ret; 3943 return ret;
3127} 3944}
3128 3945
3129static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 3946static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3130{ 3947{
3131 size_t size; 3948 size_t size;
3132 int ret; 3949 int ret;
@@ -3151,16 +3968,15 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3151 return -ENOMEM; 3968 return -ENOMEM;
3152 3969
3153 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3970 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3154 "rbd", "get_snapcontext", 3971 "rbd", "get_snapcontext", NULL, 0,
3155 NULL, 0, 3972 reply_buf, size);
3156 reply_buf, size, ver);
3157 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3973 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3158 if (ret < 0) 3974 if (ret < 0)
3159 goto out; 3975 goto out;
3160 3976
3161 ret = -ERANGE;
3162 p = reply_buf; 3977 p = reply_buf;
3163 end = (char *) reply_buf + size; 3978 end = reply_buf + ret;
3979 ret = -ERANGE;
3164 ceph_decode_64_safe(&p, end, seq, out); 3980 ceph_decode_64_safe(&p, end, seq, out);
3165 ceph_decode_32_safe(&p, end, snap_count, out); 3981 ceph_decode_32_safe(&p, end, snap_count, out);
3166 3982
@@ -3177,37 +3993,33 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3177 } 3993 }
3178 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 3994 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3179 goto out; 3995 goto out;
3996 ret = 0;
3180 3997
3181 size = sizeof (struct ceph_snap_context) + 3998 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3182 snap_count * sizeof (snapc->snaps[0]);
3183 snapc = kmalloc(size, GFP_KERNEL);
3184 if (!snapc) { 3999 if (!snapc) {
3185 ret = -ENOMEM; 4000 ret = -ENOMEM;
3186 goto out; 4001 goto out;
3187 } 4002 }
3188
3189 atomic_set(&snapc->nref, 1);
3190 snapc->seq = seq; 4003 snapc->seq = seq;
3191 snapc->num_snaps = snap_count;
3192 for (i = 0; i < snap_count; i++) 4004 for (i = 0; i < snap_count; i++)
3193 snapc->snaps[i] = ceph_decode_64(&p); 4005 snapc->snaps[i] = ceph_decode_64(&p);
3194 4006
3195 rbd_dev->header.snapc = snapc; 4007 rbd_dev->header.snapc = snapc;
3196 4008
3197 dout(" snap context seq = %llu, snap_count = %u\n", 4009 dout(" snap context seq = %llu, snap_count = %u\n",
3198 (unsigned long long) seq, (unsigned int) snap_count); 4010 (unsigned long long)seq, (unsigned int)snap_count);
3199
3200out: 4011out:
3201 kfree(reply_buf); 4012 kfree(reply_buf);
3202 4013
3203 return 0; 4014 return ret;
3204} 4015}
3205 4016
3206static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) 4017static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4018 u64 snap_id)
3207{ 4019{
3208 size_t size; 4020 size_t size;
3209 void *reply_buf; 4021 void *reply_buf;
3210 __le64 snap_id; 4022 __le64 snapid;
3211 int ret; 4023 int ret;
3212 void *p; 4024 void *p;
3213 void *end; 4025 void *end;
@@ -3218,236 +4030,52 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3218 if (!reply_buf) 4030 if (!reply_buf)
3219 return ERR_PTR(-ENOMEM); 4031 return ERR_PTR(-ENOMEM);
3220 4032
3221 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); 4033 snapid = cpu_to_le64(snap_id);
3222 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 4034 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3223 "rbd", "get_snapshot_name", 4035 "rbd", "get_snapshot_name",
3224 (char *) &snap_id, sizeof (snap_id), 4036 &snapid, sizeof (snapid),
3225 reply_buf, size, NULL); 4037 reply_buf, size);
3226 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4038 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3227 if (ret < 0) 4039 if (ret < 0) {
4040 snap_name = ERR_PTR(ret);
3228 goto out; 4041 goto out;
4042 }
3229 4043
3230 p = reply_buf; 4044 p = reply_buf;
3231 end = (char *) reply_buf + size; 4045 end = reply_buf + ret;
3232 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 4046 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3233 if (IS_ERR(snap_name)) { 4047 if (IS_ERR(snap_name))
3234 ret = PTR_ERR(snap_name);
3235 goto out; 4048 goto out;
3236 } else {
3237 dout(" snap_id 0x%016llx snap_name = %s\n",
3238 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3239 }
3240 kfree(reply_buf);
3241 4049
3242 return snap_name; 4050 dout(" snap_id 0x%016llx snap_name = %s\n",
4051 (unsigned long long)snap_id, snap_name);
3243out: 4052out:
3244 kfree(reply_buf); 4053 kfree(reply_buf);
3245 4054
3246 return ERR_PTR(ret); 4055 return snap_name;
3247}
3248
3249static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3250 u64 *snap_size, u64 *snap_features)
3251{
3252 u64 snap_id;
3253 u8 order;
3254 int ret;
3255
3256 snap_id = rbd_dev->header.snapc->snaps[which];
3257 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3258 if (ret)
3259 return ERR_PTR(ret);
3260 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3261 if (ret)
3262 return ERR_PTR(ret);
3263
3264 return rbd_dev_v2_snap_name(rbd_dev, which);
3265}
3266
3267static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3268 u64 *snap_size, u64 *snap_features)
3269{
3270 if (rbd_dev->image_format == 1)
3271 return rbd_dev_v1_snap_info(rbd_dev, which,
3272 snap_size, snap_features);
3273 if (rbd_dev->image_format == 2)
3274 return rbd_dev_v2_snap_info(rbd_dev, which,
3275 snap_size, snap_features);
3276 return ERR_PTR(-EINVAL);
3277} 4056}
3278 4057
3279static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) 4058static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
3280{ 4059{
3281 int ret; 4060 int ret;
3282 __u8 obj_order;
3283 4061
3284 down_write(&rbd_dev->header_rwsem); 4062 down_write(&rbd_dev->header_rwsem);
3285 4063
3286 /* Grab old order first, to see if it changes */
3287
3288 obj_order = rbd_dev->header.obj_order,
3289 ret = rbd_dev_v2_image_size(rbd_dev); 4064 ret = rbd_dev_v2_image_size(rbd_dev);
3290 if (ret) 4065 if (ret)
3291 goto out; 4066 goto out;
3292 if (rbd_dev->header.obj_order != obj_order) {
3293 ret = -EIO;
3294 goto out;
3295 }
3296 rbd_update_mapping_size(rbd_dev); 4067 rbd_update_mapping_size(rbd_dev);
3297 4068
3298 ret = rbd_dev_v2_snap_context(rbd_dev, hver); 4069 ret = rbd_dev_v2_snap_context(rbd_dev);
3299 dout("rbd_dev_v2_snap_context returned %d\n", ret); 4070 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3300 if (ret) 4071 if (ret)
3301 goto out; 4072 goto out;
3302 ret = rbd_dev_snaps_update(rbd_dev);
3303 dout("rbd_dev_snaps_update returned %d\n", ret);
3304 if (ret)
3305 goto out;
3306 ret = rbd_dev_snaps_register(rbd_dev);
3307 dout("rbd_dev_snaps_register returned %d\n", ret);
3308out: 4073out:
3309 up_write(&rbd_dev->header_rwsem); 4074 up_write(&rbd_dev->header_rwsem);
3310 4075
3311 return ret; 4076 return ret;
3312} 4077}
3313 4078
3314/*
3315 * Scan the rbd device's current snapshot list and compare it to the
3316 * newly-received snapshot context. Remove any existing snapshots
3317 * not present in the new snapshot context. Add a new snapshot for
3318 * any snaphots in the snapshot context not in the current list.
3319 * And verify there are no changes to snapshots we already know
3320 * about.
3321 *
3322 * Assumes the snapshots in the snapshot context are sorted by
3323 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3324 * are also maintained in that order.)
3325 */
3326static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3327{
3328 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3329 const u32 snap_count = snapc->num_snaps;
3330 struct list_head *head = &rbd_dev->snaps;
3331 struct list_head *links = head->next;
3332 u32 index = 0;
3333
3334 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3335 while (index < snap_count || links != head) {
3336 u64 snap_id;
3337 struct rbd_snap *snap;
3338 char *snap_name;
3339 u64 snap_size = 0;
3340 u64 snap_features = 0;
3341
3342 snap_id = index < snap_count ? snapc->snaps[index]
3343 : CEPH_NOSNAP;
3344 snap = links != head ? list_entry(links, struct rbd_snap, node)
3345 : NULL;
3346 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3347
3348 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3349 struct list_head *next = links->next;
3350
3351 /*
3352 * A previously-existing snapshot is not in
3353 * the new snap context.
3354 *
3355 * If the now missing snapshot is the one the
3356 * image is mapped to, clear its exists flag
3357 * so we can avoid sending any more requests
3358 * to it.
3359 */
3360 if (rbd_dev->spec->snap_id == snap->id)
3361 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3362 rbd_remove_snap_dev(snap);
3363 dout("%ssnap id %llu has been removed\n",
3364 rbd_dev->spec->snap_id == snap->id ?
3365 "mapped " : "",
3366 (unsigned long long) snap->id);
3367
3368 /* Done with this list entry; advance */
3369
3370 links = next;
3371 continue;
3372 }
3373
3374 snap_name = rbd_dev_snap_info(rbd_dev, index,
3375 &snap_size, &snap_features);
3376 if (IS_ERR(snap_name))
3377 return PTR_ERR(snap_name);
3378
3379 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3380 (unsigned long long) snap_id);
3381 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3382 struct rbd_snap *new_snap;
3383
3384 /* We haven't seen this snapshot before */
3385
3386 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3387 snap_id, snap_size, snap_features);
3388 if (IS_ERR(new_snap)) {
3389 int err = PTR_ERR(new_snap);
3390
3391 dout(" failed to add dev, error %d\n", err);
3392
3393 return err;
3394 }
3395
3396 /* New goes before existing, or at end of list */
3397
3398 dout(" added dev%s\n", snap ? "" : " at end\n");
3399 if (snap)
3400 list_add_tail(&new_snap->node, &snap->node);
3401 else
3402 list_add_tail(&new_snap->node, head);
3403 } else {
3404 /* Already have this one */
3405
3406 dout(" already present\n");
3407
3408 rbd_assert(snap->size == snap_size);
3409 rbd_assert(!strcmp(snap->name, snap_name));
3410 rbd_assert(snap->features == snap_features);
3411
3412 /* Done with this list entry; advance */
3413
3414 links = links->next;
3415 }
3416
3417 /* Advance to the next entry in the snapshot context */
3418
3419 index++;
3420 }
3421 dout("%s: done\n", __func__);
3422
3423 return 0;
3424}
3425
3426/*
3427 * Scan the list of snapshots and register the devices for any that
3428 * have not already been registered.
3429 */
3430static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3431{
3432 struct rbd_snap *snap;
3433 int ret = 0;
3434
3435 dout("%s:\n", __func__);
3436 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3437 return -EIO;
3438
3439 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3440 if (!rbd_snap_registered(snap)) {
3441 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3442 if (ret < 0)
3443 break;
3444 }
3445 }
3446 dout("%s: returning %d\n", __func__, ret);
3447
3448 return ret;
3449}
3450
3451static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 4079static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3452{ 4080{
3453 struct device *dev; 4081 struct device *dev;
@@ -3459,7 +4087,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3459 dev->bus = &rbd_bus_type; 4087 dev->bus = &rbd_bus_type;
3460 dev->type = &rbd_device_type; 4088 dev->type = &rbd_device_type;
3461 dev->parent = &rbd_root_dev; 4089 dev->parent = &rbd_root_dev;
3462 dev->release = rbd_dev_release; 4090 dev->release = rbd_dev_device_release;
3463 dev_set_name(dev, "%d", rbd_dev->dev_id); 4091 dev_set_name(dev, "%d", rbd_dev->dev_id);
3464 ret = device_register(dev); 4092 ret = device_register(dev);
3465 4093
@@ -3673,6 +4301,7 @@ static int rbd_add_parse_args(const char *buf,
3673 size_t len; 4301 size_t len;
3674 char *options; 4302 char *options;
3675 const char *mon_addrs; 4303 const char *mon_addrs;
4304 char *snap_name;
3676 size_t mon_addrs_size; 4305 size_t mon_addrs_size;
3677 struct rbd_spec *spec = NULL; 4306 struct rbd_spec *spec = NULL;
3678 struct rbd_options *rbd_opts = NULL; 4307 struct rbd_options *rbd_opts = NULL;
@@ -3731,10 +4360,11 @@ static int rbd_add_parse_args(const char *buf,
3731 ret = -ENAMETOOLONG; 4360 ret = -ENAMETOOLONG;
3732 goto out_err; 4361 goto out_err;
3733 } 4362 }
3734 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 4363 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3735 if (!spec->snap_name) 4364 if (!snap_name)
3736 goto out_mem; 4365 goto out_mem;
3737 *(spec->snap_name + len) = '\0'; 4366 *(snap_name + len) = '\0';
4367 spec->snap_name = snap_name;
3738 4368
3739 /* Initialize all rbd options to the defaults */ 4369 /* Initialize all rbd options to the defaults */
3740 4370
@@ -3788,15 +4418,19 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3788 size_t size; 4418 size_t size;
3789 char *object_name; 4419 char *object_name;
3790 void *response; 4420 void *response;
3791 void *p; 4421 char *image_id;
3792 4422
3793 /* 4423 /*
3794 * When probing a parent image, the image id is already 4424 * When probing a parent image, the image id is already
3795 * known (and the image name likely is not). There's no 4425 * known (and the image name likely is not). There's no
3796 * need to fetch the image id again in this case. 4426 * need to fetch the image id again in this case. We
4427 * do still need to set the image format though.
3797 */ 4428 */
3798 if (rbd_dev->spec->image_id) 4429 if (rbd_dev->spec->image_id) {
4430 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4431
3799 return 0; 4432 return 0;
4433 }
3800 4434
3801 /* 4435 /*
3802 * First, see if the format 2 image id file exists, and if 4436 * First, see if the format 2 image id file exists, and if
@@ -3818,23 +4452,32 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3818 goto out; 4452 goto out;
3819 } 4453 }
3820 4454
4455 /* If it doesn't exist we'll assume it's a format 1 image */
4456
3821 ret = rbd_obj_method_sync(rbd_dev, object_name, 4457 ret = rbd_obj_method_sync(rbd_dev, object_name,
3822 "rbd", "get_id", 4458 "rbd", "get_id", NULL, 0,
3823 NULL, 0, 4459 response, RBD_IMAGE_ID_LEN_MAX);
3824 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3825 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4460 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3826 if (ret < 0) 4461 if (ret == -ENOENT) {
3827 goto out; 4462 image_id = kstrdup("", GFP_KERNEL);
3828 4463 ret = image_id ? 0 : -ENOMEM;
3829 p = response; 4464 if (!ret)
3830 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, 4465 rbd_dev->image_format = 1;
3831 p + RBD_IMAGE_ID_LEN_MAX, 4466 } else if (ret > sizeof (__le32)) {
4467 void *p = response;
4468
4469 image_id = ceph_extract_encoded_string(&p, p + ret,
3832 NULL, GFP_NOIO); 4470 NULL, GFP_NOIO);
3833 if (IS_ERR(rbd_dev->spec->image_id)) { 4471 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
3834 ret = PTR_ERR(rbd_dev->spec->image_id); 4472 if (!ret)
3835 rbd_dev->spec->image_id = NULL; 4473 rbd_dev->image_format = 2;
3836 } else { 4474 } else {
3837 dout("image_id is %s\n", rbd_dev->spec->image_id); 4475 ret = -EINVAL;
4476 }
4477
4478 if (!ret) {
4479 rbd_dev->spec->image_id = image_id;
4480 dout("image_id is %s\n", image_id);
3838 } 4481 }
3839out: 4482out:
3840 kfree(response); 4483 kfree(response);
@@ -3843,27 +4486,30 @@ out:
3843 return ret; 4486 return ret;
3844} 4487}
3845 4488
3846static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) 4489/* Undo whatever state changes are made by v1 or v2 image probe */
4490
4491static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
3847{ 4492{
3848 int ret; 4493 struct rbd_image_header *header;
3849 size_t size;
3850 4494
3851 /* Version 1 images have no id; empty string is used */ 4495 rbd_dev_remove_parent(rbd_dev);
4496 rbd_spec_put(rbd_dev->parent_spec);
4497 rbd_dev->parent_spec = NULL;
4498 rbd_dev->parent_overlap = 0;
3852 4499
3853 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); 4500 /* Free dynamic fields from the header, then zero it out */
3854 if (!rbd_dev->spec->image_id)
3855 return -ENOMEM;
3856 4501
3857 /* Record the header object name for this rbd image. */ 4502 header = &rbd_dev->header;
4503 ceph_put_snap_context(header->snapc);
4504 kfree(header->snap_sizes);
4505 kfree(header->snap_names);
4506 kfree(header->object_prefix);
4507 memset(header, 0, sizeof (*header));
4508}
3858 4509
3859 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX); 4510static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3860 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 4511{
3861 if (!rbd_dev->header_name) { 4512 int ret;
3862 ret = -ENOMEM;
3863 goto out_err;
3864 }
3865 sprintf(rbd_dev->header_name, "%s%s",
3866 rbd_dev->spec->image_name, RBD_SUFFIX);
3867 4513
3868 /* Populate rbd image metadata */ 4514 /* Populate rbd image metadata */
3869 4515
@@ -3876,8 +4522,6 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3876 rbd_dev->parent_spec = NULL; 4522 rbd_dev->parent_spec = NULL;
3877 rbd_dev->parent_overlap = 0; 4523 rbd_dev->parent_overlap = 0;
3878 4524
3879 rbd_dev->image_format = 1;
3880
3881 dout("discovered version 1 image, header name is %s\n", 4525 dout("discovered version 1 image, header name is %s\n",
3882 rbd_dev->header_name); 4526 rbd_dev->header_name);
3883 4527
@@ -3894,43 +4538,45 @@ out_err:
3894 4538
3895static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) 4539static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3896{ 4540{
3897 size_t size;
3898 int ret; 4541 int ret;
3899 u64 ver = 0;
3900
3901 /*
3902 * Image id was filled in by the caller. Record the header
3903 * object name for this rbd image.
3904 */
3905 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3906 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3907 if (!rbd_dev->header_name)
3908 return -ENOMEM;
3909 sprintf(rbd_dev->header_name, "%s%s",
3910 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3911
3912 /* Get the size and object order for the image */
3913 4542
3914 ret = rbd_dev_v2_image_size(rbd_dev); 4543 ret = rbd_dev_v2_image_size(rbd_dev);
3915 if (ret < 0) 4544 if (ret)
3916 goto out_err; 4545 goto out_err;
3917 4546
3918 /* Get the object prefix (a.k.a. block_name) for the image */ 4547 /* Get the object prefix (a.k.a. block_name) for the image */
3919 4548
3920 ret = rbd_dev_v2_object_prefix(rbd_dev); 4549 ret = rbd_dev_v2_object_prefix(rbd_dev);
3921 if (ret < 0) 4550 if (ret)
3922 goto out_err; 4551 goto out_err;
3923 4552
3924 /* Get the and check features for the image */ 4553 /* Get the and check features for the image */
3925 4554
3926 ret = rbd_dev_v2_features(rbd_dev); 4555 ret = rbd_dev_v2_features(rbd_dev);
3927 if (ret < 0) 4556 if (ret)
3928 goto out_err; 4557 goto out_err;
3929 4558
3930 /* If the image supports layering, get the parent info */ 4559 /* If the image supports layering, get the parent info */
3931 4560
3932 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 4561 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3933 ret = rbd_dev_v2_parent_info(rbd_dev); 4562 ret = rbd_dev_v2_parent_info(rbd_dev);
4563 if (ret)
4564 goto out_err;
4565
4566 /*
4567 * Don't print a warning for parent images. We can
4568 * tell this point because we won't know its pool
4569 * name yet (just its pool id).
4570 */
4571 if (rbd_dev->spec->pool_name)
4572 rbd_warn(rbd_dev, "WARNING: kernel layering "
4573 "is EXPERIMENTAL!");
4574 }
4575
4576 /* If the image supports fancy striping, get its parameters */
4577
4578 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4579 ret = rbd_dev_v2_striping_info(rbd_dev);
3934 if (ret < 0) 4580 if (ret < 0)
3935 goto out_err; 4581 goto out_err;
3936 } 4582 }
@@ -3942,12 +4588,9 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3942 4588
3943 /* Get the snapshot context, plus the header version */ 4589 /* Get the snapshot context, plus the header version */
3944 4590
3945 ret = rbd_dev_v2_snap_context(rbd_dev, &ver); 4591 ret = rbd_dev_v2_snap_context(rbd_dev);
3946 if (ret) 4592 if (ret)
3947 goto out_err; 4593 goto out_err;
3948 rbd_dev->header.obj_version = ver;
3949
3950 rbd_dev->image_format = 2;
3951 4594
3952 dout("discovered version 2 image, header name is %s\n", 4595 dout("discovered version 2 image, header name is %s\n",
3953 rbd_dev->header_name); 4596 rbd_dev->header_name);
@@ -3965,22 +4608,54 @@ out_err:
3965 return ret; 4608 return ret;
3966} 4609}
3967 4610
3968static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) 4611static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
3969{ 4612{
4613 struct rbd_device *parent = NULL;
4614 struct rbd_spec *parent_spec;
4615 struct rbd_client *rbdc;
3970 int ret; 4616 int ret;
3971 4617
3972 /* no need to lock here, as rbd_dev is not registered yet */ 4618 if (!rbd_dev->parent_spec)
3973 ret = rbd_dev_snaps_update(rbd_dev); 4619 return 0;
3974 if (ret) 4620 /*
3975 return ret; 4621 * We need to pass a reference to the client and the parent
4622 * spec when creating the parent rbd_dev. Images related by
4623 * parent/child relationships always share both.
4624 */
4625 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4626 rbdc = __rbd_get_client(rbd_dev->rbd_client);
3976 4627
3977 ret = rbd_dev_probe_update_spec(rbd_dev); 4628 ret = -ENOMEM;
3978 if (ret) 4629 parent = rbd_dev_create(rbdc, parent_spec);
3979 goto err_out_snaps; 4630 if (!parent)
4631 goto out_err;
4632
4633 ret = rbd_dev_image_probe(parent);
4634 if (ret < 0)
4635 goto out_err;
4636 rbd_dev->parent = parent;
4637
4638 return 0;
4639out_err:
4640 if (parent) {
4641 rbd_spec_put(rbd_dev->parent_spec);
4642 kfree(rbd_dev->header_name);
4643 rbd_dev_destroy(parent);
4644 } else {
4645 rbd_put_client(rbdc);
4646 rbd_spec_put(parent_spec);
4647 }
4648
4649 return ret;
4650}
4651
4652static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4653{
4654 int ret;
3980 4655
3981 ret = rbd_dev_set_mapping(rbd_dev); 4656 ret = rbd_dev_mapping_set(rbd_dev);
3982 if (ret) 4657 if (ret)
3983 goto err_out_snaps; 4658 return ret;
3984 4659
3985 /* generate unique id: find highest unique id, add one */ 4660 /* generate unique id: find highest unique id, add one */
3986 rbd_dev_id_get(rbd_dev); 4661 rbd_dev_id_get(rbd_dev);
@@ -4007,54 +4682,81 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4007 if (ret) 4682 if (ret)
4008 goto err_out_disk; 4683 goto err_out_disk;
4009 4684
4010 /*
4011 * At this point cleanup in the event of an error is the job
4012 * of the sysfs code (initiated by rbd_bus_del_dev()).
4013 */
4014 down_write(&rbd_dev->header_rwsem);
4015 ret = rbd_dev_snaps_register(rbd_dev);
4016 up_write(&rbd_dev->header_rwsem);
4017 if (ret)
4018 goto err_out_bus;
4019
4020 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4021 if (ret)
4022 goto err_out_bus;
4023
4024 /* Everything's ready. Announce the disk to the world. */ 4685 /* Everything's ready. Announce the disk to the world. */
4025 4686
4687 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4688 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4026 add_disk(rbd_dev->disk); 4689 add_disk(rbd_dev->disk);
4027 4690
4028 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 4691 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4029 (unsigned long long) rbd_dev->mapping.size); 4692 (unsigned long long) rbd_dev->mapping.size);
4030 4693
4031 return ret; 4694 return ret;
4032err_out_bus:
4033 /* this will also clean up rest of rbd_dev stuff */
4034 4695
4035 rbd_bus_del_dev(rbd_dev);
4036
4037 return ret;
4038err_out_disk: 4696err_out_disk:
4039 rbd_free_disk(rbd_dev); 4697 rbd_free_disk(rbd_dev);
4040err_out_blkdev: 4698err_out_blkdev:
4041 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4699 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4042err_out_id: 4700err_out_id:
4043 rbd_dev_id_put(rbd_dev); 4701 rbd_dev_id_put(rbd_dev);
4044err_out_snaps: 4702 rbd_dev_mapping_clear(rbd_dev);
4045 rbd_remove_all_snaps(rbd_dev);
4046 4703
4047 return ret; 4704 return ret;
4048} 4705}
4049 4706
4707static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4708{
4709 struct rbd_spec *spec = rbd_dev->spec;
4710 size_t size;
4711
4712 /* Record the header object name for this rbd image. */
4713
4714 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4715
4716 if (rbd_dev->image_format == 1)
4717 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4718 else
4719 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4720
4721 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4722 if (!rbd_dev->header_name)
4723 return -ENOMEM;
4724
4725 if (rbd_dev->image_format == 1)
4726 sprintf(rbd_dev->header_name, "%s%s",
4727 spec->image_name, RBD_SUFFIX);
4728 else
4729 sprintf(rbd_dev->header_name, "%s%s",
4730 RBD_HEADER_PREFIX, spec->image_id);
4731 return 0;
4732}
4733
4734static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4735{
4736 int ret;
4737
4738 rbd_dev_unprobe(rbd_dev);
4739 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4740 if (ret)
4741 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4742 kfree(rbd_dev->header_name);
4743 rbd_dev->header_name = NULL;
4744 rbd_dev->image_format = 0;
4745 kfree(rbd_dev->spec->image_id);
4746 rbd_dev->spec->image_id = NULL;
4747
4748 rbd_dev_destroy(rbd_dev);
4749}
4750
4050/* 4751/*
4051 * Probe for the existence of the header object for the given rbd 4752 * Probe for the existence of the header object for the given rbd
4052 * device. For format 2 images this includes determining the image 4753 * device. For format 2 images this includes determining the image
4053 * id. 4754 * id.
4054 */ 4755 */
4055static int rbd_dev_probe(struct rbd_device *rbd_dev) 4756static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4056{ 4757{
4057 int ret; 4758 int ret;
4759 int tmp;
4058 4760
4059 /* 4761 /*
4060 * Get the id from the image id object. If it's not a 4762 * Get the id from the image id object. If it's not a
@@ -4063,18 +4765,48 @@ static int rbd_dev_probe(struct rbd_device *rbd_dev)
4063 */ 4765 */
4064 ret = rbd_dev_image_id(rbd_dev); 4766 ret = rbd_dev_image_id(rbd_dev);
4065 if (ret) 4767 if (ret)
4768 return ret;
4769 rbd_assert(rbd_dev->spec->image_id);
4770 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4771
4772 ret = rbd_dev_header_name(rbd_dev);
4773 if (ret)
4774 goto err_out_format;
4775
4776 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4777 if (ret)
4778 goto out_header_name;
4779
4780 if (rbd_dev->image_format == 1)
4066 ret = rbd_dev_v1_probe(rbd_dev); 4781 ret = rbd_dev_v1_probe(rbd_dev);
4067 else 4782 else
4068 ret = rbd_dev_v2_probe(rbd_dev); 4783 ret = rbd_dev_v2_probe(rbd_dev);
4069 if (ret) { 4784 if (ret)
4070 dout("probe failed, returning %d\n", ret); 4785 goto err_out_watch;
4071
4072 return ret;
4073 }
4074 4786
4075 ret = rbd_dev_probe_finish(rbd_dev); 4787 ret = rbd_dev_spec_update(rbd_dev);
4076 if (ret) 4788 if (ret)
4077 rbd_header_free(&rbd_dev->header); 4789 goto err_out_probe;
4790
4791 ret = rbd_dev_probe_parent(rbd_dev);
4792 if (!ret)
4793 return 0;
4794
4795err_out_probe:
4796 rbd_dev_unprobe(rbd_dev);
4797err_out_watch:
4798 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4799 if (tmp)
4800 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4801out_header_name:
4802 kfree(rbd_dev->header_name);
4803 rbd_dev->header_name = NULL;
4804err_out_format:
4805 rbd_dev->image_format = 0;
4806 kfree(rbd_dev->spec->image_id);
4807 rbd_dev->spec->image_id = NULL;
4808
4809 dout("probe failed, returning %d\n", ret);
4078 4810
4079 return ret; 4811 return ret;
4080} 4812}
@@ -4111,11 +4843,13 @@ static ssize_t rbd_add(struct bus_type *bus,
4111 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); 4843 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4112 if (rc < 0) 4844 if (rc < 0)
4113 goto err_out_client; 4845 goto err_out_client;
4114 spec->pool_id = (u64) rc; 4846 spec->pool_id = (u64)rc;
4115 4847
4116 /* The ceph file layout needs to fit pool id in 32 bits */ 4848 /* The ceph file layout needs to fit pool id in 32 bits */
4117 4849
4118 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) { 4850 if (spec->pool_id > (u64)U32_MAX) {
4851 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4852 (unsigned long long)spec->pool_id, U32_MAX);
4119 rc = -EIO; 4853 rc = -EIO;
4120 goto err_out_client; 4854 goto err_out_client;
4121 } 4855 }
@@ -4130,11 +4864,15 @@ static ssize_t rbd_add(struct bus_type *bus,
4130 kfree(rbd_opts); 4864 kfree(rbd_opts);
4131 rbd_opts = NULL; /* done with this */ 4865 rbd_opts = NULL; /* done with this */
4132 4866
4133 rc = rbd_dev_probe(rbd_dev); 4867 rc = rbd_dev_image_probe(rbd_dev);
4134 if (rc < 0) 4868 if (rc < 0)
4135 goto err_out_rbd_dev; 4869 goto err_out_rbd_dev;
4136 4870
4137 return count; 4871 rc = rbd_dev_device_setup(rbd_dev);
4872 if (!rc)
4873 return count;
4874
4875 rbd_dev_image_release(rbd_dev);
4138err_out_rbd_dev: 4876err_out_rbd_dev:
4139 rbd_dev_destroy(rbd_dev); 4877 rbd_dev_destroy(rbd_dev);
4140err_out_client: 4878err_out_client:
@@ -4149,7 +4887,7 @@ err_out_module:
4149 4887
4150 dout("Error adding device %s\n", buf); 4888 dout("Error adding device %s\n", buf);
4151 4889
4152 return (ssize_t) rc; 4890 return (ssize_t)rc;
4153} 4891}
4154 4892
4155static struct rbd_device *__rbd_get_dev(unsigned long dev_id) 4893static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
@@ -4169,27 +4907,43 @@ static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4169 return NULL; 4907 return NULL;
4170} 4908}
4171 4909
4172static void rbd_dev_release(struct device *dev) 4910static void rbd_dev_device_release(struct device *dev)
4173{ 4911{
4174 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4175 4913
4176 if (rbd_dev->watch_event)
4177 rbd_dev_header_watch_sync(rbd_dev, 0);
4178
4179 /* clean up and free blkdev */
4180 rbd_free_disk(rbd_dev); 4914 rbd_free_disk(rbd_dev);
4915 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4916 rbd_dev_clear_mapping(rbd_dev);
4181 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4917 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4182 4918 rbd_dev->major = 0;
4183 /* release allocated disk header fields */
4184 rbd_header_free(&rbd_dev->header);
4185
4186 /* done with the id, and with the rbd_dev */
4187 rbd_dev_id_put(rbd_dev); 4919 rbd_dev_id_put(rbd_dev);
4188 rbd_assert(rbd_dev->rbd_client != NULL); 4920 rbd_dev_mapping_clear(rbd_dev);
4189 rbd_dev_destroy(rbd_dev); 4921}
4190 4922
4191 /* release module ref */ 4923static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4192 module_put(THIS_MODULE); 4924{
4925 while (rbd_dev->parent) {
4926 struct rbd_device *first = rbd_dev;
4927 struct rbd_device *second = first->parent;
4928 struct rbd_device *third;
4929
4930 /*
4931 * Follow to the parent with no grandparent and
4932 * remove it.
4933 */
4934 while (second && (third = second->parent)) {
4935 first = second;
4936 second = third;
4937 }
4938 rbd_assert(second);
4939 rbd_dev_image_release(second);
4940 first->parent = NULL;
4941 first->parent_overlap = 0;
4942
4943 rbd_assert(first->parent_spec);
4944 rbd_spec_put(first->parent_spec);
4945 first->parent_spec = NULL;
4946 }
4193} 4947}
4194 4948
4195static ssize_t rbd_remove(struct bus_type *bus, 4949static ssize_t rbd_remove(struct bus_type *bus,
@@ -4197,13 +4951,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
4197 size_t count) 4951 size_t count)
4198{ 4952{
4199 struct rbd_device *rbd_dev = NULL; 4953 struct rbd_device *rbd_dev = NULL;
4200 int target_id, rc; 4954 int target_id;
4201 unsigned long ul; 4955 unsigned long ul;
4202 int ret = count; 4956 int ret;
4203 4957
4204 rc = strict_strtoul(buf, 10, &ul); 4958 ret = strict_strtoul(buf, 10, &ul);
4205 if (rc) 4959 if (ret)
4206 return rc; 4960 return ret;
4207 4961
4208 /* convert to int; abort if we lost anything in the conversion */ 4962 /* convert to int; abort if we lost anything in the conversion */
4209 target_id = (int) ul; 4963 target_id = (int) ul;
@@ -4226,10 +4980,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
4226 spin_unlock_irq(&rbd_dev->lock); 4980 spin_unlock_irq(&rbd_dev->lock);
4227 if (ret < 0) 4981 if (ret < 0)
4228 goto done; 4982 goto done;
4229 4983 ret = count;
4230 rbd_remove_all_snaps(rbd_dev);
4231 rbd_bus_del_dev(rbd_dev); 4984 rbd_bus_del_dev(rbd_dev);
4232 4985 rbd_dev_image_release(rbd_dev);
4986 module_put(THIS_MODULE);
4233done: 4987done:
4234 mutex_unlock(&ctl_mutex); 4988 mutex_unlock(&ctl_mutex);
4235 4989
@@ -4261,6 +5015,56 @@ static void rbd_sysfs_cleanup(void)
4261 device_unregister(&rbd_root_dev); 5015 device_unregister(&rbd_root_dev);
4262} 5016}
4263 5017
5018static int rbd_slab_init(void)
5019{
5020 rbd_assert(!rbd_img_request_cache);
5021 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5022 sizeof (struct rbd_img_request),
5023 __alignof__(struct rbd_img_request),
5024 0, NULL);
5025 if (!rbd_img_request_cache)
5026 return -ENOMEM;
5027
5028 rbd_assert(!rbd_obj_request_cache);
5029 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5030 sizeof (struct rbd_obj_request),
5031 __alignof__(struct rbd_obj_request),
5032 0, NULL);
5033 if (!rbd_obj_request_cache)
5034 goto out_err;
5035
5036 rbd_assert(!rbd_segment_name_cache);
5037 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5038 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5039 if (rbd_segment_name_cache)
5040 return 0;
5041out_err:
5042 if (rbd_obj_request_cache) {
5043 kmem_cache_destroy(rbd_obj_request_cache);
5044 rbd_obj_request_cache = NULL;
5045 }
5046
5047 kmem_cache_destroy(rbd_img_request_cache);
5048 rbd_img_request_cache = NULL;
5049
5050 return -ENOMEM;
5051}
5052
5053static void rbd_slab_exit(void)
5054{
5055 rbd_assert(rbd_segment_name_cache);
5056 kmem_cache_destroy(rbd_segment_name_cache);
5057 rbd_segment_name_cache = NULL;
5058
5059 rbd_assert(rbd_obj_request_cache);
5060 kmem_cache_destroy(rbd_obj_request_cache);
5061 rbd_obj_request_cache = NULL;
5062
5063 rbd_assert(rbd_img_request_cache);
5064 kmem_cache_destroy(rbd_img_request_cache);
5065 rbd_img_request_cache = NULL;
5066}
5067
4264static int __init rbd_init(void) 5068static int __init rbd_init(void)
4265{ 5069{
4266 int rc; 5070 int rc;
@@ -4270,16 +5074,22 @@ static int __init rbd_init(void)
4270 5074
4271 return -EINVAL; 5075 return -EINVAL;
4272 } 5076 }
4273 rc = rbd_sysfs_init(); 5077 rc = rbd_slab_init();
4274 if (rc) 5078 if (rc)
4275 return rc; 5079 return rc;
4276 pr_info("loaded " RBD_DRV_NAME_LONG "\n"); 5080 rc = rbd_sysfs_init();
4277 return 0; 5081 if (rc)
5082 rbd_slab_exit();
5083 else
5084 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5085
5086 return rc;
4278} 5087}
4279 5088
4280static void __exit rbd_exit(void) 5089static void __exit rbd_exit(void)
4281{ 5090{
4282 rbd_sysfs_cleanup(); 5091 rbd_sysfs_cleanup();
5092 rbd_slab_exit();
4283} 5093}
4284 5094
4285module_init(rbd_init); 5095module_init(rbd_init);