author     Linus Torvalds <torvalds@linux-foundation.org>  2013-02-28 20:43:09 -0500
committer  Linus Torvalds <torvalds@linux-foundation.org>  2013-02-28 20:43:09 -0500
commit     1cf0209c431fa7790253c532039d53b0773193aa (patch)
tree       24310eaaf4c9583988d9098f6c85a4a34970b5b9 /drivers
parent     de1a2262b006220dae2561a299a6ea128c46f4fe (diff)
parent     83ca14fdd35821554058e5fd4fa7b118ee504a33 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "A few groups of patches here.  Alex has been hard at work improving
  the RBD code, laying groundwork for understanding the new formats and
  doing layering.  Most of the infrastructure is now in place for the
  final bits that will come with the next window.

  There are a few changes to the data layout.  Jim Schutt's patch fixes
  some non-ideal CRUSH behavior, and a set of patches from me updates
  the client to speak a newer version of the protocol and implement an
  improved hashing strategy across storage nodes (when the server side
  supports it too).

  A pair of patches from Sam Lang fix the atomicity of open+create
  operations.  Several patches from Yan, Zheng fix various mds/client
  issues that turned up during multi-mds torture tests.

  A final set of patches expose file layouts via virtual xattrs, and
  allow the policies to be set on directories via xattrs as well
  (avoiding the awkward ioctl interface and providing a consistent
  interface for both kernel mount and ceph-fuse users)."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (143 commits)
  libceph: add support for HASHPSPOOL pool flag
  libceph: update osd request/reply encoding
  libceph: calculate placement based on the internal data types
  ceph: update support for PGID64, PGPOOL3, OSDENC protocol features
  ceph: update "ceph_features.h"
  libceph: decode into cpu-native ceph_pg type
  libceph: rename ceph_pg -> ceph_pg_v1
  rbd: pass length, not op for osd completions
  rbd: move rbd_osd_trivial_callback()
  libceph: use a do..while loop in con_work()
  libceph: use a flag to indicate a fault has occurred
  libceph: separate non-locked fault handling
  libceph: encapsulate connection backoff
  libceph: eliminate sparse warnings
  ceph: eliminate sparse warnings in fs code
  rbd: eliminate sparse warnings
  libceph: define connection flag helpers
  rbd: normalize dout() calls
  rbd: barriers are hard
  rbd: ignore zero-length requests
  ...
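[Editorial aside: the xattr-based layout interface the message mentions is driven with the standard getxattr(2)/setxattr(2) calls rather than a Ceph-specific ioctl.  A minimal userspace sketch follows; the /mnt/ceph paths are made up for illustration, and the "ceph.file.layout"/"ceph.dir.layout.*" attribute names are assumptions based on this series -- check the ceph documentation for the running version.]

	/*
	 * Hedged sketch: query a file's layout through the read-only
	 * virtual xattr, then set one layout-policy field on a directory.
	 * Paths and attribute names below are illustrative assumptions.
	 */
	#include <stdio.h>
	#include <string.h>
	#include <sys/types.h>
	#include <sys/xattr.h>

	int main(void)
	{
		char buf[256];
		ssize_t len;
		const char *val = "4194304";	/* 4 MiB objects */

		/* Read the composite layout vxattr of an existing file. */
		len = getxattr("/mnt/ceph/somefile", "ceph.file.layout",
			       buf, sizeof(buf) - 1);
		if (len >= 0) {
			buf[len] = '\0';
			printf("layout: %s\n", buf);
		}

		/* Set a per-directory layout policy; files created below
		 * this directory inherit it.  No ioctl() required. */
		if (setxattr("/mnt/ceph/newdir", "ceph.dir.layout.object_size",
			     val, strlen(val), 0) < 0)
			perror("setxattr");

		return 0;
	}

Because the kernel client and ceph-fuse expose the same xattr names, the same tool works unchanged against either mount, which is the consistency the message refers to.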
Diffstat (limited to 'drivers')
-rw-r--r--  drivers/block/rbd.c | 1852
1 file changed, 1161 insertions(+), 691 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89576a0b3f2e..6c81a4c040b9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -52,9 +52,12 @@
 #define SECTOR_SHIFT	9
 #define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
 
-/* It might be useful to have this defined elsewhere too */
+/* It might be useful to have these defined elsewhere */
 
-#define	U64_MAX	((u64) (~0ULL))
+#define	U8_MAX	((u8) (~0U))
+#define	U16_MAX	((u16) (~0U))
+#define	U32_MAX	((u32) (~0U))
+#define	U64_MAX	((u64) (~0ULL))
 
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
@@ -66,7 +69,6 @@
 			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
 
 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
-#define RBD_MAX_OPT_LEN		1024
 
 #define RBD_SNAP_HEAD_NAME	"-"
 
@@ -93,8 +95,6 @@
 #define DEV_NAME_LEN		32
 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_READ_ONLY_DEFAULT	false
-
 /*
  * block device image metadata (in-memory version)
  */
@@ -119,16 +119,33 @@ struct rbd_image_header {
  * An rbd image specification.
  *
  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
- * identify an image.
+ * identify an image.  Each rbd_dev structure includes a pointer to
+ * an rbd_spec structure that encapsulates this identity.
+ *
+ * Each of the id's in an rbd_spec has an associated name.  For a
+ * user-mapped image, the names are supplied and the id's associated
+ * with them are looked up.  For a layered image, a parent image is
+ * defined by the tuple, and the names are looked up.
+ *
+ * An rbd_dev structure contains a parent_spec pointer which is
+ * non-null if the image it represents is a child in a layered
+ * image.  This pointer will refer to the rbd_spec structure used
+ * by the parent rbd_dev for its own identity (i.e., the structure
+ * is shared between the parent and child).
+ *
+ * Since these structures are populated once, during the discovery
+ * phase of image construction, they are effectively immutable so
+ * we make no effort to synchronize access to them.
+ *
+ * Note that code herein does not assume the image name is known (it
+ * could be a null pointer).
  */
 struct rbd_spec {
 	u64		pool_id;
 	char		*pool_name;
 
 	char		*image_id;
-	size_t		image_id_len;
 	char		*image_name;
-	size_t		image_name_len;
 
 	u64		snap_id;
 	char		*snap_name;
@@ -136,10 +153,6 @@ struct rbd_spec {
 	struct kref	kref;
 };
 
-struct rbd_options {
-	bool	read_only;
-};
-
 /*
  * an instance of the client.  multiple devices may share an rbd client.
  */
@@ -149,37 +162,76 @@ struct rbd_client {
 	struct list_head	node;
 };
 
-/*
- * a request completion status
- */
-struct rbd_req_status {
-	int done;
-	int rc;
-	u64 bytes;
+struct rbd_img_request;
+typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
+
+#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
+
+struct rbd_obj_request;
+typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
+
+enum obj_request_type {
+	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 };
 
-/*
- * a collection of requests
- */
-struct rbd_req_coll {
-	int			total;
-	int			num_done;
+struct rbd_obj_request {
+	const char		*object_name;
+	u64			offset;		/* object start byte */
+	u64			length;		/* bytes from offset */
+
+	struct rbd_img_request	*img_request;
+	struct list_head	links;		/* img_request->obj_requests */
+	u32			which;		/* posn image request list */
+
+	enum obj_request_type	type;
+	union {
+		struct bio	*bio_list;
+		struct {
+			struct page	**pages;
+			u32		page_count;
+		};
+	};
+
+	struct ceph_osd_request	*osd_req;
+
+	u64			xferred;	/* bytes transferred */
+	u64			version;
+	int			result;
+	atomic_t		done;
+
+	rbd_obj_callback_t	callback;
+	struct completion	completion;
+
 	struct kref		kref;
-	struct rbd_req_status	status[0];
 };
 
-/*
- * a single io request
- */
-struct rbd_request {
-	struct request		*rq;		/* blk layer request */
-	struct bio		*bio;		/* cloned bio */
-	struct page		**pages;	/* list of used pages */
-	u64			len;
-	int			coll_index;
-	struct rbd_req_coll	*coll;
+struct rbd_img_request {
+	struct request		*rq;
+	struct rbd_device	*rbd_dev;
+	u64			offset;	/* starting image byte offset */
+	u64			length;	/* byte count from offset */
+	bool			write_request;	/* false for read */
+	union {
+		struct ceph_snap_context *snapc;	/* for writes */
+		u64		snap_id;		/* for reads */
+	};
+	spinlock_t		completion_lock;/* protects next_completion */
+	u32			next_completion;
+	rbd_img_callback_t	callback;
+
+	u32			obj_request_count;
+	struct list_head	obj_requests;	/* rbd_obj_request structs */
+
+	struct kref		kref;
 };
 
+#define for_each_obj_request(ireq, oreq) \
+	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_from(ireq, oreq) \
+	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_safe(ireq, oreq, n) \
+	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+
 struct rbd_snap {
 	struct device		dev;
 	const char		*name;
@@ -209,16 +261,18 @@ struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	spinlock_t		lock;		/* queue lock */
+	spinlock_t		lock;		/* queue, flags, open_count */
 
 	struct rbd_image_header	header;
-	bool			exists;
+	unsigned long		flags;		/* possibly lock protected */
 	struct rbd_spec		*spec;
 
 	char			*header_name;
 
+	struct ceph_file_layout	layout;
+
 	struct ceph_osd_event	*watch_event;
-	struct ceph_osd_request	*watch_request;
+	struct rbd_obj_request	*watch_request;
 
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
@@ -235,7 +289,19 @@ struct rbd_device {
 
 	/* sysfs related */
 	struct device		dev;
-	unsigned long		open_count;
+	unsigned long		open_count;	/* protected by lock */
+};
+
+/*
+ * Flag bits for rbd_dev->flags.  If atomicity is required,
+ * rbd_dev->lock is used to protect access.
+ *
+ * Currently, only the "removing" flag (which is coupled with the
+ * "open_count" field) requires atomic access.
+ */
+enum rbd_dev_flags {
+	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
+	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
 };
 
 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
@@ -277,6 +343,33 @@ static struct device rbd_root_dev = {
 	.release = rbd_root_dev_release,
 };
 
+static __printf(2, 3)
+void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
+{
+	struct va_format vaf;
+	va_list args;
+
+	va_start(args, fmt);
+	vaf.fmt = fmt;
+	vaf.va = &args;
+
+	if (!rbd_dev)
+		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
+	else if (rbd_dev->disk)
+		printk(KERN_WARNING "%s: %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
+	else if (rbd_dev->spec && rbd_dev->spec->image_name)
+		printk(KERN_WARNING "%s: image %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
+	else if (rbd_dev->spec && rbd_dev->spec->image_id)
+		printk(KERN_WARNING "%s: id %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
+	else	/* punt */
+		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
+			RBD_DRV_NAME, rbd_dev, &vaf);
+	va_end(args);
+}
+
 #ifdef RBD_DEBUG
 #define rbd_assert(expr)						\
 		if (unlikely(!(expr))) {				\
@@ -296,14 +389,23 @@ static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+	bool removing = false;
 
 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
 		return -EROFS;
 
+	spin_lock_irq(&rbd_dev->lock);
+	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
+		removing = true;
+	else
+		rbd_dev->open_count++;
+	spin_unlock_irq(&rbd_dev->lock);
+	if (removing)
+		return -ENOENT;
+
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 	(void) get_device(&rbd_dev->dev);
 	set_device_ro(bdev, rbd_dev->mapping.read_only);
-	rbd_dev->open_count++;
 	mutex_unlock(&ctl_mutex);
 
 	return 0;
@@ -312,10 +414,14 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
 static int rbd_release(struct gendisk *disk, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = disk->private_data;
+	unsigned long open_count_before;
+
+	spin_lock_irq(&rbd_dev->lock);
+	open_count_before = rbd_dev->open_count--;
+	spin_unlock_irq(&rbd_dev->lock);
+	rbd_assert(open_count_before > 0);
 
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-	rbd_assert(rbd_dev->open_count > 0);
-	rbd_dev->open_count--;
 	put_device(&rbd_dev->dev);
 	mutex_unlock(&ctl_mutex);
 
@@ -337,7 +443,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 	struct rbd_client *rbdc;
 	int ret = -ENOMEM;
 
-	dout("rbd_client_create\n");
+	dout("%s:\n", __func__);
 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 	if (!rbdc)
 		goto out_opt;
@@ -361,8 +467,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 	spin_unlock(&rbd_client_list_lock);
 
 	mutex_unlock(&ctl_mutex);
+	dout("%s: rbdc %p\n", __func__, rbdc);
 
-	dout("rbd_client_create created %p\n", rbdc);
 	return rbdc;
 
 out_err:
@@ -373,6 +479,8 @@ out_mutex:
 out_opt:
 	if (ceph_opts)
 		ceph_destroy_options(ceph_opts);
+	dout("%s: error %d\n", __func__, ret);
+
 	return ERR_PTR(ret);
 }
 
@@ -426,6 +534,12 @@ static match_table_t rbd_opts_tokens = {
 	{-1, NULL}
 };
 
+struct rbd_options {
+	bool	read_only;
+};
+
+#define RBD_READ_ONLY_DEFAULT	false
+
 static int parse_rbd_opts_token(char *c, void *private)
 {
 	struct rbd_options *rbd_opts = private;
@@ -493,7 +607,7 @@ static void rbd_client_release(struct kref *kref)
 {
 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 
-	dout("rbd_release_client %p\n", rbdc);
+	dout("%s: rbdc %p\n", __func__, rbdc);
 	spin_lock(&rbd_client_list_lock);
 	list_del(&rbdc->node);
 	spin_unlock(&rbd_client_list_lock);
@@ -512,18 +626,6 @@ static void rbd_put_client(struct rbd_client *rbdc)
 	kref_put(&rbdc->kref, rbd_client_release);
 }
 
-/*
- * Destroy requests collection
- */
-static void rbd_coll_release(struct kref *kref)
-{
-	struct rbd_req_coll *coll =
-		container_of(kref, struct rbd_req_coll, kref);
-
-	dout("rbd_coll_release %p\n", coll);
-	kfree(coll);
-}
-
 static bool rbd_image_format_valid(u32 image_format)
 {
 	return image_format == 1 || image_format == 2;
@@ -707,7 +809,8 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 			goto done;
 		rbd_dev->mapping.read_only = true;
 	}
-	rbd_dev->exists = true;
+	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+
 done:
 	return ret;
 }
@@ -724,7 +827,7 @@ static void rbd_header_free(struct rbd_image_header *header)
 	header->snapc = NULL;
 }
 
-static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
 	char *name;
 	u64 segment;
@@ -767,23 +870,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
 	return length;
 }
 
-static int rbd_get_num_segments(struct rbd_image_header *header,
-				u64 ofs, u64 len)
-{
-	u64 start_seg;
-	u64 end_seg;
-
-	if (!len)
-		return 0;
-	if (len - 1 > U64_MAX - ofs)
-		return -ERANGE;
-
-	start_seg = ofs >> header->obj_order;
-	end_seg = (ofs + len - 1) >> header->obj_order;
-
-	return end_seg - start_seg + 1;
-}
-
 /*
  * returns the size of an object in the image
  */
@@ -949,8 +1035,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
 		unsigned int bi_size;
 		struct bio *bio;
 
-		if (!bi)
+		if (!bi) {
+			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
 			goto out_err;	/* EINVAL; ran out of bio's */
+		}
 		bi_size = min_t(unsigned int, bi->bi_size - off, len);
 		bio = bio_clone_range(bi, off, bi_size, gfpmask);
 		if (!bio)
@@ -976,399 +1064,721 @@ out_err:
 	return NULL;
 }
 
-/*
- * helpers for osd request op vectors.
- */
-static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
-					int opcode, u32 payload_len)
+static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
-	struct ceph_osd_req_op *ops;
+	dout("%s: obj %p (was %d)\n", __func__, obj_request,
+		atomic_read(&obj_request->kref.refcount));
+	kref_get(&obj_request->kref);
+}
+
+static void rbd_obj_request_destroy(struct kref *kref);
+static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request != NULL);
+	dout("%s: obj %p (was %d)\n", __func__, obj_request,
+		atomic_read(&obj_request->kref.refcount));
+	kref_put(&obj_request->kref, rbd_obj_request_destroy);
+}
+
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+	dout("%s: img %p (was %d)\n", __func__, img_request,
+		atomic_read(&img_request->kref.refcount));
+	kref_get(&img_request->kref);
+}
+
+static void rbd_img_request_destroy(struct kref *kref);
+static void rbd_img_request_put(struct rbd_img_request *img_request)
+{
+	rbd_assert(img_request != NULL);
+	dout("%s: img %p (was %d)\n", __func__, img_request,
+		atomic_read(&img_request->kref.refcount));
+	kref_put(&img_request->kref, rbd_img_request_destroy);
+}
+
+static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->img_request == NULL);
+
+	rbd_obj_request_get(obj_request);
+	obj_request->img_request = img_request;
+	obj_request->which = img_request->obj_request_count;
+	rbd_assert(obj_request->which != BAD_WHICH);
+	img_request->obj_request_count++;
+	list_add_tail(&obj_request->links, &img_request->obj_requests);
+	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+		obj_request->which);
+}
 
-	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
-	if (!ops)
+static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->which != BAD_WHICH);
+
+	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+		obj_request->which);
+	list_del(&obj_request->links);
+	rbd_assert(img_request->obj_request_count > 0);
+	img_request->obj_request_count--;
+	rbd_assert(obj_request->which == img_request->obj_request_count);
+	obj_request->which = BAD_WHICH;
+	rbd_assert(obj_request->img_request == img_request);
+	obj_request->img_request = NULL;
+	obj_request->callback = NULL;
+	rbd_obj_request_put(obj_request);
+}
+
+static bool obj_request_type_valid(enum obj_request_type type)
+{
+	switch (type) {
+	case OBJ_REQUEST_NODATA:
+	case OBJ_REQUEST_BIO:
+	case OBJ_REQUEST_PAGES:
+		return true;
+	default:
+		return false;
+	}
+}
+
+static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
+{
+	struct ceph_osd_req_op *op;
+	va_list args;
+	size_t size;
+
+	op = kzalloc(sizeof (*op), GFP_NOIO);
+	if (!op)
 		return NULL;
+	op->op = opcode;
+	va_start(args, opcode);
+	switch (opcode) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_WRITE:
+		/* rbd_osd_req_op_create(READ, offset, length) */
+		/* rbd_osd_req_op_create(WRITE, offset, length) */
+		op->extent.offset = va_arg(args, u64);
+		op->extent.length = va_arg(args, u64);
+		if (opcode == CEPH_OSD_OP_WRITE)
+			op->payload_len = op->extent.length;
+		break;
+	case CEPH_OSD_OP_STAT:
+		break;
+	case CEPH_OSD_OP_CALL:
+		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
+		op->cls.class_name = va_arg(args, char *);
+		size = strlen(op->cls.class_name);
+		rbd_assert(size <= (size_t) U8_MAX);
+		op->cls.class_len = size;
+		op->payload_len = size;
+
+		op->cls.method_name = va_arg(args, char *);
+		size = strlen(op->cls.method_name);
+		rbd_assert(size <= (size_t) U8_MAX);
+		op->cls.method_len = size;
+		op->payload_len += size;
+
+		op->cls.argc = 0;
+		op->cls.indata = va_arg(args, void *);
+		size = va_arg(args, size_t);
+		rbd_assert(size <= (size_t) U32_MAX);
+		op->cls.indata_len = (u32) size;
+		op->payload_len += size;
+		break;
+	case CEPH_OSD_OP_NOTIFY_ACK:
+	case CEPH_OSD_OP_WATCH:
+		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
+		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
+		op->watch.cookie = va_arg(args, u64);
+		op->watch.ver = va_arg(args, u64);
+		op->watch.ver = cpu_to_le64(op->watch.ver);
+		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
+			op->watch.flag = (u8) 1;
+		break;
+	default:
+		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
+		kfree(op);
+		op = NULL;
+		break;
+	}
+	va_end(args);
 
-	ops[0].op = opcode;
+	return op;
+}
 
-	/*
-	 * op extent offset and length will be set later on
-	 * in calc_raw_layout()
-	 */
-	ops[0].payload_len = payload_len;
+static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
+{
+	kfree(op);
+}
+
+static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
+				struct rbd_obj_request *obj_request)
+{
+	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
 
-	return ops;
+	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
-static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
-	kfree(ops);
+	dout("%s: img %p\n", __func__, img_request);
+	if (img_request->callback)
+		img_request->callback(img_request);
+	else
+		rbd_img_request_put(img_request);
 }
 
-static void rbd_coll_end_req_index(struct request *rq,
-				   struct rbd_req_coll *coll,
-				   int index,
-				   int ret, u64 len)
+/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
 {
-	struct request_queue *q;
-	int min, max, i;
+	dout("%s: obj %p\n", __func__, obj_request);
 
-	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
-	     coll, index, ret, (unsigned long long) len);
+	return wait_for_completion_interruptible(&obj_request->completion);
+}
 
-	if (!rq)
-		return;
+static void obj_request_done_init(struct rbd_obj_request *obj_request)
+{
+	atomic_set(&obj_request->done, 0);
+	smp_wmb();
+}
 
-	if (!coll) {
-		blk_end_request(rq, ret, len);
-		return;
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
+{
+	int done;
+
+	done = atomic_inc_return(&obj_request->done);
+	if (done > 1) {
+		struct rbd_img_request *img_request = obj_request->img_request;
+		struct rbd_device *rbd_dev;
+
+		rbd_dev = img_request ? img_request->rbd_dev : NULL;
+		rbd_warn(rbd_dev, "obj_request %p was already done\n",
+			obj_request);
 	}
+}
 
-	q = rq->q;
-
-	spin_lock_irq(q->queue_lock);
-	coll->status[index].done = 1;
-	coll->status[index].rc = ret;
-	coll->status[index].bytes = len;
-	max = min = coll->num_done;
-	while (max < coll->total && coll->status[max].done)
-		max++;
-
-	for (i = min; i<max; i++) {
-		__blk_end_request(rq, coll->status[i].rc,
-				  coll->status[i].bytes);
-		coll->num_done++;
-		kref_put(&coll->kref, rbd_coll_release);
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
+{
+	smp_mb();
+	return atomic_read(&obj_request->done) != 0;
+}
+
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p cb %p\n", __func__, obj_request,
+		obj_request->callback);
+	if (obj_request->callback)
+		obj_request->callback(obj_request);
+	else
+		complete_all(&obj_request->completion);
+}
+
+static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p\n", __func__, obj_request);
+	obj_request_done_set(obj_request);
+}
+
+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
+		obj_request->result, obj_request->xferred, obj_request->length);
+	/*
+	 * ENOENT means a hole in the object.  We zero-fill the
+	 * entire length of the request.  A short read also implies
+	 * zero-fill to the end of the request.  Either way we
+	 * update the xferred count to indicate the whole request
+	 * was satisfied.
+	 */
+	if (obj_request->result == -ENOENT) {
+		zero_bio_chain(obj_request->bio_list, 0);
+		obj_request->result = 0;
+		obj_request->xferred = obj_request->length;
+	} else if (obj_request->xferred < obj_request->length &&
+			!obj_request->result) {
+		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
+		obj_request->xferred = obj_request->length;
 	}
-	spin_unlock_irq(q->queue_lock);
+	obj_request_done_set(obj_request);
 }
 
-static void rbd_coll_end_req(struct rbd_request *req,
-			     int ret, u64 len)
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 {
-	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+		obj_request->result, obj_request->length);
+	/*
+	 * There is no such thing as a successful short write.
+	 * Our xferred value is the number of bytes transferred
+	 * back.  Set it to our originally-requested length.
+	 */
+	obj_request->xferred = obj_request->length;
+	obj_request_done_set(obj_request);
 }
 
 /*
- * Send ceph osd request
+ * For a simple stat call there's nothing to do.  We'll do more if
+ * this is part of a write sequence for a layered image.
  */
-static int rbd_do_request(struct request *rq,
-			  struct rbd_device *rbd_dev,
-			  struct ceph_snap_context *snapc,
-			  u64 snapid,
-			  const char *object_name, u64 ofs, u64 len,
-			  struct bio *bio,
-			  struct page **pages,
-			  int num_pages,
-			  int flags,
-			  struct ceph_osd_req_op *ops,
-			  struct rbd_req_coll *coll,
-			  int coll_index,
-			  void (*rbd_cb)(struct ceph_osd_request *req,
-					 struct ceph_msg *msg),
-			  struct ceph_osd_request **linger_req,
-			  u64 *ver)
-{
-	struct ceph_osd_request *req;
-	struct ceph_file_layout *layout;
-	int ret;
-	u64 bno;
-	struct timespec mtime = CURRENT_TIME;
-	struct rbd_request *req_data;
-	struct ceph_osd_request_head *reqhead;
-	struct ceph_osd_client *osdc;
+static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p\n", __func__, obj_request);
+	obj_request_done_set(obj_request);
+}
 
-	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
-	if (!req_data) {
-		if (coll)
-			rbd_coll_end_req_index(rq, coll, coll_index,
-					       -ENOMEM, len);
-		return -ENOMEM;
+static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
+				struct ceph_msg *msg)
+{
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
+	u16 opcode;
+
+	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
+	rbd_assert(osd_req == obj_request->osd_req);
+	rbd_assert(!!obj_request->img_request ^
+				(obj_request->which == BAD_WHICH));
+
+	if (osd_req->r_result < 0)
+		obj_request->result = osd_req->r_result;
+	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
+
+	WARN_ON(osd_req->r_num_ops != 1);	/* For now */
+
+	/*
+	 * We support a 64-bit length, but ultimately it has to be
+	 * passed to blk_end_request(), which takes an unsigned int.
+	 */
+	obj_request->xferred = osd_req->r_reply_op_len[0];
+	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
+	opcode = osd_req->r_request_ops[0].op;
+	switch (opcode) {
+	case CEPH_OSD_OP_READ:
+		rbd_osd_read_callback(obj_request);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		rbd_osd_write_callback(obj_request);
+		break;
+	case CEPH_OSD_OP_STAT:
+		rbd_osd_stat_callback(obj_request);
+		break;
+	case CEPH_OSD_OP_CALL:
+	case CEPH_OSD_OP_NOTIFY_ACK:
+	case CEPH_OSD_OP_WATCH:
+		rbd_osd_trivial_callback(obj_request);
+		break;
+	default:
+		rbd_warn(NULL, "%s: unsupported op %hu\n",
+			obj_request->object_name, (unsigned short) opcode);
+		break;
 	}
 
-	if (coll) {
-		req_data->coll = coll;
-		req_data->coll_index = coll_index;
+	if (obj_request_done_test(obj_request))
+		rbd_obj_request_complete(obj_request);
+}
+
+static struct ceph_osd_request *rbd_osd_req_create(
+					struct rbd_device *rbd_dev,
+					bool write_request,
+					struct rbd_obj_request *obj_request,
+					struct ceph_osd_req_op *op)
+{
+	struct rbd_img_request *img_request = obj_request->img_request;
+	struct ceph_snap_context *snapc = NULL;
+	struct ceph_osd_client *osdc;
+	struct ceph_osd_request *osd_req;
+	struct timespec now;
+	struct timespec *mtime;
+	u64 snap_id = CEPH_NOSNAP;
+	u64 offset = obj_request->offset;
+	u64 length = obj_request->length;
+
+	if (img_request) {
+		rbd_assert(img_request->write_request == write_request);
+		if (img_request->write_request)
+			snapc = img_request->snapc;
+		else
+			snap_id = img_request->snap_id;
 	}
 
-	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
-		object_name, (unsigned long long) ofs,
-		(unsigned long long) len, coll, coll_index);
+	/* Allocate and initialize the request, for the single op */
 
 	osdc = &rbd_dev->rbd_client->client->osdc;
-	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
-					false, GFP_NOIO, pages, bio);
-	if (!req) {
-		ret = -ENOMEM;
-		goto done_pages;
+	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
+	if (!osd_req)
+		return NULL;	/* ENOMEM */
+
+	rbd_assert(obj_request_type_valid(obj_request->type));
+	switch (obj_request->type) {
+	case OBJ_REQUEST_NODATA:
+		break;		/* Nothing to do */
+	case OBJ_REQUEST_BIO:
+		rbd_assert(obj_request->bio_list != NULL);
+		osd_req->r_bio = obj_request->bio_list;
+		break;
+	case OBJ_REQUEST_PAGES:
+		osd_req->r_pages = obj_request->pages;
+		osd_req->r_num_pages = obj_request->page_count;
+		osd_req->r_page_alignment = offset & ~PAGE_MASK;
+		break;
 	}
 
-	req->r_callback = rbd_cb;
+	if (write_request) {
+		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+		now = CURRENT_TIME;
+		mtime = &now;
+	} else {
+		osd_req->r_flags = CEPH_OSD_FLAG_READ;
+		mtime = NULL;	/* not needed for reads */
+		offset = 0;	/* These are not used... */
+		length = 0;	/* ...for osd read requests */
+	}
 
-	req_data->rq = rq;
-	req_data->bio = bio;
-	req_data->pages = pages;
-	req_data->len = len;
+	osd_req->r_callback = rbd_osd_req_callback;
+	osd_req->r_priv = obj_request;
 
-	req->r_priv = req_data;
+	osd_req->r_oid_len = strlen(obj_request->object_name);
+	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
 
-	reqhead = req->r_request->front.iov_base;
-	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
 
-	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
-	req->r_oid_len = strlen(req->r_oid);
+	/* osd_req will get its own reference to snapc (if non-null) */
 
-	layout = &req->r_file_layout;
-	memset(layout, 0, sizeof(*layout));
-	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_stripe_count = cpu_to_le32(1);
-	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
-	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-				   req, ops);
-	rbd_assert(ret == 0);
+	ceph_osdc_build_request(osd_req, offset, length, 1, op,
+				snapc, snap_id, mtime);
 
-	ceph_osdc_build_request(req, ofs, &len,
-				ops,
-				snapc,
-				&mtime,
-				req->r_oid, req->r_oid_len);
+	return osd_req;
+}
 
-	if (linger_req) {
-		ceph_osdc_set_request_linger(osdc, req);
-		*linger_req = req;
-	}
+static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
+{
+	ceph_osdc_put_request(osd_req);
+}
 
-	ret = ceph_osdc_start_request(osdc, req, false);
-	if (ret < 0)
-		goto done_err;
-
-	if (!rbd_cb) {
-		ret = ceph_osdc_wait_request(osdc, req);
-		if (ver)
-			*ver = le64_to_cpu(req->r_reassert_version.version);
-		dout("reassert_ver=%llu\n",
-			(unsigned long long)
-				le64_to_cpu(req->r_reassert_version.version));
-		ceph_osdc_put_request(req);
+/* object_name is assumed to be a non-null pointer and NUL-terminated */
+
+static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
+						u64 offset, u64 length,
+						enum obj_request_type type)
+{
+	struct rbd_obj_request *obj_request;
+	size_t size;
+	char *name;
+
+	rbd_assert(obj_request_type_valid(type));
+
+	size = strlen(object_name) + 1;
+	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
+	if (!obj_request)
+		return NULL;
+
+	name = (char *)(obj_request + 1);
+	obj_request->object_name = memcpy(name, object_name, size);
+	obj_request->offset = offset;
+	obj_request->length = length;
+	obj_request->which = BAD_WHICH;
+	obj_request->type = type;
+	INIT_LIST_HEAD(&obj_request->links);
+	obj_request_done_init(obj_request);
+	init_completion(&obj_request->completion);
+	kref_init(&obj_request->kref);
+
+	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
+		offset, length, (int)type, obj_request);
+
+	return obj_request;
+}
+
+static void rbd_obj_request_destroy(struct kref *kref)
+{
+	struct rbd_obj_request *obj_request;
+
+	obj_request = container_of(kref, struct rbd_obj_request, kref);
+
+	dout("%s: obj %p\n", __func__, obj_request);
+
+	rbd_assert(obj_request->img_request == NULL);
+	rbd_assert(obj_request->which == BAD_WHICH);
+
+	if (obj_request->osd_req)
+		rbd_osd_req_destroy(obj_request->osd_req);
+
+	rbd_assert(obj_request_type_valid(obj_request->type));
+	switch (obj_request->type) {
+	case OBJ_REQUEST_NODATA:
+		break;		/* Nothing to do */
+	case OBJ_REQUEST_BIO:
+		if (obj_request->bio_list)
+			bio_chain_put(obj_request->bio_list);
+		break;
+	case OBJ_REQUEST_PAGES:
+		if (obj_request->pages)
+			ceph_release_page_vector(obj_request->pages,
+						obj_request->page_count);
+		break;
 	}
-	return ret;
 
-done_err:
-	bio_chain_put(req_data->bio);
-	ceph_osdc_put_request(req);
-done_pages:
-	rbd_coll_end_req(req_data, ret, len);
-	kfree(req_data);
-	return ret;
+	kfree(obj_request);
 }
 
 /*
- * Ceph osd op callback
+ * Caller is responsible for filling in the list of object requests
+ * that comprises the image request, and the Linux request pointer
+ * (if there is one).
  */
-static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
-{
-	struct rbd_request *req_data = req->r_priv;
-	struct ceph_osd_reply_head *replyhead;
-	struct ceph_osd_op *op;
-	__s32 rc;
-	u64 bytes;
-	int read_op;
-
-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	op = (void *)(replyhead + 1);
-	rc = le32_to_cpu(replyhead->result);
-	bytes = le64_to_cpu(op->extent.length);
-	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
-
-	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
-		(unsigned long long) bytes, read_op, (int) rc);
-
-	if (rc == -ENOENT && read_op) {
-		zero_bio_chain(req_data->bio, 0);
-		rc = 0;
-	} else if (rc == 0 && read_op && bytes < req_data->len) {
-		zero_bio_chain(req_data->bio, bytes);
-		bytes = req_data->len;
-	}
+static struct rbd_img_request *rbd_img_request_create(
+					struct rbd_device *rbd_dev,
+					u64 offset, u64 length,
+					bool write_request)
+{
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc = NULL;
 
-	rbd_coll_end_req(req_data, rc, bytes);
+	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+	if (!img_request)
+		return NULL;
 
-	if (req_data->bio)
-		bio_chain_put(req_data->bio);
+	if (write_request) {
+		down_read(&rbd_dev->header_rwsem);
+		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+		up_read(&rbd_dev->header_rwsem);
+		if (WARN_ON(!snapc)) {
+			kfree(img_request);
+			return NULL;	/* Shouldn't happen */
+		}
+	}
 
-	ceph_osdc_put_request(req);
-	kfree(req_data);
+	img_request->rq = NULL;
+	img_request->rbd_dev = rbd_dev;
+	img_request->offset = offset;
+	img_request->length = length;
+	img_request->write_request = write_request;
+	if (write_request)
+		img_request->snapc = snapc;
+	else
+		img_request->snap_id = rbd_dev->spec->snap_id;
+	spin_lock_init(&img_request->completion_lock);
+	img_request->next_completion = 0;
+	img_request->callback = NULL;
+	img_request->obj_request_count = 0;
+	INIT_LIST_HEAD(&img_request->obj_requests);
+	kref_init(&img_request->kref);
+
+	rbd_img_request_get(img_request);	/* Avoid a warning */
+	rbd_img_request_put(img_request);	/* TEMPORARY */
+
+	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
+		write_request ? "write" : "read", offset, length,
+		img_request);
+
+	return img_request;
 }
 
-static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_img_request_destroy(struct kref *kref)
 {
-	ceph_osdc_put_request(req);
+	struct rbd_img_request *img_request;
+	struct rbd_obj_request *obj_request;
+	struct rbd_obj_request *next_obj_request;
+
+	img_request = container_of(kref, struct rbd_img_request, kref);
+
+	dout("%s: img %p\n", __func__, img_request);
+
+	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+		rbd_img_obj_request_del(img_request, obj_request);
+	rbd_assert(img_request->obj_request_count == 0);
+
+	if (img_request->write_request)
+		ceph_put_snap_context(img_request->snapc);
+
+	kfree(img_request);
 }
 
-/*
- * Do a synchronous ceph osd operation
- */
-static int rbd_req_sync_op(struct rbd_device *rbd_dev,
-			   struct ceph_snap_context *snapc,
-			   u64 snapid,
-			   int flags,
-			   struct ceph_osd_req_op *ops,
-			   const char *object_name,
-			   u64 ofs, u64 inbound_size,
-			   char *inbound,
-			   struct ceph_osd_request **linger_req,
-			   u64 *ver)
+static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
+					struct bio *bio_list)
 {
-	int ret;
-	struct page **pages;
-	int num_pages;
-
-	rbd_assert(ops != NULL);
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	struct rbd_obj_request *obj_request = NULL;
+	struct rbd_obj_request *next_obj_request;
+	unsigned int bio_offset;
+	u64 image_offset;
+	u64 resid;
+	u16 opcode;
+
+	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+
+	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
+					      : CEPH_OSD_OP_READ;
+	bio_offset = 0;
+	image_offset = img_request->offset;
+	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
+	resid = img_request->length;
+	rbd_assert(resid > 0);
+	while (resid) {
+		const char *object_name;
+		unsigned int clone_size;
+		struct ceph_osd_req_op *op;
+		u64 offset;
+		u64 length;
+
+		object_name = rbd_segment_name(rbd_dev, image_offset);
+		if (!object_name)
+			goto out_unwind;
+		offset = rbd_segment_offset(rbd_dev, image_offset);
+		length = rbd_segment_length(rbd_dev, image_offset, resid);
+		obj_request = rbd_obj_request_create(object_name,
+						offset, length,
+						OBJ_REQUEST_BIO);
+		kfree(object_name);	/* object request has its own copy */
+		if (!obj_request)
+			goto out_unwind;
+
+		rbd_assert(length <= (u64) UINT_MAX);
+		clone_size = (unsigned int) length;
+		obj_request->bio_list = bio_chain_clone_range(&bio_list,
+						&bio_offset, clone_size,
+						GFP_ATOMIC);
+		if (!obj_request->bio_list)
+			goto out_partial;
 
-	num_pages = calc_pages_for(ofs, inbound_size);
-	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-	if (IS_ERR(pages))
-		return PTR_ERR(pages);
+		/*
+		 * Build up the op to use in building the osd
+		 * request.  Note that the contents of the op are
+		 * copied by rbd_osd_req_create().
+		 */
+		op = rbd_osd_req_op_create(opcode, offset, length);
+		if (!op)
+			goto out_partial;
+		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
+						img_request->write_request,
+						obj_request, op);
+		rbd_osd_req_op_destroy(op);
+		if (!obj_request->osd_req)
+			goto out_partial;
+		/* status and version are initially zero-filled */
+
+		rbd_img_obj_request_add(img_request, obj_request);
+
+		image_offset += length;
+		resid -= length;
+	}
 
-	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
-			  object_name, ofs, inbound_size, NULL,
-			  pages, num_pages,
-			  flags,
-			  ops,
-			  NULL, 0,
-			  NULL,
-			  linger_req, ver);
-	if (ret < 0)
-		goto done;
+	return 0;
 
-	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
-		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
+out_partial:
+	rbd_obj_request_put(obj_request);
+out_unwind:
+	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+		rbd_obj_request_put(obj_request);
 
-done:
-	ceph_release_page_vector(pages, num_pages);
-	return ret;
+	return -ENOMEM;
 }
 
-/*
- * Do an asynchronous ceph osd operation
- */
-static int rbd_do_op(struct request *rq,
-		     struct rbd_device *rbd_dev,
-		     struct ceph_snap_context *snapc,
-		     u64 ofs, u64 len,
-		     struct bio *bio,
-		     struct rbd_req_coll *coll,
-		     int coll_index)
-{
-	char *seg_name;
-	u64 seg_ofs;
-	u64 seg_len;
-	int ret;
-	struct ceph_osd_req_op *ops;
-	u32 payload_len;
-	int opcode;
-	int flags;
-	u64 snapid;
-
-	seg_name = rbd_segment_name(rbd_dev, ofs);
-	if (!seg_name)
-		return -ENOMEM;
-	seg_len = rbd_segment_length(rbd_dev, ofs, len);
-	seg_ofs = rbd_segment_offset(rbd_dev, ofs);
+static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request;
+	u32 which = obj_request->which;
+	bool more = true;
+
+	img_request = obj_request->img_request;
+
+	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
+	rbd_assert(img_request != NULL);
+	rbd_assert(img_request->rq != NULL);
+	rbd_assert(img_request->obj_request_count > 0);
+	rbd_assert(which != BAD_WHICH);
+	rbd_assert(which < img_request->obj_request_count);
+	rbd_assert(which >= img_request->next_completion);
+
+	spin_lock_irq(&img_request->completion_lock);
+	if (which != img_request->next_completion)
+		goto out;
+
+	for_each_obj_request_from(img_request, obj_request) {
+		unsigned int xferred;
+		int result;
+
+		rbd_assert(more);
+		rbd_assert(which < img_request->obj_request_count);
 
-	if (rq_data_dir(rq) == WRITE) {
-		opcode = CEPH_OSD_OP_WRITE;
-		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
-		snapid = CEPH_NOSNAP;
-		payload_len = seg_len;
-	} else {
-		opcode = CEPH_OSD_OP_READ;
-		flags = CEPH_OSD_FLAG_READ;
-		snapc = NULL;
-		snapid = rbd_dev->spec->snap_id;
-		payload_len = 0;
+		if (!obj_request_done_test(obj_request))
+			break;
+
+		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
+		xferred = (unsigned int) obj_request->xferred;
+		result = (int) obj_request->result;
+		if (result)
+			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
+				img_request->write_request ? "write" : "read",
+				result, xferred);
+
+		more = blk_end_request(img_request->rq, result, xferred);
+		which++;
 	}
 
-	ret = -ENOMEM;
-	ops = rbd_create_rw_ops(1, opcode, payload_len);
-	if (!ops)
-		goto done;
+	rbd_assert(more ^ (which == img_request->obj_request_count));
+	img_request->next_completion = which;
+out:
+	spin_unlock_irq(&img_request->completion_lock);
 
-	/* we've taken care of segment sizes earlier when we
-	   cloned the bios. We should never have a segment
-	   truncated at this point */
-	rbd_assert(seg_len == len);
-
-	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
-			     seg_name, seg_ofs, seg_len,
-			     bio,
-			     NULL, 0,
-			     flags,
-			     ops,
-			     coll, coll_index,
-			     rbd_req_cb, 0, NULL);
-
-	rbd_destroy_ops(ops);
-done:
-	kfree(seg_name);
-	return ret;
+	if (!more)
+		rbd_img_request_complete(img_request);
 }
 
-/*
- * Request sync osd read
- */
-static int rbd_req_sync_read(struct rbd_device *rbd_dev,
-			  u64 snapid,
-			  const char *object_name,
-			  u64 ofs, u64 len,
-			  char *buf,
-			  u64 *ver)
-{
-	struct ceph_osd_req_op *ops;
-	int ret;
+static int rbd_img_request_submit(struct rbd_img_request *img_request)
+{
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
-	if (!ops)
-		return -ENOMEM;
+	dout("%s: img %p\n", __func__, img_request);
+	for_each_obj_request(img_request, obj_request) {
+		int ret;
 
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			       snapid,
-			       CEPH_OSD_FLAG_READ,
-			       ops, object_name, ofs, len, buf, NULL, ver);
-	rbd_destroy_ops(ops);
+		obj_request->callback = rbd_img_obj_callback;
+		ret = rbd_obj_request_submit(osdc, obj_request);
+		if (ret)
+			return ret;
+		/*
+		 * The image request has its own reference to each
+		 * of its object requests, so we can safely drop the
+		 * initial one here.
+		 */
+		rbd_obj_request_put(obj_request);
+	}
 
-	return ret;
+	return 0;
 }
 
-/*
- * Request sync osd watch
- */
-static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
-				   u64 ver,
-				   u64 notify_id)
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
+				   u64 ver, u64 notify_id)
 {
-	struct ceph_osd_req_op *ops;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_req_op *op;
+	struct ceph_osd_client *osdc;
 	int ret;
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
-	if (!ops)
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+							OBJ_REQUEST_NODATA);
+	if (!obj_request)
 		return -ENOMEM;
 
-	ops[0].watch.ver = cpu_to_le64(ver);
-	ops[0].watch.cookie = notify_id;
-	ops[0].watch.flag = 0;
+	ret = -ENOMEM;
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+	if (!op)
+		goto out;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+						obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out;
 
-	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
-			  rbd_dev->header_name, 0, 0, NULL,
-			  NULL, 0,
-			  CEPH_OSD_FLAG_READ,
-			  ops,
-			  NULL, 0,
-			  rbd_simple_req_cb, 0, NULL);
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	obj_request->callback = rbd_obj_request_put;
+	ret = rbd_obj_request_submit(osdc, obj_request);
+out:
+	if (ret)
+		rbd_obj_request_put(obj_request);
 
-	rbd_destroy_ops(ops);
 	return ret;
 }
 
@@ -1381,95 +1791,103 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	if (!rbd_dev)
 		return;
 
-	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
+	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long) notify_id,
 		(unsigned int) opcode);
 	rc = rbd_dev_refresh(rbd_dev, &hver);
 	if (rc)
-		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
-			   " update snaps: %d\n", rbd_dev->major, rc);
+		rbd_warn(rbd_dev, "got notification but failed to "
+			   " update snaps: %d\n", rc);
 
-	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
+	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
 }
 
 /*
- * Request sync osd watch
+ * Request sync osd watch/unwatch.  The value of "start" determines
+ * whether a watch request is being initiated or torn down.
  */
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
 {
-	struct ceph_osd_req_op *ops;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_req_op *op;
 	int ret;
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-	if (!ops)
-		return -ENOMEM;
+	rbd_assert(start ^ !!rbd_dev->watch_event);
+	rbd_assert(start ^ !!rbd_dev->watch_request);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
-				     (void *)rbd_dev, &rbd_dev->watch_event);
-	if (ret < 0)
-		goto fail;
+	if (start) {
+		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+						&rbd_dev->watch_event);
+		if (ret < 0)
+			return ret;
+		rbd_assert(rbd_dev->watch_event != NULL);
+	}
 
-	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
-	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-	ops[0].watch.flag = 1;
+	ret = -ENOMEM;
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+							OBJ_REQUEST_NODATA);
+	if (!obj_request)
+		goto out_cancel;
+
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+				rbd_dev->watch_event->cookie,
+				rbd_dev->header.obj_version, start);
+	if (!op)
+		goto out_cancel;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
+							obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out_cancel;
+
+	if (start)
+		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+	else
+		ceph_osdc_unregister_linger_request(osdc,
+					rbd_dev->watch_request->osd_req);
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out_cancel;
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out_cancel;
+	ret = obj_request->result;
+	if (ret)
+		goto out_cancel;
 
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			      CEPH_NOSNAP,
-			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			      ops,
-			      rbd_dev->header_name,
-			      0, 0, NULL,
-			      &rbd_dev->watch_request, NULL);
+	/*
+	 * A watch request is set to linger, so the underlying osd
+	 * request won't go away until we unregister it.  We retain
+	 * a pointer to the object request during that time (in
+	 * rbd_dev->watch_request), so we'll keep a reference to
+	 * it.  We'll drop that reference (below) after we've
+	 * unregistered it.
+	 */
+	if (start) {
+		rbd_dev->watch_request = obj_request;
 
-	if (ret < 0)
-		goto fail_event;
+		return 0;
+	}
 
-	rbd_destroy_ops(ops);
-	return 0;
+	/* We have successfully torn down the watch request */
 
-fail_event:
+	rbd_obj_request_put(rbd_dev->watch_request);
+	rbd_dev->watch_request = NULL;
+out_cancel:
+	/* Cancel the event if we're tearing down, or on error */
 	ceph_osdc_cancel_event(rbd_dev->watch_event);
 	rbd_dev->watch_event = NULL;
-fail:
-	rbd_destroy_ops(ops);
-	return ret;
-}
+	if (obj_request)
+		rbd_obj_request_put(obj_request);
 
-/*
- * Request sync osd unwatch
- */
-static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
-{
-	struct ceph_osd_req_op *ops;
-	int ret;
-
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-	if (!ops)
-		return -ENOMEM;
-
-	ops[0].watch.ver = 0;
-	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-	ops[0].watch.flag = 0;
-
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			      CEPH_NOSNAP,
-			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			      ops,
-			      rbd_dev->header_name,
-			      0, 0, NULL, NULL, NULL);
-
-
-	rbd_destroy_ops(ops);
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
 	return ret;
 }
 
 /*
  * Synchronous osd object method call
  */
-static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
 			     const char *object_name,
 			     const char *class_name,
 			     const char *method_name,
@@ -1477,169 +1895,154 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1477 size_t outbound_size, 1895 size_t outbound_size,
1478 char *inbound, 1896 char *inbound,
1479 size_t inbound_size, 1897 size_t inbound_size,
1480 int flags, 1898 u64 *version)
1481 u64 *ver)
1482{ 1899{
1483 struct ceph_osd_req_op *ops; 1900 struct rbd_obj_request *obj_request;
1484 int class_name_len = strlen(class_name); 1901 struct ceph_osd_client *osdc;
1485 int method_name_len = strlen(method_name); 1902 struct ceph_osd_req_op *op;
1486 int payload_size; 1903 struct page **pages;
1904 u32 page_count;
1487 int ret; 1905 int ret;
1488 1906
1489 /* 1907 /*
1490 * Any input parameters required by the method we're calling 1908 * Method calls are ultimately read operations but they
1491 * will be sent along with the class and method names as 1909 * don't involve object data (so no offset or length).
1492 * part of the message payload. That data and its size are 1910 * The result should placed into the inbound buffer
1493 * supplied via the indata and indata_len fields (named from 1911 * provided. They also supply outbound data--parameters for
1494 * the perspective of the server side) in the OSD request 1912 * the object method. Currently if this is present it will
1495 * operation. 1913 * be a snapshot id.
1496 */ 1914 */
1497 payload_size = class_name_len + method_name_len + outbound_size; 1915 page_count = (u32) calc_pages_for(0, inbound_size);
1498 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size); 1916 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1499 if (!ops) 1917 if (IS_ERR(pages))
1500 return -ENOMEM; 1918 return PTR_ERR(pages);
1501 1919
1502 ops[0].cls.class_name = class_name; 1920 ret = -ENOMEM;
1503 ops[0].cls.class_len = (__u8) class_name_len; 1921 obj_request = rbd_obj_request_create(object_name, 0, 0,
1504 ops[0].cls.method_name = method_name; 1922 OBJ_REQUEST_PAGES);
1505 ops[0].cls.method_len = (__u8) method_name_len; 1923 if (!obj_request)
1506 ops[0].cls.argc = 0; 1924 goto out;
1507 ops[0].cls.indata = outbound;
1508 ops[0].cls.indata_len = outbound_size;
1509 1925
1510 ret = rbd_req_sync_op(rbd_dev, NULL, 1926 obj_request->pages = pages;
1511 CEPH_NOSNAP, 1927 obj_request->page_count = page_count;
1512 flags, ops,
1513 object_name, 0, inbound_size, inbound,
1514 NULL, ver);
1515 1928
1516 rbd_destroy_ops(ops); 1929 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1930 method_name, outbound, outbound_size);
1931 if (!op)
1932 goto out;
1933 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1934 obj_request, op);
1935 rbd_osd_req_op_destroy(op);
1936 if (!obj_request->osd_req)
1937 goto out;
1517 1938
1518 dout("cls_exec returned %d\n", ret); 1939 osdc = &rbd_dev->rbd_client->client->osdc;
1519 return ret; 1940 ret = rbd_obj_request_submit(osdc, obj_request);
1520} 1941 if (ret)
1942 goto out;
1943 ret = rbd_obj_request_wait(obj_request);
1944 if (ret)
1945 goto out;
1521 1946
1522static struct rbd_req_coll *rbd_alloc_coll(int num_reqs) 1947 ret = obj_request->result;
1523{ 1948 if (ret < 0)
1524 struct rbd_req_coll *coll = 1949 goto out;
1525 kzalloc(sizeof(struct rbd_req_coll) + 1950 ret = 0;
1526 sizeof(struct rbd_req_status) * num_reqs, 1951 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1527 GFP_ATOMIC); 1952 if (version)
1953 *version = obj_request->version;
1954out:
1955 if (obj_request)
1956 rbd_obj_request_put(obj_request);
1957 else
1958 ceph_release_page_vector(pages, page_count);
1528 1959
1529 if (!coll) 1960 return ret;
1530 return NULL;
1531 coll->total = num_reqs;
1532 kref_init(&coll->kref);
1533 return coll;
1534} 1961}
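
rbd_obj_method_sync() on the right is the replacement for rbd_req_sync_exec(), and every conversion further down follows the same shape: outbound bytes carry the method's parameters, inbound bytes receive the reply, and the version pointer is optional. A representative call, with a hypothetical reply buffer shown purely to illustrate the convention:

	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
	char reply_buf[64];		/* hypothetical reply buffer */
	int ret;

	/* invoke class "rbd", method "get_size" on the header object */
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				reply_buf, sizeof (reply_buf), NULL);
	if (ret < 0)
		return ret;	/* submit, wait, or osd result failure */

Note that unlike rbd_req_sync_exec(), which could return a positive count, the new helper returns 0 on success, which is why the converted callers below drop their "ret = 0" fixups.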
1535 1962
1536/* 1963static void rbd_request_fn(struct request_queue *q)
1537 * block device queue callback 1964 __releases(q->queue_lock) __acquires(q->queue_lock)
1538 */
1539static void rbd_rq_fn(struct request_queue *q)
1540{ 1965{
1541 struct rbd_device *rbd_dev = q->queuedata; 1966 struct rbd_device *rbd_dev = q->queuedata;
1967 bool read_only = rbd_dev->mapping.read_only;
1542 struct request *rq; 1968 struct request *rq;
1969 int result;
1543 1970
1544 while ((rq = blk_fetch_request(q))) { 1971 while ((rq = blk_fetch_request(q))) {
1545 struct bio *bio; 1972 bool write_request = rq_data_dir(rq) == WRITE;
1546 bool do_write; 1973 struct rbd_img_request *img_request;
1547 unsigned int size; 1974 u64 offset;
1548 u64 ofs; 1975 u64 length;
1549 int num_segs, cur_seg = 0; 1976
1550 struct rbd_req_coll *coll; 1977 /* Ignore any non-FS requests that filter through. */
1551 struct ceph_snap_context *snapc;
1552 unsigned int bio_offset;
1553
1554 dout("fetched request\n");
1555
1556 /* filter out block requests we don't understand */
1557 if ((rq->cmd_type != REQ_TYPE_FS)) {
1558 __blk_end_request_all(rq, 0);
1559 continue;
1560 }
1561 1978
1562 /* deduce our operation (read, write) */ 1979 if (rq->cmd_type != REQ_TYPE_FS) {
1563 do_write = (rq_data_dir(rq) == WRITE); 1980 dout("%s: non-fs request type %d\n", __func__,
1564 if (do_write && rbd_dev->mapping.read_only) { 1981 (int) rq->cmd_type);
1565 __blk_end_request_all(rq, -EROFS); 1982 __blk_end_request_all(rq, 0);
1566 continue; 1983 continue;
1567 } 1984 }
1568 1985
1569 spin_unlock_irq(q->queue_lock); 1986 /* Ignore/skip any zero-length requests */
1570 1987
1571 down_read(&rbd_dev->header_rwsem); 1988 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1989 length = (u64) blk_rq_bytes(rq);
1572 1990
1573 if (!rbd_dev->exists) { 1991 if (!length) {
1574 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 1992 dout("%s: zero-length request\n", __func__);
1575 up_read(&rbd_dev->header_rwsem); 1993 __blk_end_request_all(rq, 0);
1576 dout("request for non-existent snapshot");
1577 spin_lock_irq(q->queue_lock);
1578 __blk_end_request_all(rq, -ENXIO);
1579 continue; 1994 continue;
1580 } 1995 }
1581 1996
1582 snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1997 spin_unlock_irq(q->queue_lock);
1583
1584 up_read(&rbd_dev->header_rwsem);
1585
1586 size = blk_rq_bytes(rq);
1587 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1588 bio = rq->bio;
1589 1998
1590 dout("%s 0x%x bytes at 0x%llx\n", 1999 /* Disallow writes to a read-only device */
1591 do_write ? "write" : "read",
1592 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1593 2000
1594 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); 2001 if (write_request) {
1595 if (num_segs <= 0) { 2002 result = -EROFS;
1596 spin_lock_irq(q->queue_lock); 2003 if (read_only)
1597 __blk_end_request_all(rq, num_segs); 2004 goto end_request;
1598 ceph_put_snap_context(snapc); 2005 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1599 continue;
1600 } 2006 }
1601 coll = rbd_alloc_coll(num_segs);
1602 if (!coll) {
1603 spin_lock_irq(q->queue_lock);
1604 __blk_end_request_all(rq, -ENOMEM);
1605 ceph_put_snap_context(snapc);
1606 continue;
1607 }
1608
1609 bio_offset = 0;
1610 do {
1611 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1612 unsigned int chain_size;
1613 struct bio *bio_chain;
1614
1615 BUG_ON(limit > (u64) UINT_MAX);
1616 chain_size = (unsigned int) limit;
1617 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1618 2007
1619 kref_get(&coll->kref); 2008 /*
2009 * Quit early if the mapped snapshot no longer
2010 * exists. It's still possible the snapshot will
2011 * have disappeared by the time our request arrives
2012 * at the osd, but there's no sense in sending it if
2013 * we already know.
2014 */
2015 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2016 dout("request for non-existent snapshot");
2017 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2018 result = -ENXIO;
2019 goto end_request;
2020 }
1620 2021
1621 /* Pass a cloned bio chain via an osd request */ 2022 result = -EINVAL;
2023 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2024 goto end_request; /* Shouldn't happen */
1622 2025
1623 bio_chain = bio_chain_clone_range(&bio, 2026 result = -ENOMEM;
1624 &bio_offset, chain_size, 2027 img_request = rbd_img_request_create(rbd_dev, offset, length,
1625 GFP_ATOMIC); 2028 write_request);
1626 if (bio_chain) 2029 if (!img_request)
1627 (void) rbd_do_op(rq, rbd_dev, snapc, 2030 goto end_request;
1628 ofs, chain_size,
1629 bio_chain, coll, cur_seg);
1630 else
1631 rbd_coll_end_req_index(rq, coll, cur_seg,
1632 -ENOMEM, chain_size);
1633 size -= chain_size;
1634 ofs += chain_size;
1635 2031
1636 cur_seg++; 2032 img_request->rq = rq;
1637 } while (size > 0);
1638 kref_put(&coll->kref, rbd_coll_release);
1639 2033
2034 result = rbd_img_request_fill_bio(img_request, rq->bio);
2035 if (!result)
2036 result = rbd_img_request_submit(img_request);
2037 if (result)
2038 rbd_img_request_put(img_request);
2039end_request:
1640 spin_lock_irq(q->queue_lock); 2040 spin_lock_irq(q->queue_lock);
1641 2041 if (result < 0) {
1642 ceph_put_snap_context(snapc); 2042 rbd_warn(rbd_dev, "obj_request %s result %d\n",
2043 write_request ? "write" : "read", result);
2044 __blk_end_request_all(rq, result);
2045 }
1643 } 2046 }
1644} 2047}
1645 2048
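The effect of this rewrite: instead of splitting each block request into per-object segments tracked by an rbd_req_coll, rbd_request_fn() wraps the whole request in one image request built from the bio chain. Its control flow reduces to roughly the sketch below (locking follows the __releases/__acquires annotations; validation and error paths trimmed):

	while ((rq = blk_fetch_request(q))) {
		/* reject non-FS, zero-length and read-only violations */

		spin_unlock_irq(q->queue_lock);		/* may sleep below */

		img_request = rbd_img_request_create(rbd_dev, offset,
						length, write_request);
		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);

		spin_lock_irq(q->queue_lock);		/* reacquire for blk */
		if (result < 0)
			__blk_end_request_all(rq, result);
	}

On success the request is ended later, presumably from the image request's completion path via the stored img_request->rq pointer.
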
@@ -1703,6 +2106,71 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
1703 put_disk(disk); 2106 put_disk(disk);
1704} 2107}
1705 2108
2109static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2110 const char *object_name,
2111 u64 offset, u64 length,
2112 char *buf, u64 *version)
2113
2114{
2115 struct ceph_osd_req_op *op;
2116 struct rbd_obj_request *obj_request;
2117 struct ceph_osd_client *osdc;
2118 struct page **pages = NULL;
2119 u32 page_count;
2120 size_t size;
2121 int ret;
2122
2123 page_count = (u32) calc_pages_for(offset, length);
2124 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2125 if (IS_ERR(pages))
2126 return PTR_ERR(pages);
2127
2128 ret = -ENOMEM;
2129 obj_request = rbd_obj_request_create(object_name, offset, length,
2130 OBJ_REQUEST_PAGES);
2131 if (!obj_request)
2132 goto out;
2133
2134 obj_request->pages = pages;
2135 obj_request->page_count = page_count;
2136
2137 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2138 if (!op)
2139 goto out;
2140 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2141 obj_request, op);
2142 rbd_osd_req_op_destroy(op);
2143 if (!obj_request->osd_req)
2144 goto out;
2145
2146 osdc = &rbd_dev->rbd_client->client->osdc;
2147 ret = rbd_obj_request_submit(osdc, obj_request);
2148 if (ret)
2149 goto out;
2150 ret = rbd_obj_request_wait(obj_request);
2151 if (ret)
2152 goto out;
2153
2154 ret = obj_request->result;
2155 if (ret < 0)
2156 goto out;
2157
2158 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2159 size = (size_t) obj_request->xferred;
2160 ceph_copy_from_page_vector(pages, buf, 0, size);
2161 rbd_assert(size <= (size_t) INT_MAX);
2162 ret = (int) size;
2163 if (version)
2164 *version = obj_request->version;
2165out:
2166 if (obj_request)
2167 rbd_obj_request_put(obj_request);
2168 else
2169 ceph_release_page_vector(pages, page_count);
2170
2171 return ret;
2172}
2173
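rbd_obj_read_sync() stages the read through a temporary page vector and copies the data into the caller's buffer, and on success it returns the transfer count rather than zero; that is why rbd_dev_v1_header_read() below treats (size_t) ret < size as a short read. A minimal caller might look like this (the buffer size is a hypothetical example):

	char buf[512];		/* hypothetical: probe the header object */
	u64 ver;
	int ret;

	ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				0, sizeof (buf), buf, &ver);
	if (ret < 0)
		return ret;			/* negative errno */
	if ((size_t) ret < sizeof (buf))
		return -ENXIO;			/* short read */
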
1706/* 2174/*
1707 * Read the complete header for the given rbd device. 2175 * Read the complete header for the given rbd device.
1708 * 2176 *
@@ -1741,24 +2209,20 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1741 if (!ondisk) 2209 if (!ondisk)
1742 return ERR_PTR(-ENOMEM); 2210 return ERR_PTR(-ENOMEM);
1743 2211
1744 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP, 2212 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
1745 rbd_dev->header_name,
1746 0, size, 2213 0, size,
1747 (char *) ondisk, version); 2214 (char *) ondisk, version);
1748
1749 if (ret < 0) 2215 if (ret < 0)
1750 goto out_err; 2216 goto out_err;
1751 if (WARN_ON((size_t) ret < size)) { 2217 if (WARN_ON((size_t) ret < size)) {
1752 ret = -ENXIO; 2218 ret = -ENXIO;
1753 pr_warning("short header read for image %s" 2219 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
1754 " (want %zd got %d)\n", 2220 size, ret);
1755 rbd_dev->spec->image_name, size, ret);
1756 goto out_err; 2221 goto out_err;
1757 } 2222 }
1758 if (!rbd_dev_ondisk_valid(ondisk)) { 2223 if (!rbd_dev_ondisk_valid(ondisk)) {
1759 ret = -ENXIO; 2224 ret = -ENXIO;
1760 pr_warning("invalid header for image %s\n", 2225 rbd_warn(rbd_dev, "invalid header");
1761 rbd_dev->spec->image_name);
1762 goto out_err; 2226 goto out_err;
1763 } 2227 }
1764 2228
@@ -1895,8 +2359,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1895 disk->fops = &rbd_bd_ops; 2359 disk->fops = &rbd_bd_ops;
1896 disk->private_data = rbd_dev; 2360 disk->private_data = rbd_dev;
1897 2361
1898 /* init rq */ 2362 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
1899 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1900 if (!q) 2363 if (!q)
1901 goto out_disk; 2364 goto out_disk;
1902 2365
@@ -2233,7 +2696,7 @@ static void rbd_spec_free(struct kref *kref)
2233 kfree(spec); 2696 kfree(spec);
2234} 2697}
2235 2698
2236struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 2699static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2237 struct rbd_spec *spec) 2700 struct rbd_spec *spec)
2238{ 2701{
2239 struct rbd_device *rbd_dev; 2702 struct rbd_device *rbd_dev;
@@ -2243,6 +2706,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2243 return NULL; 2706 return NULL;
2244 2707
2245 spin_lock_init(&rbd_dev->lock); 2708 spin_lock_init(&rbd_dev->lock);
2709 rbd_dev->flags = 0;
2246 INIT_LIST_HEAD(&rbd_dev->node); 2710 INIT_LIST_HEAD(&rbd_dev->node);
2247 INIT_LIST_HEAD(&rbd_dev->snaps); 2711 INIT_LIST_HEAD(&rbd_dev->snaps);
2248 init_rwsem(&rbd_dev->header_rwsem); 2712 init_rwsem(&rbd_dev->header_rwsem);
@@ -2250,6 +2714,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2250 rbd_dev->spec = spec; 2714 rbd_dev->spec = spec;
2251 rbd_dev->rbd_client = rbdc; 2715 rbd_dev->rbd_client = rbdc;
2252 2716
2717 /* Initialize the layout used for all rbd requests */
2718
2719 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2720 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2721 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2722 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2723
2253 return rbd_dev; 2724 return rbd_dev;
2254} 2725}
2255 2726
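The layout installed here turns striping off in all but name: stripe count 1, and stripe unit equal to object size. Under such a layout the mapping from image offset to backing object is plain shift-and-mask arithmetic, roughly:

	/* Trivial striping; obj_order assumed (RBD_MAX_OBJ_ORDER above). */
	u64 object_size = 1ULL << obj_order;
	u64 object_num = offset >> obj_order;		/* which object */
	u64 object_off = offset & (object_size - 1);	/* offset within it */

For 4 MB objects (order 22), byte 10,000,000 of the image lands at offset 1,611,392 within object 2.
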
@@ -2360,12 +2831,11 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2360 __le64 size; 2831 __le64 size;
2361 } __attribute__ ((packed)) size_buf = { 0 }; 2832 } __attribute__ ((packed)) size_buf = { 0 };
2362 2833
2363 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2834 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2364 "rbd", "get_size", 2835 "rbd", "get_size",
2365 (char *) &snapid, sizeof (snapid), 2836 (char *) &snapid, sizeof (snapid),
2366 (char *) &size_buf, sizeof (size_buf), 2837 (char *) &size_buf, sizeof (size_buf), NULL);
2367 CEPH_OSD_FLAG_READ, NULL); 2838 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2368 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2369 if (ret < 0) 2839 if (ret < 0)
2370 return ret; 2840 return ret;
2371 2841
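The reply fills the packed size_buf declared at the top of the hunk, and since the osd encodes multi-byte fields little-endian, the function presumably unpacks it along these lines (a sketch against the assumed order/snap_size out-parameters, not the verbatim continuation):

	*order = size_buf.order;			/* one byte, no swap */
	*snap_size = le64_to_cpu(size_buf.size);	/* __le64 on the wire */
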
@@ -2396,15 +2866,13 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2396 if (!reply_buf) 2866 if (!reply_buf)
2397 return -ENOMEM; 2867 return -ENOMEM;
2398 2868
2399 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2869 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2400 "rbd", "get_object_prefix", 2870 "rbd", "get_object_prefix",
2401 NULL, 0, 2871 NULL, 0,
2402 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, 2872 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2403 CEPH_OSD_FLAG_READ, NULL); 2873 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2404 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2405 if (ret < 0) 2874 if (ret < 0)
2406 goto out; 2875 goto out;
2407 ret = 0; /* rbd_req_sync_exec() can return positive */
2408 2876
2409 p = reply_buf; 2877 p = reply_buf;
2410 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 2878 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
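
ceph_extract_encoded_string() consumes a length-prefixed string (a __le32 byte count followed by the raw bytes, with no NUL) and hands back a freshly allocated, NUL-terminated copy; passing NULL for the length pointer, as the converted callers now do, simply discards the decoded length. Hand-rolled, the decode would look roughly like this (out_err is an assumed error label):

	u32 len;

	/* bounds-checked read of the __le32 length prefix */
	ceph_decode_32_safe(&p, end, len, out_err);
	name = kmalloc(len + 1, GFP_KERNEL);	/* room for our own NUL */
	if (!name)
		return -ENOMEM;
	memcpy(name, p, len);		/* wire bytes are not terminated */
	name[len] = '\0';
	p += len;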
@@ -2435,12 +2903,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2435 u64 incompat; 2903 u64 incompat;
2436 int ret; 2904 int ret;
2437 2905
2438 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2906 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2439 "rbd", "get_features", 2907 "rbd", "get_features",
2440 (char *) &snapid, sizeof (snapid), 2908 (char *) &snapid, sizeof (snapid),
2441 (char *) &features_buf, sizeof (features_buf), 2909 (char *) &features_buf, sizeof (features_buf),
2442 CEPH_OSD_FLAG_READ, NULL); 2910 NULL);
2443 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2911 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2444 if (ret < 0) 2912 if (ret < 0)
2445 return ret; 2913 return ret;
2446 2914
@@ -2474,7 +2942,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2474 void *end; 2942 void *end;
2475 char *image_id; 2943 char *image_id;
2476 u64 overlap; 2944 u64 overlap;
2477 size_t len = 0;
2478 int ret; 2945 int ret;
2479 2946
2480 parent_spec = rbd_spec_alloc(); 2947 parent_spec = rbd_spec_alloc();
@@ -2492,12 +2959,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2492 } 2959 }
2493 2960
2494 snapid = cpu_to_le64(CEPH_NOSNAP); 2961 snapid = cpu_to_le64(CEPH_NOSNAP);
2495 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2962 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2496 "rbd", "get_parent", 2963 "rbd", "get_parent",
2497 (char *) &snapid, sizeof (snapid), 2964 (char *) &snapid, sizeof (snapid),
2498 (char *) reply_buf, size, 2965 (char *) reply_buf, size, NULL);
2499 CEPH_OSD_FLAG_READ, NULL); 2966 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2500 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2501 if (ret < 0) 2967 if (ret < 0)
2502 goto out_err; 2968 goto out_err;
2503 2969
@@ -2508,13 +2974,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2508 if (parent_spec->pool_id == CEPH_NOPOOL) 2974 if (parent_spec->pool_id == CEPH_NOPOOL)
2509 goto out; /* No parent? No problem. */ 2975 goto out; /* No parent? No problem. */
2510 2976
2511 image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 2977 /* The ceph file layout needs to fit pool id in 32 bits */
2978
2979 ret = -EIO;
2980 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2981 goto out;
2982
2983 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2512 if (IS_ERR(image_id)) { 2984 if (IS_ERR(image_id)) {
2513 ret = PTR_ERR(image_id); 2985 ret = PTR_ERR(image_id);
2514 goto out_err; 2986 goto out_err;
2515 } 2987 }
2516 parent_spec->image_id = image_id; 2988 parent_spec->image_id = image_id;
2517 parent_spec->image_id_len = len;
2518 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 2989 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2519 ceph_decode_64_safe(&p, end, overlap, out_err); 2990 ceph_decode_64_safe(&p, end, overlap, out_err);
2520 2991
@@ -2544,26 +3015,25 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2544 3015
2545 rbd_assert(!rbd_dev->spec->image_name); 3016 rbd_assert(!rbd_dev->spec->image_name);
2546 3017
2547 image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len; 3018 len = strlen(rbd_dev->spec->image_id);
3019 image_id_size = sizeof (__le32) + len;
2548 image_id = kmalloc(image_id_size, GFP_KERNEL); 3020 image_id = kmalloc(image_id_size, GFP_KERNEL);
2549 if (!image_id) 3021 if (!image_id)
2550 return NULL; 3022 return NULL;
2551 3023
2552 p = image_id; 3024 p = image_id;
2553 end = (char *) image_id + image_id_size; 3025 end = (char *) image_id + image_id_size;
2554 ceph_encode_string(&p, end, rbd_dev->spec->image_id, 3026 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2555 (u32) rbd_dev->spec->image_id_len);
2556 3027
2557 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 3028 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2558 reply_buf = kmalloc(size, GFP_KERNEL); 3029 reply_buf = kmalloc(size, GFP_KERNEL);
2559 if (!reply_buf) 3030 if (!reply_buf)
2560 goto out; 3031 goto out;
2561 3032
2562 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY, 3033 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2563 "rbd", "dir_get_name", 3034 "rbd", "dir_get_name",
2564 image_id, image_id_size, 3035 image_id, image_id_size,
2565 (char *) reply_buf, size, 3036 (char *) reply_buf, size, NULL);
2566 CEPH_OSD_FLAG_READ, NULL);
2567 if (ret < 0) 3037 if (ret < 0)
2568 goto out; 3038 goto out;
2569 p = reply_buf; 3039 p = reply_buf;
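
The outbound side is symmetric: ceph_encode_string() writes the __le32 length and then the bytes, which is exactly why image_id_size is computed as sizeof (__le32) plus the string length. In isolation:

	size_t len = strlen(rbd_dev->spec->image_id);
	void *p = image_id;
	void *end = image_id + sizeof (__le32) + len;

	/* emits: __le32 length, then len raw bytes, no NUL */
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);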
@@ -2602,8 +3072,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2602 3072
2603 osdc = &rbd_dev->rbd_client->client->osdc; 3073 osdc = &rbd_dev->rbd_client->client->osdc;
2604 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 3074 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605 if (!name) 3075 if (!name) {
2606 return -EIO; /* pool id too large (>= 2^31) */ 3076 rbd_warn(rbd_dev, "there is no pool with id %llu",
3077 rbd_dev->spec->pool_id); /* Really a BUG() */
3078 return -EIO;
3079 }
2607 3080
2608 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 3081 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609 if (!rbd_dev->spec->pool_name) 3082 if (!rbd_dev->spec->pool_name)
@@ -2612,19 +3085,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2612 /* Fetch the image name; tolerate failure here */ 3085 /* Fetch the image name; tolerate failure here */
2613 3086
2614 name = rbd_dev_image_name(rbd_dev); 3087 name = rbd_dev_image_name(rbd_dev);
2615 if (name) { 3088 if (name)
2616 rbd_dev->spec->image_name_len = strlen(name);
2617 rbd_dev->spec->image_name = (char *) name; 3089 rbd_dev->spec->image_name = (char *) name;
2618 } else { 3090 else
2619 pr_warning(RBD_DRV_NAME "%d " 3091 rbd_warn(rbd_dev, "unable to get image name");
2620 "unable to get image name for image id %s\n",
2621 rbd_dev->major, rbd_dev->spec->image_id);
2622 }
2623 3092
2624 /* Look up the snapshot name. */ 3093 /* Look up the snapshot name. */
2625 3094
2626 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 3095 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627 if (!name) { 3096 if (!name) {
3097 rbd_warn(rbd_dev, "no snapshot with id %llu",
3098 rbd_dev->spec->snap_id); /* Really a BUG() */
2628 ret = -EIO; 3099 ret = -EIO;
2629 goto out_err; 3100 goto out_err;
2630 } 3101 }
@@ -2665,12 +3136,11 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2665 if (!reply_buf) 3136 if (!reply_buf)
2666 return -ENOMEM; 3137 return -ENOMEM;
2667 3138
2668 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 3139 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2669 "rbd", "get_snapcontext", 3140 "rbd", "get_snapcontext",
2670 NULL, 0, 3141 NULL, 0,
2671 reply_buf, size, 3142 reply_buf, size, ver);
2672 CEPH_OSD_FLAG_READ, ver); 3143 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2673 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2674 if (ret < 0) 3144 if (ret < 0)
2675 goto out; 3145 goto out;
2676 3146
@@ -2735,12 +3205,11 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2735 return ERR_PTR(-ENOMEM); 3205 return ERR_PTR(-ENOMEM);
2736 3206
2737 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); 3207 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2738 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 3208 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2739 "rbd", "get_snapshot_name", 3209 "rbd", "get_snapshot_name",
2740 (char *) &snap_id, sizeof (snap_id), 3210 (char *) &snap_id, sizeof (snap_id),
2741 reply_buf, size, 3211 reply_buf, size, NULL);
2742 CEPH_OSD_FLAG_READ, NULL); 3212 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2743 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2744 if (ret < 0) 3213 if (ret < 0)
2745 goto out; 3214 goto out;
2746 3215
@@ -2766,7 +3235,7 @@ out:
2766static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, 3235static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2767 u64 *snap_size, u64 *snap_features) 3236 u64 *snap_size, u64 *snap_features)
2768{ 3237{
2769 __le64 snap_id; 3238 u64 snap_id;
2770 u8 order; 3239 u8 order;
2771 int ret; 3240 int ret;
2772 3241
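The __le64 to u64 change is part of a wider cleanup in this series: values stay CPU-native inside the driver and are byte-swapped only at the wire boundary, as the cpu_to_le64() at the call site above shows. The discipline in miniature:

	u64 snap_id = rbd_dev->header.snapc->snaps[which]; /* native in memory */
	__le64 wire_id = cpu_to_le64(snap_id);	/* swap only at the edge */

	/* ... pass &wire_id as the method's outbound data ... */
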
@@ -2865,10 +3334,17 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2865 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { 3334 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2866 struct list_head *next = links->next; 3335 struct list_head *next = links->next;
2867 3336
2868 /* Existing snapshot not in the new snap context */ 3337 /*
2869 3338 * A previously-existing snapshot is not in
3339 * the new snap context.
3340 *
3341 * If the now missing snapshot is the one the
3342 * image is mapped to, clear its exists flag
3343 * so we can avoid sending any more requests
3344 * to it.
3345 */
2870 if (rbd_dev->spec->snap_id == snap->id) 3346 if (rbd_dev->spec->snap_id == snap->id)
2871 rbd_dev->exists = false; 3347 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
2872 rbd_remove_snap_dev(snap); 3348 rbd_remove_snap_dev(snap);
2873 dout("%ssnap id %llu has been removed\n", 3349 dout("%ssnap id %llu has been removed\n",
2874 rbd_dev->spec->snap_id == snap->id ? 3350 rbd_dev->spec->snap_id == snap->id ?
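
The old bool rbd_dev->exists becomes a bit in the new rbd_dev->flags word (zeroed in rbd_dev_create() above), flipped with the atomic bitops so that rbd_request_fn() can test it without holding any lock:

	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); /* snapshot gone */

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return -ENXIO;	/* fail fast instead of sending the request */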
@@ -2942,7 +3418,7 @@ static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2942 struct rbd_snap *snap; 3418 struct rbd_snap *snap;
2943 int ret = 0; 3419 int ret = 0;
2944 3420
2945 dout("%s called\n", __func__); 3421 dout("%s:\n", __func__);
2946 if (WARN_ON(!device_is_registered(&rbd_dev->dev))) 3422 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2947 return -EIO; 3423 return -EIO;
2948 3424
@@ -2983,22 +3459,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2983 device_unregister(&rbd_dev->dev); 3459 device_unregister(&rbd_dev->dev);
2984} 3460}
2985 3461
2986static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2987{
2988 int ret, rc;
2989
2990 do {
2991 ret = rbd_req_sync_watch(rbd_dev);
2992 if (ret == -ERANGE) {
2993 rc = rbd_dev_refresh(rbd_dev, NULL);
2994 if (rc < 0)
2995 return rc;
2996 }
2997 } while (ret == -ERANGE);
2998
2999 return ret;
3000}
3001
3002static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); 3462static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3003 3463
3004/* 3464/*
@@ -3138,11 +3598,9 @@ static inline char *dup_token(const char **buf, size_t *lenp)
3138 size_t len; 3598 size_t len;
3139 3599
3140 len = next_token(buf); 3600 len = next_token(buf);
3141 dup = kmalloc(len + 1, GFP_KERNEL); 3601 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3142 if (!dup) 3602 if (!dup)
3143 return NULL; 3603 return NULL;
3144
3145 memcpy(dup, *buf, len);
3146 *(dup + len) = '\0'; 3604 *(dup + len) = '\0';
3147 *buf += len; 3605 *buf += len;
3148 3606
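One subtlety in the kmemdup() conversion: the helper duplicates len + 1 bytes starting at *buf, so the delimiter (or terminating NUL) just past the token is copied too and then immediately overwritten with the token's own terminator. kmemdup() itself is nothing more than the pair it replaces:

	/* kmemdup(src, n, gfp) behaves like: */
	void *p = kmalloc(n, gfp);
	if (p)
		memcpy(p, src, n);
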
@@ -3210,8 +3668,10 @@ static int rbd_add_parse_args(const char *buf,
3210 /* The first four tokens are required */ 3668 /* The first four tokens are required */
3211 3669
3212 len = next_token(&buf); 3670 len = next_token(&buf);
3213 if (!len) 3671 if (!len) {
3214 return -EINVAL; /* Missing monitor address(es) */ 3672 rbd_warn(NULL, "no monitor address(es) provided");
3673 return -EINVAL;
3674 }
3215 mon_addrs = buf; 3675 mon_addrs = buf;
3216 mon_addrs_size = len + 1; 3676 mon_addrs_size = len + 1;
3217 buf += len; 3677 buf += len;
@@ -3220,8 +3680,10 @@ static int rbd_add_parse_args(const char *buf,
3220 options = dup_token(&buf, NULL); 3680 options = dup_token(&buf, NULL);
3221 if (!options) 3681 if (!options)
3222 return -ENOMEM; 3682 return -ENOMEM;
3223 if (!*options) 3683 if (!*options) {
3224 goto out_err; /* Missing options */ 3684 rbd_warn(NULL, "no options provided");
3685 goto out_err;
3686 }
3225 3687
3226 spec = rbd_spec_alloc(); 3688 spec = rbd_spec_alloc();
3227 if (!spec) 3689 if (!spec)
@@ -3230,14 +3692,18 @@ static int rbd_add_parse_args(const char *buf,
3230 spec->pool_name = dup_token(&buf, NULL); 3692 spec->pool_name = dup_token(&buf, NULL);
3231 if (!spec->pool_name) 3693 if (!spec->pool_name)
3232 goto out_mem; 3694 goto out_mem;
3233 if (!*spec->pool_name) 3695 if (!*spec->pool_name) {
3234 goto out_err; /* Missing pool name */ 3696 rbd_warn(NULL, "no pool name provided");
3697 goto out_err;
3698 }
3235 3699
3236 spec->image_name = dup_token(&buf, &spec->image_name_len); 3700 spec->image_name = dup_token(&buf, NULL);
3237 if (!spec->image_name) 3701 if (!spec->image_name)
3238 goto out_mem; 3702 goto out_mem;
3239 if (!*spec->image_name) 3703 if (!*spec->image_name) {
3240 goto out_err; /* Missing image name */ 3704 rbd_warn(NULL, "no image name provided");
3705 goto out_err;
3706 }
3241 3707
3242 /* 3708 /*
3243 * Snapshot name is optional; default is to use "-" 3709 * Snapshot name is optional; default is to use "-"
@@ -3251,10 +3717,9 @@ static int rbd_add_parse_args(const char *buf,
3251 ret = -ENAMETOOLONG; 3717 ret = -ENAMETOOLONG;
3252 goto out_err; 3718 goto out_err;
3253 } 3719 }
3254 spec->snap_name = kmalloc(len + 1, GFP_KERNEL); 3720 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3255 if (!spec->snap_name) 3721 if (!spec->snap_name)
3256 goto out_mem; 3722 goto out_mem;
3257 memcpy(spec->snap_name, buf, len);
3258 *(spec->snap_name + len) = '\0'; 3723 *(spec->snap_name + len) = '\0';
3259 3724
3260 /* Initialize all rbd options to the defaults */ 3725 /* Initialize all rbd options to the defaults */
@@ -3323,7 +3788,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3323 * First, see if the format 2 image id file exists, and if 3788 * First, see if the format 2 image id file exists, and if
3324 * so, get the image's persistent id from it. 3789 * so, get the image's persistent id from it.
3325 */ 3790 */
3326 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len; 3791 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3327 object_name = kmalloc(size, GFP_NOIO); 3792 object_name = kmalloc(size, GFP_NOIO);
3328 if (!object_name) 3793 if (!object_name)
3329 return -ENOMEM; 3794 return -ENOMEM;
@@ -3339,21 +3804,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3339 goto out; 3804 goto out;
3340 } 3805 }
3341 3806
3342 ret = rbd_req_sync_exec(rbd_dev, object_name, 3807 ret = rbd_obj_method_sync(rbd_dev, object_name,
3343 "rbd", "get_id", 3808 "rbd", "get_id",
3344 NULL, 0, 3809 NULL, 0,
3345 response, RBD_IMAGE_ID_LEN_MAX, 3810 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3346 CEPH_OSD_FLAG_READ, NULL); 3811 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3347 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3348 if (ret < 0) 3812 if (ret < 0)
3349 goto out; 3813 goto out;
3350 ret = 0; /* rbd_req_sync_exec() can return positive */
3351 3814
3352 p = response; 3815 p = response;
3353 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, 3816 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3354 p + RBD_IMAGE_ID_LEN_MAX, 3817 p + RBD_IMAGE_ID_LEN_MAX,
3355 &rbd_dev->spec->image_id_len, 3818 NULL, GFP_NOIO);
3356 GFP_NOIO);
3357 if (IS_ERR(rbd_dev->spec->image_id)) { 3819 if (IS_ERR(rbd_dev->spec->image_id)) {
3358 ret = PTR_ERR(rbd_dev->spec->image_id); 3820 ret = PTR_ERR(rbd_dev->spec->image_id);
3359 rbd_dev->spec->image_id = NULL; 3821 rbd_dev->spec->image_id = NULL;
@@ -3377,11 +3839,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3377 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); 3839 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3378 if (!rbd_dev->spec->image_id) 3840 if (!rbd_dev->spec->image_id)
3379 return -ENOMEM; 3841 return -ENOMEM;
3380 rbd_dev->spec->image_id_len = 0;
3381 3842
3382 /* Record the header object name for this rbd image. */ 3843 /* Record the header object name for this rbd image. */
3383 3844
3384 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX); 3845 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3385 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3846 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3386 if (!rbd_dev->header_name) { 3847 if (!rbd_dev->header_name) {
3387 ret = -ENOMEM; 3848 ret = -ENOMEM;
@@ -3427,7 +3888,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3427 * Image id was filled in by the caller. Record the header 3888 * Image id was filled in by the caller. Record the header
3428 * object name for this rbd image. 3889 * object name for this rbd image.
3429 */ 3890 */
3430 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len; 3891 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3431 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3892 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3432 if (!rbd_dev->header_name) 3893 if (!rbd_dev->header_name)
3433 return -ENOMEM; 3894 return -ENOMEM;
@@ -3542,7 +4003,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3542 if (ret) 4003 if (ret)
3543 goto err_out_bus; 4004 goto err_out_bus;
3544 4005
3545 ret = rbd_init_watch_dev(rbd_dev); 4006 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3546 if (ret) 4007 if (ret)
3547 goto err_out_bus; 4008 goto err_out_bus;
3548 4009
@@ -3638,6 +4099,13 @@ static ssize_t rbd_add(struct bus_type *bus,
3638 goto err_out_client; 4099 goto err_out_client;
3639 spec->pool_id = (u64) rc; 4100 spec->pool_id = (u64) rc;
3640 4101
4102 /* The ceph file layout needs to fit pool id in 32 bits */
4103
4104 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4105 rc = -EIO;
4106 goto err_out_client;
4107 }
4108
3641 rbd_dev = rbd_dev_create(rbdc, spec); 4109 rbd_dev = rbd_dev_create(rbdc, spec);
3642 if (!rbd_dev) 4110 if (!rbd_dev)
3643 goto err_out_client; 4111 goto err_out_client;
@@ -3691,15 +4159,8 @@ static void rbd_dev_release(struct device *dev)
3691{ 4159{
3692 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4160 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3693 4161
3694 if (rbd_dev->watch_request) {
3695 struct ceph_client *client = rbd_dev->rbd_client->client;
3696
3697 ceph_osdc_unregister_linger_request(&client->osdc,
3698 rbd_dev->watch_request);
3699 }
3700 if (rbd_dev->watch_event) 4162 if (rbd_dev->watch_event)
3701 rbd_req_sync_unwatch(rbd_dev); 4163 rbd_dev_header_watch_sync(rbd_dev, 0);
3702
3703 4164
3704 /* clean up and free blkdev */ 4165 /* clean up and free blkdev */
3705 rbd_free_disk(rbd_dev); 4166 rbd_free_disk(rbd_dev);
@@ -3743,10 +4204,14 @@ static ssize_t rbd_remove(struct bus_type *bus,
3743 goto done; 4204 goto done;
3744 } 4205 }
3745 4206
3746 if (rbd_dev->open_count) { 4207 spin_lock_irq(&rbd_dev->lock);
4208 if (rbd_dev->open_count)
3747 ret = -EBUSY; 4209 ret = -EBUSY;
4210 else
4211 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4212 spin_unlock_irq(&rbd_dev->lock);
4213 if (ret < 0)
3748 goto done; 4214 goto done;
3749 }
3750 4215
3751 rbd_remove_all_snaps(rbd_dev); 4216 rbd_remove_all_snaps(rbd_dev);
3752 rbd_bus_del_dev(rbd_dev); 4217 rbd_bus_del_dev(rbd_dev);
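
The open-count check now happens under rbd_dev->lock together with setting the new REMOVING flag, closing the window in which an open() could slip in between the check and the teardown. The open side shown below is an assumed counterpart based on the flag's name, not code in this hunk:

	/* remove side (as above) */
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);

	/* open side (assumed) */
	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		ret = -ENOENT;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);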
@@ -3782,10 +4247,15 @@ static void rbd_sysfs_cleanup(void)
3782 device_unregister(&rbd_root_dev); 4247 device_unregister(&rbd_root_dev);
3783} 4248}
3784 4249
3785int __init rbd_init(void) 4250static int __init rbd_init(void)
3786{ 4251{
3787 int rc; 4252 int rc;
3788 4253
4254 if (!libceph_compatible(NULL)) {
4255 rbd_warn(NULL, "libceph incompatibility (quitting)");
4256
4257 return -EINVAL;
4258 }
3789 rc = rbd_sysfs_init(); 4259 rc = rbd_sysfs_init();
3790 if (rc) 4260 if (rc)
3791 return rc; 4261 return rc;
@@ -3793,7 +4263,7 @@ int __init rbd_init(void)
3793 return 0; 4263 return 0;
3794} 4264}
3795 4265
3796void __exit rbd_exit(void) 4266static void __exit rbd_exit(void)
3797{ 4267{
3798 rbd_sysfs_cleanup(); 4268 rbd_sysfs_cleanup();
3799} 4269}