aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/block
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block')
-rw-r--r--drivers/block/rbd.c730
-rw-r--r--drivers/block/rbd_types.h4
2 files changed, 448 insertions, 286 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index a6278e7e61a0..013c7a549fb6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -41,19 +41,35 @@
41 41
42#include "rbd_types.h" 42#include "rbd_types.h"
43 43
44#define DRV_NAME "rbd" 44/*
45#define DRV_NAME_LONG "rbd (rados block device)" 45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
46 55
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48 57
49#define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX)) 58#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
50#define RBD_MAX_POOL_NAME_LEN 64 59#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32 60#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024 61#define RBD_MAX_OPT_LEN 1024
53 62
54#define RBD_SNAP_HEAD_NAME "-" 63#define RBD_SNAP_HEAD_NAME "-"
55 64
65/*
66 * An RBD device name will be "rbd#", where the "rbd" comes from
67 * RBD_DRV_NAME above, and # is a unique integer identifier.
68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
69 * enough to hold all possible device names.
70 */
56#define DEV_NAME_LEN 32 71#define DEV_NAME_LEN 32
72#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
57 73
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10 74#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 75
@@ -66,7 +82,6 @@ struct rbd_image_header {
66 __u8 obj_order; 82 __u8 obj_order;
67 __u8 crypt_type; 83 __u8 crypt_type;
68 __u8 comp_type; 84 __u8 comp_type;
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc; 85 struct ceph_snap_context *snapc;
71 size_t snap_names_len; 86 size_t snap_names_len;
72 u64 snap_seq; 87 u64 snap_seq;
@@ -83,7 +98,7 @@ struct rbd_options {
83}; 98};
84 99
85/* 100/*
86 * an instance of the client. multiple devices may share a client. 101 * an instance of the client. multiple devices may share an rbd client.
87 */ 102 */
88struct rbd_client { 103struct rbd_client {
89 struct ceph_client *client; 104 struct ceph_client *client;
@@ -92,20 +107,9 @@ struct rbd_client {
92 struct list_head node; 107 struct list_head node;
93}; 108};
94 109
95struct rbd_req_coll;
96
97/* 110/*
98 * a single io request 111 * a request completion status
99 */ 112 */
100struct rbd_request {
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
104 u64 len;
105 int coll_index;
106 struct rbd_req_coll *coll;
107};
108
109struct rbd_req_status { 113struct rbd_req_status {
110 int done; 114 int done;
111 int rc; 115 int rc;
@@ -122,6 +126,18 @@ struct rbd_req_coll {
122 struct rbd_req_status status[0]; 126 struct rbd_req_status status[0];
123}; 127};
124 128
129/*
130 * a single io request
131 */
132struct rbd_request {
133 struct request *rq; /* blk layer request */
134 struct bio *bio; /* cloned bio */
135 struct page **pages; /* list of used pages */
136 u64 len;
137 int coll_index;
138 struct rbd_req_coll *coll;
139};
140
125struct rbd_snap { 141struct rbd_snap {
126 struct device dev; 142 struct device dev;
127 const char *name; 143 const char *name;
@@ -140,7 +156,6 @@ struct rbd_device {
140 struct gendisk *disk; /* blkdev's gendisk and rq */ 156 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q; 157 struct request_queue *q;
142 158
143 struct ceph_client *client;
144 struct rbd_client *rbd_client; 159 struct rbd_client *rbd_client;
145 160
146 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 161 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -157,6 +172,8 @@ struct rbd_device {
157 struct ceph_osd_event *watch_event; 172 struct ceph_osd_event *watch_event;
158 struct ceph_osd_request *watch_request; 173 struct ceph_osd_request *watch_request;
159 174
175 /* protects updating the header */
176 struct rw_semaphore header_rwsem;
160 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 177 char snap_name[RBD_MAX_SNAP_NAME_LEN];
161 u32 cur_snap; /* index+1 of current snapshot within snap context 178 u32 cur_snap; /* index+1 of current snapshot within snap context
162 0 - for the head */ 179 0 - for the head */
@@ -171,15 +188,13 @@ struct rbd_device {
171 struct device dev; 188 struct device dev;
172}; 189};
173 190
174static struct bus_type rbd_bus_type = {
175 .name = "rbd",
176};
177
178static spinlock_t node_lock; /* protects client get/put */
179
180static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 191static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
192
181static LIST_HEAD(rbd_dev_list); /* devices */ 193static LIST_HEAD(rbd_dev_list); /* devices */
182static LIST_HEAD(rbd_client_list); /* clients */ 194static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196static LIST_HEAD(rbd_client_list); /* clients */
197static DEFINE_SPINLOCK(rbd_client_list_lock);
183 198
184static int __rbd_init_snaps_header(struct rbd_device *rbd_dev); 199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185static void rbd_dev_release(struct device *dev); 200static void rbd_dev_release(struct device *dev);
@@ -190,12 +205,32 @@ static ssize_t rbd_snap_add(struct device *dev,
190static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 205static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191 struct rbd_snap *snap); 206 struct rbd_snap *snap);
192 207
208static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 size_t count);
210static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 size_t count);
193 212
194static struct rbd_device *dev_to_rbd(struct device *dev) 213static struct bus_attribute rbd_bus_attrs[] = {
214 __ATTR(add, S_IWUSR, NULL, rbd_add),
215 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 __ATTR_NULL
217};
218
219static struct bus_type rbd_bus_type = {
220 .name = "rbd",
221 .bus_attrs = rbd_bus_attrs,
222};
223
224static void rbd_root_dev_release(struct device *dev)
195{ 225{
196 return container_of(dev, struct rbd_device, dev);
197} 226}
198 227
228static struct device rbd_root_dev = {
229 .init_name = "rbd",
230 .release = rbd_root_dev_release,
231};
232
233
199static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 234static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
200{ 235{
201 return get_device(&rbd_dev->dev); 236 return get_device(&rbd_dev->dev);
@@ -210,8 +245,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210 245
211static int rbd_open(struct block_device *bdev, fmode_t mode) 246static int rbd_open(struct block_device *bdev, fmode_t mode)
212{ 247{
213 struct gendisk *disk = bdev->bd_disk; 248 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
214 struct rbd_device *rbd_dev = disk->private_data;
215 249
216 rbd_get_dev(rbd_dev); 250 rbd_get_dev(rbd_dev);
217 251
@@ -256,9 +290,11 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
256 kref_init(&rbdc->kref); 290 kref_init(&rbdc->kref);
257 INIT_LIST_HEAD(&rbdc->node); 291 INIT_LIST_HEAD(&rbdc->node);
258 292
293 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
259 rbdc->client = ceph_create_client(opt, rbdc, 0, 0); 295 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260 if (IS_ERR(rbdc->client)) 296 if (IS_ERR(rbdc->client))
261 goto out_rbdc; 297 goto out_mutex;
262 opt = NULL; /* Now rbdc->client is responsible for opt */ 298 opt = NULL; /* Now rbdc->client is responsible for opt */
263 299
264 ret = ceph_open_session(rbdc->client); 300 ret = ceph_open_session(rbdc->client);
@@ -267,16 +303,19 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
267 303
268 rbdc->rbd_opts = rbd_opts; 304 rbdc->rbd_opts = rbd_opts;
269 305
270 spin_lock(&node_lock); 306 spin_lock(&rbd_client_list_lock);
271 list_add_tail(&rbdc->node, &rbd_client_list); 307 list_add_tail(&rbdc->node, &rbd_client_list);
272 spin_unlock(&node_lock); 308 spin_unlock(&rbd_client_list_lock);
309
310 mutex_unlock(&ctl_mutex);
273 311
274 dout("rbd_client_create created %p\n", rbdc); 312 dout("rbd_client_create created %p\n", rbdc);
275 return rbdc; 313 return rbdc;
276 314
277out_err: 315out_err:
278 ceph_destroy_client(rbdc->client); 316 ceph_destroy_client(rbdc->client);
279out_rbdc: 317out_mutex:
318 mutex_unlock(&ctl_mutex);
280 kfree(rbdc); 319 kfree(rbdc);
281out_opt: 320out_opt:
282 if (opt) 321 if (opt)
@@ -324,7 +363,7 @@ static int parse_rbd_opts_token(char *c, void *private)
324 substring_t argstr[MAX_OPT_ARGS]; 363 substring_t argstr[MAX_OPT_ARGS];
325 int token, intval, ret; 364 int token, intval, ret;
326 365
327 token = match_token((char *)c, rbdopt_tokens, argstr); 366 token = match_token(c, rbdopt_tokens, argstr);
328 if (token < 0) 367 if (token < 0)
329 return -EINVAL; 368 return -EINVAL;
330 369
@@ -357,58 +396,54 @@ static int parse_rbd_opts_token(char *c, void *private)
357 * Get a ceph client with specific addr and configuration, if one does 396 * Get a ceph client with specific addr and configuration, if one does
358 * not exist create it. 397 * not exist create it.
359 */ 398 */
360static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 399static struct rbd_client *rbd_get_client(const char *mon_addr,
361 char *options) 400 size_t mon_addr_len,
401 char *options)
362{ 402{
363 struct rbd_client *rbdc; 403 struct rbd_client *rbdc;
364 struct ceph_options *opt; 404 struct ceph_options *opt;
365 int ret;
366 struct rbd_options *rbd_opts; 405 struct rbd_options *rbd_opts;
367 406
368 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); 407 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
369 if (!rbd_opts) 408 if (!rbd_opts)
370 return -ENOMEM; 409 return ERR_PTR(-ENOMEM);
371 410
372 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; 411 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
373 412
374 ret = ceph_parse_options(&opt, options, mon_addr, 413 opt = ceph_parse_options(options, mon_addr,
375 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts); 414 mon_addr + mon_addr_len,
376 if (ret < 0) 415 parse_rbd_opts_token, rbd_opts);
377 goto done_err; 416 if (IS_ERR(opt)) {
417 kfree(rbd_opts);
418 return ERR_CAST(opt);
419 }
378 420
379 spin_lock(&node_lock); 421 spin_lock(&rbd_client_list_lock);
380 rbdc = __rbd_client_find(opt); 422 rbdc = __rbd_client_find(opt);
381 if (rbdc) { 423 if (rbdc) {
424 /* using an existing client */
425 kref_get(&rbdc->kref);
426 spin_unlock(&rbd_client_list_lock);
427
382 ceph_destroy_options(opt); 428 ceph_destroy_options(opt);
383 kfree(rbd_opts); 429 kfree(rbd_opts);
384 430
385 /* using an existing client */ 431 return rbdc;
386 kref_get(&rbdc->kref);
387 rbd_dev->rbd_client = rbdc;
388 rbd_dev->client = rbdc->client;
389 spin_unlock(&node_lock);
390 return 0;
391 } 432 }
392 spin_unlock(&node_lock); 433 spin_unlock(&rbd_client_list_lock);
393 434
394 rbdc = rbd_client_create(opt, rbd_opts); 435 rbdc = rbd_client_create(opt, rbd_opts);
395 if (IS_ERR(rbdc)) {
396 ret = PTR_ERR(rbdc);
397 goto done_err;
398 }
399 436
400 rbd_dev->rbd_client = rbdc; 437 if (IS_ERR(rbdc))
401 rbd_dev->client = rbdc->client; 438 kfree(rbd_opts);
402 return 0; 439
403done_err: 440 return rbdc;
404 kfree(rbd_opts);
405 return ret;
406} 441}
407 442
408/* 443/*
409 * Destroy ceph client 444 * Destroy ceph client
410 * 445 *
411 * Caller must hold node_lock. 446 * Caller must hold rbd_client_list_lock.
412 */ 447 */
413static void rbd_client_release(struct kref *kref) 448static void rbd_client_release(struct kref *kref)
414{ 449{
@@ -428,11 +463,10 @@ static void rbd_client_release(struct kref *kref)
428 */ 463 */
429static void rbd_put_client(struct rbd_device *rbd_dev) 464static void rbd_put_client(struct rbd_device *rbd_dev)
430{ 465{
431 spin_lock(&node_lock); 466 spin_lock(&rbd_client_list_lock);
432 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 467 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
433 spin_unlock(&node_lock); 468 spin_unlock(&rbd_client_list_lock);
434 rbd_dev->rbd_client = NULL; 469 rbd_dev->rbd_client = NULL;
435 rbd_dev->client = NULL;
436} 470}
437 471
438/* 472/*
@@ -457,21 +491,19 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
457 gfp_t gfp_flags) 491 gfp_t gfp_flags)
458{ 492{
459 int i; 493 int i;
460 u32 snap_count = le32_to_cpu(ondisk->snap_count); 494 u32 snap_count;
461 int ret = -ENOMEM;
462 495
463 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) { 496 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
464 return -ENXIO; 497 return -ENXIO;
465 }
466 498
467 init_rwsem(&header->snap_rwsem); 499 snap_count = le32_to_cpu(ondisk->snap_count);
468 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 500 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470 snap_count * 501 snap_count * sizeof (*ondisk),
471 sizeof(struct rbd_image_snap_ondisk),
472 gfp_flags); 502 gfp_flags);
473 if (!header->snapc) 503 if (!header->snapc)
474 return -ENOMEM; 504 return -ENOMEM;
505
506 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
475 if (snap_count) { 507 if (snap_count) {
476 header->snap_names = kmalloc(header->snap_names_len, 508 header->snap_names = kmalloc(header->snap_names_len,
477 GFP_KERNEL); 509 GFP_KERNEL);
@@ -498,8 +530,7 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
498 header->snapc->num_snaps = snap_count; 530 header->snapc->num_snaps = snap_count;
499 header->total_snaps = snap_count; 531 header->total_snaps = snap_count;
500 532
501 if (snap_count && 533 if (snap_count && allocated_snaps == snap_count) {
502 allocated_snaps == snap_count) {
503 for (i = 0; i < snap_count; i++) { 534 for (i = 0; i < snap_count; i++) {
504 header->snapc->snaps[i] = 535 header->snapc->snaps[i] =
505 le64_to_cpu(ondisk->snaps[i].id); 536 le64_to_cpu(ondisk->snaps[i].id);
@@ -518,7 +549,7 @@ err_names:
518 kfree(header->snap_names); 549 kfree(header->snap_names);
519err_snapc: 550err_snapc:
520 kfree(header->snapc); 551 kfree(header->snapc);
521 return ret; 552 return -ENOMEM;
522} 553}
523 554
524static int snap_index(struct rbd_image_header *header, int snap_num) 555static int snap_index(struct rbd_image_header *header, int snap_num)
@@ -542,35 +573,34 @@ static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
542 int i; 573 int i;
543 char *p = header->snap_names; 574 char *p = header->snap_names;
544 575
545 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { 576 for (i = 0; i < header->total_snaps; i++) {
546 if (strcmp(snap_name, p) == 0) 577 if (!strcmp(snap_name, p)) {
547 break;
548 }
549 if (i == header->total_snaps)
550 return -ENOENT;
551 if (seq)
552 *seq = header->snapc->snaps[i];
553 578
554 if (size) 579 /* Found it. Pass back its id and/or size */
555 *size = header->snap_sizes[i];
556 580
557 return i; 581 if (seq)
582 *seq = header->snapc->snaps[i];
583 if (size)
584 *size = header->snap_sizes[i];
585 return i;
586 }
587 p += strlen(p) + 1; /* Skip ahead to the next name */
588 }
589 return -ENOENT;
558} 590}
559 591
560static int rbd_header_set_snap(struct rbd_device *dev, 592static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
561 const char *snap_name,
562 u64 *size)
563{ 593{
564 struct rbd_image_header *header = &dev->header; 594 struct rbd_image_header *header = &dev->header;
565 struct ceph_snap_context *snapc = header->snapc; 595 struct ceph_snap_context *snapc = header->snapc;
566 int ret = -ENOENT; 596 int ret = -ENOENT;
567 597
568 down_write(&header->snap_rwsem); 598 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
569 599
570 if (!snap_name || 600 down_write(&dev->header_rwsem);
571 !*snap_name || 601
572 strcmp(snap_name, "-") == 0 || 602 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
573 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) { 603 sizeof (RBD_SNAP_HEAD_NAME))) {
574 if (header->total_snaps) 604 if (header->total_snaps)
575 snapc->seq = header->snap_seq; 605 snapc->seq = header->snap_seq;
576 else 606 else
@@ -580,7 +610,7 @@ static int rbd_header_set_snap(struct rbd_device *dev,
580 if (size) 610 if (size)
581 *size = header->image_size; 611 *size = header->image_size;
582 } else { 612 } else {
583 ret = snap_by_name(header, snap_name, &snapc->seq, size); 613 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
584 if (ret < 0) 614 if (ret < 0)
585 goto done; 615 goto done;
586 616
@@ -590,7 +620,7 @@ static int rbd_header_set_snap(struct rbd_device *dev,
590 620
591 ret = 0; 621 ret = 0;
592done: 622done:
593 up_write(&header->snap_rwsem); 623 up_write(&dev->header_rwsem);
594 return ret; 624 return ret;
595} 625}
596 626
@@ -717,7 +747,7 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
717 747
718 /* split the bio. We'll release it either in the next 748 /* split the bio. We'll release it either in the next
719 call, or it will have to be released outside */ 749 call, or it will have to be released outside */
720 bp = bio_split(old_chain, (len - total) / 512ULL); 750 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
721 if (!bp) 751 if (!bp)
722 goto err_out; 752 goto err_out;
723 753
@@ -857,7 +887,7 @@ static int rbd_do_request(struct request *rq,
857 struct timespec mtime = CURRENT_TIME; 887 struct timespec mtime = CURRENT_TIME;
858 struct rbd_request *req_data; 888 struct rbd_request *req_data;
859 struct ceph_osd_request_head *reqhead; 889 struct ceph_osd_request_head *reqhead;
860 struct rbd_image_header *header = &dev->header; 890 struct ceph_osd_client *osdc;
861 891
862 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 892 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
863 if (!req_data) { 893 if (!req_data) {
@@ -874,15 +904,13 @@ static int rbd_do_request(struct request *rq,
874 904
875 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs); 905 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
876 906
877 down_read(&header->snap_rwsem); 907 down_read(&dev->header_rwsem);
878 908
879 req = ceph_osdc_alloc_request(&dev->client->osdc, flags, 909 osdc = &dev->rbd_client->client->osdc;
880 snapc, 910 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
881 ops, 911 false, GFP_NOIO, pages, bio);
882 false,
883 GFP_NOIO, pages, bio);
884 if (!req) { 912 if (!req) {
885 up_read(&header->snap_rwsem); 913 up_read(&dev->header_rwsem);
886 ret = -ENOMEM; 914 ret = -ENOMEM;
887 goto done_pages; 915 goto done_pages;
888 } 916 }
@@ -909,27 +937,27 @@ static int rbd_do_request(struct request *rq,
909 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 937 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
910 layout->fl_pg_preferred = cpu_to_le32(-1); 938 layout->fl_pg_preferred = cpu_to_le32(-1);
911 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 939 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
912 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid, 940 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
913 ofs, &len, &bno, req, ops); 941 req, ops);
914 942
915 ceph_osdc_build_request(req, ofs, &len, 943 ceph_osdc_build_request(req, ofs, &len,
916 ops, 944 ops,
917 snapc, 945 snapc,
918 &mtime, 946 &mtime,
919 req->r_oid, req->r_oid_len); 947 req->r_oid, req->r_oid_len);
920 up_read(&header->snap_rwsem); 948 up_read(&dev->header_rwsem);
921 949
922 if (linger_req) { 950 if (linger_req) {
923 ceph_osdc_set_request_linger(&dev->client->osdc, req); 951 ceph_osdc_set_request_linger(osdc, req);
924 *linger_req = req; 952 *linger_req = req;
925 } 953 }
926 954
927 ret = ceph_osdc_start_request(&dev->client->osdc, req, false); 955 ret = ceph_osdc_start_request(osdc, req, false);
928 if (ret < 0) 956 if (ret < 0)
929 goto done_err; 957 goto done_err;
930 958
931 if (!rbd_cb) { 959 if (!rbd_cb) {
932 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 960 ret = ceph_osdc_wait_request(osdc, req);
933 if (ver) 961 if (ver)
934 *ver = le64_to_cpu(req->r_reassert_version.version); 962 *ver = le64_to_cpu(req->r_reassert_version.version);
935 dout("reassert_ver=%lld\n", 963 dout("reassert_ver=%lld\n",
@@ -1213,8 +1241,8 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1213 rc = __rbd_update_snaps(dev); 1241 rc = __rbd_update_snaps(dev);
1214 mutex_unlock(&ctl_mutex); 1242 mutex_unlock(&ctl_mutex);
1215 if (rc) 1243 if (rc)
1216 pr_warning(DRV_NAME "%d got notification but failed to update" 1244 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1217 " snaps: %d\n", dev->major, rc); 1245 " update snaps: %d\n", dev->major, rc);
1218 1246
1219 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); 1247 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1220} 1248}
@@ -1227,7 +1255,7 @@ static int rbd_req_sync_watch(struct rbd_device *dev,
1227 u64 ver) 1255 u64 ver)
1228{ 1256{
1229 struct ceph_osd_req_op *ops; 1257 struct ceph_osd_req_op *ops;
1230 struct ceph_osd_client *osdc = &dev->client->osdc; 1258 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1231 1259
1232 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1260 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1233 if (ret < 0) 1261 if (ret < 0)
@@ -1314,7 +1342,7 @@ static int rbd_req_sync_notify(struct rbd_device *dev,
1314 const char *obj) 1342 const char *obj)
1315{ 1343{
1316 struct ceph_osd_req_op *ops; 1344 struct ceph_osd_req_op *ops;
1317 struct ceph_osd_client *osdc = &dev->client->osdc; 1345 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1318 struct ceph_osd_event *event; 1346 struct ceph_osd_event *event;
1319 struct rbd_notify_info info; 1347 struct rbd_notify_info info;
1320 int payload_len = sizeof(u32) + sizeof(u32); 1348 int payload_len = sizeof(u32) + sizeof(u32);
@@ -1421,9 +1449,7 @@ static void rbd_rq_fn(struct request_queue *q)
1421 struct request *rq; 1449 struct request *rq;
1422 struct bio_pair *bp = NULL; 1450 struct bio_pair *bp = NULL;
1423 1451
1424 rq = blk_fetch_request(q); 1452 while ((rq = blk_fetch_request(q))) {
1425
1426 while (1) {
1427 struct bio *bio; 1453 struct bio *bio;
1428 struct bio *rq_bio, *next_bio = NULL; 1454 struct bio *rq_bio, *next_bio = NULL;
1429 bool do_write; 1455 bool do_write;
@@ -1441,32 +1467,32 @@ static void rbd_rq_fn(struct request_queue *q)
1441 /* filter out block requests we don't understand */ 1467 /* filter out block requests we don't understand */
1442 if ((rq->cmd_type != REQ_TYPE_FS)) { 1468 if ((rq->cmd_type != REQ_TYPE_FS)) {
1443 __blk_end_request_all(rq, 0); 1469 __blk_end_request_all(rq, 0);
1444 goto next; 1470 continue;
1445 } 1471 }
1446 1472
1447 /* deduce our operation (read, write) */ 1473 /* deduce our operation (read, write) */
1448 do_write = (rq_data_dir(rq) == WRITE); 1474 do_write = (rq_data_dir(rq) == WRITE);
1449 1475
1450 size = blk_rq_bytes(rq); 1476 size = blk_rq_bytes(rq);
1451 ofs = blk_rq_pos(rq) * 512ULL; 1477 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1452 rq_bio = rq->bio; 1478 rq_bio = rq->bio;
1453 if (do_write && rbd_dev->read_only) { 1479 if (do_write && rbd_dev->read_only) {
1454 __blk_end_request_all(rq, -EROFS); 1480 __blk_end_request_all(rq, -EROFS);
1455 goto next; 1481 continue;
1456 } 1482 }
1457 1483
1458 spin_unlock_irq(q->queue_lock); 1484 spin_unlock_irq(q->queue_lock);
1459 1485
1460 dout("%s 0x%x bytes at 0x%llx\n", 1486 dout("%s 0x%x bytes at 0x%llx\n",
1461 do_write ? "write" : "read", 1487 do_write ? "write" : "read",
1462 size, blk_rq_pos(rq) * 512ULL); 1488 size, blk_rq_pos(rq) * SECTOR_SIZE);
1463 1489
1464 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); 1490 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1465 coll = rbd_alloc_coll(num_segs); 1491 coll = rbd_alloc_coll(num_segs);
1466 if (!coll) { 1492 if (!coll) {
1467 spin_lock_irq(q->queue_lock); 1493 spin_lock_irq(q->queue_lock);
1468 __blk_end_request_all(rq, -ENOMEM); 1494 __blk_end_request_all(rq, -ENOMEM);
1469 goto next; 1495 continue;
1470 } 1496 }
1471 1497
1472 do { 1498 do {
@@ -1512,8 +1538,6 @@ next_seg:
1512 if (bp) 1538 if (bp)
1513 bio_pair_release(bp); 1539 bio_pair_release(bp);
1514 spin_lock_irq(q->queue_lock); 1540 spin_lock_irq(q->queue_lock);
1515next:
1516 rq = blk_fetch_request(q);
1517 } 1541 }
1518} 1542}
1519 1543
@@ -1526,13 +1550,17 @@ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1526 struct bio_vec *bvec) 1550 struct bio_vec *bvec)
1527{ 1551{
1528 struct rbd_device *rbd_dev = q->queuedata; 1552 struct rbd_device *rbd_dev = q->queuedata;
1529 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9); 1553 unsigned int chunk_sectors;
1530 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1554 sector_t sector;
1531 unsigned int bio_sectors = bmd->bi_size >> 9; 1555 unsigned int bio_sectors;
1532 int max; 1556 int max;
1533 1557
1558 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1559 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1560 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1561
1534 max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1562 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1535 + bio_sectors)) << 9; 1563 + bio_sectors)) << SECTOR_SHIFT;
1536 if (max < 0) 1564 if (max < 0)
1537 max = 0; /* bio_add cannot handle a negative return */ 1565 max = 0; /* bio_add cannot handle a negative return */
1538 if (max <= bvec->bv_len && bio_sectors == 0) 1566 if (max <= bvec->bv_len && bio_sectors == 0)
@@ -1565,15 +1593,16 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1565 ssize_t rc; 1593 ssize_t rc;
1566 struct rbd_image_header_ondisk *dh; 1594 struct rbd_image_header_ondisk *dh;
1567 int snap_count = 0; 1595 int snap_count = 0;
1568 u64 snap_names_len = 0;
1569 u64 ver; 1596 u64 ver;
1597 size_t len;
1570 1598
1599 /*
1600 * First reads the fixed-size header to determine the number
1601 * of snapshots, then re-reads it, along with all snapshot
1602 * records as well as their stored names.
1603 */
1604 len = sizeof (*dh);
1571 while (1) { 1605 while (1) {
1572 int len = sizeof(*dh) +
1573 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1574 snap_names_len;
1575
1576 rc = -ENOMEM;
1577 dh = kmalloc(len, GFP_KERNEL); 1606 dh = kmalloc(len, GFP_KERNEL);
1578 if (!dh) 1607 if (!dh)
1579 return -ENOMEM; 1608 return -ENOMEM;
@@ -1588,21 +1617,22 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1588 1617
1589 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); 1618 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1590 if (rc < 0) { 1619 if (rc < 0) {
1591 if (rc == -ENXIO) { 1620 if (rc == -ENXIO)
1592 pr_warning("unrecognized header format" 1621 pr_warning("unrecognized header format"
1593 " for image %s", rbd_dev->obj); 1622 " for image %s", rbd_dev->obj);
1594 }
1595 goto out_dh; 1623 goto out_dh;
1596 } 1624 }
1597 1625
1598 if (snap_count != header->total_snaps) { 1626 if (snap_count == header->total_snaps)
1599 snap_count = header->total_snaps; 1627 break;
1600 snap_names_len = header->snap_names_len; 1628
1601 rbd_header_free(header); 1629 snap_count = header->total_snaps;
1602 kfree(dh); 1630 len = sizeof (*dh) +
1603 continue; 1631 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1604 } 1632 header->snap_names_len;
1605 break; 1633
1634 rbd_header_free(header);
1635 kfree(dh);
1606 } 1636 }
1607 header->obj_version = ver; 1637 header->obj_version = ver;
1608 1638
@@ -1623,13 +1653,14 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1623 int ret; 1653 int ret;
1624 void *data, *p, *e; 1654 void *data, *p, *e;
1625 u64 ver; 1655 u64 ver;
1656 struct ceph_mon_client *monc;
1626 1657
1627 /* we should create a snapshot only if we're pointing at the head */ 1658 /* we should create a snapshot only if we're pointing at the head */
1628 if (dev->cur_snap) 1659 if (dev->cur_snap)
1629 return -EINVAL; 1660 return -EINVAL;
1630 1661
1631 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid, 1662 monc = &dev->rbd_client->client->monc;
1632 &new_snapid); 1663 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1633 dout("created snapid=%lld\n", new_snapid); 1664 dout("created snapid=%lld\n", new_snapid);
1634 if (ret < 0) 1665 if (ret < 0)
1635 return ret; 1666 return ret;
@@ -1684,9 +1715,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1684 return ret; 1715 return ret;
1685 1716
1686 /* resized? */ 1717 /* resized? */
1687 set_capacity(rbd_dev->disk, h.image_size / 512ULL); 1718 set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1688 1719
1689 down_write(&rbd_dev->header.snap_rwsem); 1720 down_write(&rbd_dev->header_rwsem);
1690 1721
1691 snap_seq = rbd_dev->header.snapc->seq; 1722 snap_seq = rbd_dev->header.snapc->seq;
1692 if (rbd_dev->header.total_snaps && 1723 if (rbd_dev->header.total_snaps &&
@@ -1711,7 +1742,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1711 1742
1712 ret = __rbd_init_snaps_header(rbd_dev); 1743 ret = __rbd_init_snaps_header(rbd_dev);
1713 1744
1714 up_write(&rbd_dev->header.snap_rwsem); 1745 up_write(&rbd_dev->header_rwsem);
1715 1746
1716 return ret; 1747 return ret;
1717} 1748}
@@ -1721,6 +1752,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1721 struct gendisk *disk; 1752 struct gendisk *disk;
1722 struct request_queue *q; 1753 struct request_queue *q;
1723 int rc; 1754 int rc;
1755 u64 segment_size;
1724 u64 total_size = 0; 1756 u64 total_size = 0;
1725 1757
1726 /* contact OSD, request size info about the object being mapped */ 1758 /* contact OSD, request size info about the object being mapped */
@@ -1733,7 +1765,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1733 if (rc) 1765 if (rc)
1734 return rc; 1766 return rc;
1735 1767
1736 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size); 1768 rc = rbd_header_set_snap(rbd_dev, &total_size);
1737 if (rc) 1769 if (rc)
1738 return rc; 1770 return rc;
1739 1771
@@ -1743,7 +1775,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1743 if (!disk) 1775 if (!disk)
1744 goto out; 1776 goto out;
1745 1777
1746 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d", 1778 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1747 rbd_dev->id); 1779 rbd_dev->id);
1748 disk->major = rbd_dev->major; 1780 disk->major = rbd_dev->major;
1749 disk->first_minor = 0; 1781 disk->first_minor = 0;
@@ -1756,11 +1788,15 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1756 if (!q) 1788 if (!q)
1757 goto out_disk; 1789 goto out_disk;
1758 1790
1791 /* We use the default size, but let's be explicit about it. */
1792 blk_queue_physical_block_size(q, SECTOR_SIZE);
1793
1759 /* set io sizes to object size */ 1794 /* set io sizes to object size */
1760 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL); 1795 segment_size = rbd_obj_bytes(&rbd_dev->header);
1761 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header)); 1796 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1762 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header)); 1797 blk_queue_max_segment_size(q, segment_size);
1763 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header)); 1798 blk_queue_io_min(q, segment_size);
1799 blk_queue_io_opt(q, segment_size);
1764 1800
1765 blk_queue_merge_bvec(q, rbd_merge_bvec); 1801 blk_queue_merge_bvec(q, rbd_merge_bvec);
1766 disk->queue = q; 1802 disk->queue = q;
@@ -1771,7 +1807,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1771 rbd_dev->q = q; 1807 rbd_dev->q = q;
1772 1808
1773 /* finally, announce the disk to the world */ 1809 /* finally, announce the disk to the world */
1774 set_capacity(disk, total_size / 512ULL); 1810 set_capacity(disk, total_size / SECTOR_SIZE);
1775 add_disk(disk); 1811 add_disk(disk);
1776 1812
1777 pr_info("%s: added with size 0x%llx\n", 1813 pr_info("%s: added with size 0x%llx\n",
@@ -1788,10 +1824,15 @@ out:
1788 sysfs 1824 sysfs
1789*/ 1825*/
1790 1826
1827static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1828{
1829 return container_of(dev, struct rbd_device, dev);
1830}
1831
1791static ssize_t rbd_size_show(struct device *dev, 1832static ssize_t rbd_size_show(struct device *dev,
1792 struct device_attribute *attr, char *buf) 1833 struct device_attribute *attr, char *buf)
1793{ 1834{
1794 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1835 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1795 1836
1796 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size); 1837 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1797} 1838}
@@ -1799,7 +1840,7 @@ static ssize_t rbd_size_show(struct device *dev,
1799static ssize_t rbd_major_show(struct device *dev, 1840static ssize_t rbd_major_show(struct device *dev,
1800 struct device_attribute *attr, char *buf) 1841 struct device_attribute *attr, char *buf)
1801{ 1842{
1802 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1843 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1803 1844
1804 return sprintf(buf, "%d\n", rbd_dev->major); 1845 return sprintf(buf, "%d\n", rbd_dev->major);
1805} 1846}
@@ -1807,15 +1848,16 @@ static ssize_t rbd_major_show(struct device *dev,
1807static ssize_t rbd_client_id_show(struct device *dev, 1848static ssize_t rbd_client_id_show(struct device *dev,
1808 struct device_attribute *attr, char *buf) 1849 struct device_attribute *attr, char *buf)
1809{ 1850{
1810 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1851 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1811 1852
1812 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client)); 1853 return sprintf(buf, "client%lld\n",
1854 ceph_client_id(rbd_dev->rbd_client->client));
1813} 1855}
1814 1856
1815static ssize_t rbd_pool_show(struct device *dev, 1857static ssize_t rbd_pool_show(struct device *dev,
1816 struct device_attribute *attr, char *buf) 1858 struct device_attribute *attr, char *buf)
1817{ 1859{
1818 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1819 1861
1820 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1862 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1821} 1863}
@@ -1823,7 +1865,7 @@ static ssize_t rbd_pool_show(struct device *dev,
1823static ssize_t rbd_name_show(struct device *dev, 1865static ssize_t rbd_name_show(struct device *dev,
1824 struct device_attribute *attr, char *buf) 1866 struct device_attribute *attr, char *buf)
1825{ 1867{
1826 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1827 1869
1828 return sprintf(buf, "%s\n", rbd_dev->obj); 1870 return sprintf(buf, "%s\n", rbd_dev->obj);
1829} 1871}
@@ -1832,7 +1874,7 @@ static ssize_t rbd_snap_show(struct device *dev,
1832 struct device_attribute *attr, 1874 struct device_attribute *attr,
1833 char *buf) 1875 char *buf)
1834{ 1876{
1835 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1877 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1836 1878
1837 return sprintf(buf, "%s\n", rbd_dev->snap_name); 1879 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1838} 1880}
@@ -1842,7 +1884,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
1842 const char *buf, 1884 const char *buf,
1843 size_t size) 1885 size_t size)
1844{ 1886{
1845 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1887 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1846 int rc; 1888 int rc;
1847 int ret = size; 1889 int ret = size;
1848 1890
@@ -1907,7 +1949,7 @@ static ssize_t rbd_snap_size_show(struct device *dev,
1907{ 1949{
1908 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1950 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1909 1951
1910 return sprintf(buf, "%lld\n", (long long)snap->size); 1952 return sprintf(buf, "%zd\n", snap->size);
1911} 1953}
1912 1954
1913static ssize_t rbd_snap_id_show(struct device *dev, 1955static ssize_t rbd_snap_id_show(struct device *dev,
@@ -1916,7 +1958,7 @@ static ssize_t rbd_snap_id_show(struct device *dev,
1916{ 1958{
1917 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1959 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1918 1960
1919 return sprintf(buf, "%lld\n", (long long)snap->id); 1961 return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1920} 1962}
1921 1963
1922static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 1964static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
@@ -2088,19 +2130,9 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2088 return 0; 2130 return 0;
2089} 2131}
2090 2132
2091
2092static void rbd_root_dev_release(struct device *dev)
2093{
2094}
2095
2096static struct device rbd_root_dev = {
2097 .init_name = "rbd",
2098 .release = rbd_root_dev_release,
2099};
2100
2101static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 2133static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2102{ 2134{
2103 int ret = -ENOMEM; 2135 int ret;
2104 struct device *dev; 2136 struct device *dev;
2105 struct rbd_snap *snap; 2137 struct rbd_snap *snap;
2106 2138
@@ -2114,7 +2146,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2114 dev_set_name(dev, "%d", rbd_dev->id); 2146 dev_set_name(dev, "%d", rbd_dev->id);
2115 ret = device_register(dev); 2147 ret = device_register(dev);
2116 if (ret < 0) 2148 if (ret < 0)
2117 goto done_free; 2149 goto out;
2118 2150
2119 list_for_each_entry(snap, &rbd_dev->snaps, node) { 2151 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2120 ret = rbd_register_snap_dev(rbd_dev, snap, 2152 ret = rbd_register_snap_dev(rbd_dev, snap,
@@ -2122,10 +2154,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2122 if (ret < 0) 2154 if (ret < 0)
2123 break; 2155 break;
2124 } 2156 }
2125 2157out:
2126 mutex_unlock(&ctl_mutex);
2127 return 0;
2128done_free:
2129 mutex_unlock(&ctl_mutex); 2158 mutex_unlock(&ctl_mutex);
2130 return ret; 2159 return ret;
2131} 2160}
@@ -2154,104 +2183,250 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154 return ret; 2183 return ret;
2155} 2184}
2156 2185
2186static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2187
2188/*
2189 * Get a unique rbd identifier for the given new rbd_dev, and add
2190 * the rbd_dev to the global list. The minimum rbd id is 1.
2191 */
2192static void rbd_id_get(struct rbd_device *rbd_dev)
2193{
2194 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2195
2196 spin_lock(&rbd_dev_list_lock);
2197 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2198 spin_unlock(&rbd_dev_list_lock);
2199}
2200
2201/*
2202 * Remove an rbd_dev from the global list, and record that its
2203 * identifier is no longer in use.
2204 */
2205static void rbd_id_put(struct rbd_device *rbd_dev)
2206{
2207 struct list_head *tmp;
2208 int rbd_id = rbd_dev->id;
2209 int max_id;
2210
2211 BUG_ON(rbd_id < 1);
2212
2213 spin_lock(&rbd_dev_list_lock);
2214 list_del_init(&rbd_dev->node);
2215
2216 /*
2217 * If the id being "put" is not the current maximum, there
2218 * is nothing special we need to do.
2219 */
2220 if (rbd_id != atomic64_read(&rbd_id_max)) {
2221 spin_unlock(&rbd_dev_list_lock);
2222 return;
2223 }
2224
2225 /*
2226 * We need to update the current maximum id. Search the
2227 * list to find out what it is. We're more likely to find
2228 * the maximum at the end, so search the list backward.
2229 */
2230 max_id = 0;
2231 list_for_each_prev(tmp, &rbd_dev_list) {
2232 struct rbd_device *rbd_dev;
2233
2234 rbd_dev = list_entry(tmp, struct rbd_device, node);
2235 if (rbd_id > max_id)
2236 max_id = rbd_id;
2237 }
2238 spin_unlock(&rbd_dev_list_lock);
2239
2240 /*
2241 * The max id could have been updated by rbd_id_get(), in
2242 * which case it now accurately reflects the new maximum.
2243 * Be careful not to overwrite the maximum value in that
2244 * case.
2245 */
2246 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2247}
2248
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found, which is
 * 0 when only white space (or nothing) remains.  Note that *buf
 * must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
2267
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.  In the latter case the
 * token buffer is left untouched.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		/* Room for the token plus its terminator. */
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2297
2298/*
2299 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2300 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2301 * on the list of monitor addresses and other options provided via
2302 * /sys/bus/rbd/add.
2303 */
2304static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2305 const char *buf,
2306 const char **mon_addrs,
2307 size_t *mon_addrs_size,
2308 char *options,
2309 size_t options_size)
2310{
2311 size_t len;
2312
2313 /* The first four tokens are required */
2314
2315 len = next_token(&buf);
2316 if (!len)
2317 return -EINVAL;
2318 *mon_addrs_size = len + 1;
2319 *mon_addrs = buf;
2320
2321 buf += len;
2322
2323 len = copy_token(&buf, options, options_size);
2324 if (!len || len >= options_size)
2325 return -EINVAL;
2326
2327 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2328 if (!len || len >= sizeof (rbd_dev->pool_name))
2329 return -EINVAL;
2330
2331 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2332 if (!len || len >= sizeof (rbd_dev->obj))
2333 return -EINVAL;
2334
2335 /* We have the object length in hand, save it. */
2336
2337 rbd_dev->obj_len = len;
2338
2339 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2340 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2341 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2342
2343 /*
2344 * The snapshot name is optional, but it's an error if it's
2345 * too long. If no snapshot is supplied, fill in the default.
2346 */
2347 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2348 if (!len)
2349 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2350 sizeof (RBD_SNAP_HEAD_NAME));
2351 else if (len >= sizeof (rbd_dev->snap_name))
2352 return -EINVAL;
2353
2354 return 0;
2355}
2356
2157static ssize_t rbd_add(struct bus_type *bus, 2357static ssize_t rbd_add(struct bus_type *bus,
2158 const char *buf, 2358 const char *buf,
2159 size_t count) 2359 size_t count)
2160{ 2360{
2161 struct ceph_osd_client *osdc;
2162 struct rbd_device *rbd_dev; 2361 struct rbd_device *rbd_dev;
2163 ssize_t rc = -ENOMEM; 2362 const char *mon_addrs = NULL;
2164 int irc, new_id = 0; 2363 size_t mon_addrs_size = 0;
2165 struct list_head *tmp; 2364 char *options = NULL;
2166 char *mon_dev_name; 2365 struct ceph_osd_client *osdc;
2167 char *options; 2366 int rc = -ENOMEM;
2168 2367
2169 if (!try_module_get(THIS_MODULE)) 2368 if (!try_module_get(THIS_MODULE))
2170 return -ENODEV; 2369 return -ENODEV;
2171 2370
2172 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2173 if (!mon_dev_name)
2174 goto err_out_mod;
2175
2176 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2177 if (!options)
2178 goto err_mon_dev;
2179
2180 /* new rbd_device object */
2181 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 2371 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2182 if (!rbd_dev) 2372 if (!rbd_dev)
2183 goto err_out_opt; 2373 goto err_nomem;
2374 options = kmalloc(count, GFP_KERNEL);
2375 if (!options)
2376 goto err_nomem;
2184 2377
2185 /* static rbd_device initialization */ 2378 /* static rbd_device initialization */
2186 spin_lock_init(&rbd_dev->lock); 2379 spin_lock_init(&rbd_dev->lock);
2187 INIT_LIST_HEAD(&rbd_dev->node); 2380 INIT_LIST_HEAD(&rbd_dev->node);
2188 INIT_LIST_HEAD(&rbd_dev->snaps); 2381 INIT_LIST_HEAD(&rbd_dev->snaps);
2382 init_rwsem(&rbd_dev->header_rwsem);
2189 2383
2190 init_rwsem(&rbd_dev->header.snap_rwsem); 2384 init_rwsem(&rbd_dev->header_rwsem);
2191 2385
2192 /* generate unique id: find highest unique id, add one */ 2386 /* generate unique id: find highest unique id, add one */
2193 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2387 rbd_id_get(rbd_dev);
2194
2195 list_for_each(tmp, &rbd_dev_list) {
2196 struct rbd_device *rbd_dev;
2197 2388
2198 rbd_dev = list_entry(tmp, struct rbd_device, node); 2389 /* Fill in the device name, now that we have its id. */
2199 if (rbd_dev->id >= new_id) 2390 BUILD_BUG_ON(DEV_NAME_LEN
2200 new_id = rbd_dev->id + 1; 2391 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2201 } 2392 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2202
2203 rbd_dev->id = new_id;
2204
2205 /* add to global list */
2206 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2207 2393
2208 /* parse add command */ 2394 /* parse add command */
2209 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s " 2395 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2210 "%" __stringify(RBD_MAX_OPT_LEN) "s " 2396 options, count);
2211 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s " 2397 if (rc)
2212 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s" 2398 goto err_put_id;
2213 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2214 mon_dev_name, options, rbd_dev->pool_name,
2215 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2216 rc = -EINVAL;
2217 goto err_out_slot;
2218 }
2219
2220 if (rbd_dev->snap_name[0] == 0)
2221 rbd_dev->snap_name[0] = '-';
2222
2223 rbd_dev->obj_len = strlen(rbd_dev->obj);
2224 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2225 rbd_dev->obj, RBD_SUFFIX);
2226
2227 /* initialize rest of new object */
2228 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2229 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2230 if (rc < 0)
2231 goto err_out_slot;
2232 2399
2233 mutex_unlock(&ctl_mutex); 2400 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2401 options);
2402 if (IS_ERR(rbd_dev->rbd_client)) {
2403 rc = PTR_ERR(rbd_dev->rbd_client);
2404 goto err_put_id;
2405 }
2234 2406
2235 /* pick the pool */ 2407 /* pick the pool */
2236 osdc = &rbd_dev->client->osdc; 2408 osdc = &rbd_dev->rbd_client->client->osdc;
2237 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 2409 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2238 if (rc < 0) 2410 if (rc < 0)
2239 goto err_out_client; 2411 goto err_out_client;
2240 rbd_dev->poolid = rc; 2412 rbd_dev->poolid = rc;
2241 2413
2242 /* register our block device */ 2414 /* register our block device */
2243 irc = register_blkdev(0, rbd_dev->name); 2415 rc = register_blkdev(0, rbd_dev->name);
2244 if (irc < 0) { 2416 if (rc < 0)
2245 rc = irc;
2246 goto err_out_client; 2417 goto err_out_client;
2247 } 2418 rbd_dev->major = rc;
2248 rbd_dev->major = irc;
2249 2419
2250 rc = rbd_bus_add_dev(rbd_dev); 2420 rc = rbd_bus_add_dev(rbd_dev);
2251 if (rc) 2421 if (rc)
2252 goto err_out_blkdev; 2422 goto err_out_blkdev;
2253 2423
2254 /* set up and announce blkdev mapping */ 2424 /*
2425 * At this point cleanup in the event of an error is the job
2426 * of the sysfs code (initiated by rbd_bus_del_dev()).
2427 *
2428 * Set up and announce blkdev mapping.
2429 */
2255 rc = rbd_init_disk(rbd_dev); 2430 rc = rbd_init_disk(rbd_dev);
2256 if (rc) 2431 if (rc)
2257 goto err_out_bus; 2432 goto err_out_bus;
@@ -2263,35 +2438,26 @@ static ssize_t rbd_add(struct bus_type *bus,
2263 return count; 2438 return count;
2264 2439
2265err_out_bus: 2440err_out_bus:
2266 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2267 list_del_init(&rbd_dev->node);
2268 mutex_unlock(&ctl_mutex);
2269
2270 /* this will also clean up rest of rbd_dev stuff */ 2441 /* this will also clean up rest of rbd_dev stuff */
2271 2442
2272 rbd_bus_del_dev(rbd_dev); 2443 rbd_bus_del_dev(rbd_dev);
2273 kfree(options); 2444 kfree(options);
2274 kfree(mon_dev_name);
2275 return rc; 2445 return rc;
2276 2446
2277err_out_blkdev: 2447err_out_blkdev:
2278 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2448 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2279err_out_client: 2449err_out_client:
2280 rbd_put_client(rbd_dev); 2450 rbd_put_client(rbd_dev);
2281 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2451err_put_id:
2282err_out_slot: 2452 rbd_id_put(rbd_dev);
2283 list_del_init(&rbd_dev->node); 2453err_nomem:
2284 mutex_unlock(&ctl_mutex);
2285
2286 kfree(rbd_dev);
2287err_out_opt:
2288 kfree(options); 2454 kfree(options);
2289err_mon_dev: 2455 kfree(rbd_dev);
2290 kfree(mon_dev_name); 2456
2291err_out_mod:
2292 dout("Error adding device %s\n", buf); 2457 dout("Error adding device %s\n", buf);
2293 module_put(THIS_MODULE); 2458 module_put(THIS_MODULE);
2294 return rc; 2459
2460 return (ssize_t) rc;
2295} 2461}
2296 2462
2297static struct rbd_device *__rbd_get_dev(unsigned long id) 2463static struct rbd_device *__rbd_get_dev(unsigned long id)
@@ -2299,22 +2465,28 @@ static struct rbd_device *__rbd_get_dev(unsigned long id)
2299 struct list_head *tmp; 2465 struct list_head *tmp;
2300 struct rbd_device *rbd_dev; 2466 struct rbd_device *rbd_dev;
2301 2467
2468 spin_lock(&rbd_dev_list_lock);
2302 list_for_each(tmp, &rbd_dev_list) { 2469 list_for_each(tmp, &rbd_dev_list) {
2303 rbd_dev = list_entry(tmp, struct rbd_device, node); 2470 rbd_dev = list_entry(tmp, struct rbd_device, node);
2304 if (rbd_dev->id == id) 2471 if (rbd_dev->id == id) {
2472 spin_unlock(&rbd_dev_list_lock);
2305 return rbd_dev; 2473 return rbd_dev;
2474 }
2306 } 2475 }
2476 spin_unlock(&rbd_dev_list_lock);
2307 return NULL; 2477 return NULL;
2308} 2478}
2309 2479
2310static void rbd_dev_release(struct device *dev) 2480static void rbd_dev_release(struct device *dev)
2311{ 2481{
2312 struct rbd_device *rbd_dev = 2482 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2313 container_of(dev, struct rbd_device, dev);
2314 2483
2315 if (rbd_dev->watch_request) 2484 if (rbd_dev->watch_request) {
2316 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, 2485 struct ceph_client *client = rbd_dev->rbd_client->client;
2486
2487 ceph_osdc_unregister_linger_request(&client->osdc,
2317 rbd_dev->watch_request); 2488 rbd_dev->watch_request);
2489 }
2318 if (rbd_dev->watch_event) 2490 if (rbd_dev->watch_event)
2319 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name); 2491 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2320 2492
@@ -2323,6 +2495,9 @@ static void rbd_dev_release(struct device *dev)
2323 /* clean up and free blkdev */ 2495 /* clean up and free blkdev */
2324 rbd_free_disk(rbd_dev); 2496 rbd_free_disk(rbd_dev);
2325 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2497 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2498
2499 /* done with the id, and with the rbd_dev */
2500 rbd_id_put(rbd_dev);
2326 kfree(rbd_dev); 2501 kfree(rbd_dev);
2327 2502
2328 /* release module ref */ 2503 /* release module ref */
@@ -2355,8 +2530,6 @@ static ssize_t rbd_remove(struct bus_type *bus,
2355 goto done; 2530 goto done;
2356 } 2531 }
2357 2532
2358 list_del_init(&rbd_dev->node);
2359
2360 __rbd_remove_all_snaps(rbd_dev); 2533 __rbd_remove_all_snaps(rbd_dev);
2361 rbd_bus_del_dev(rbd_dev); 2534 rbd_bus_del_dev(rbd_dev);
2362 2535
@@ -2370,7 +2543,7 @@ static ssize_t rbd_snap_add(struct device *dev,
2370 const char *buf, 2543 const char *buf,
2371 size_t count) 2544 size_t count)
2372{ 2545{
2373 struct rbd_device *rbd_dev = dev_to_rbd(dev); 2546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2374 int ret; 2547 int ret;
2375 char *name = kmalloc(count + 1, GFP_KERNEL); 2548 char *name = kmalloc(count + 1, GFP_KERNEL);
2376 if (!name) 2549 if (!name)
@@ -2406,12 +2579,6 @@ err_unlock:
2406 return ret; 2579 return ret;
2407} 2580}
2408 2581
2409static struct bus_attribute rbd_bus_attrs[] = {
2410 __ATTR(add, S_IWUSR, NULL, rbd_add),
2411 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2412 __ATTR_NULL
2413};
2414
2415/* 2582/*
2416 * create control files in sysfs 2583 * create control files in sysfs
2417 * /sys/bus/rbd/... 2584 * /sys/bus/rbd/...
@@ -2420,21 +2587,21 @@ static int rbd_sysfs_init(void)
2420{ 2587{
2421 int ret; 2588 int ret;
2422 2589
2423 rbd_bus_type.bus_attrs = rbd_bus_attrs; 2590 ret = device_register(&rbd_root_dev);
2424 2591 if (ret < 0)
2425 ret = bus_register(&rbd_bus_type);
2426 if (ret < 0)
2427 return ret; 2592 return ret;
2428 2593
2429 ret = device_register(&rbd_root_dev); 2594 ret = bus_register(&rbd_bus_type);
2595 if (ret < 0)
2596 device_unregister(&rbd_root_dev);
2430 2597
2431 return ret; 2598 return ret;
2432} 2599}
2433 2600
2434static void rbd_sysfs_cleanup(void) 2601static void rbd_sysfs_cleanup(void)
2435{ 2602{
2436 device_unregister(&rbd_root_dev);
2437 bus_unregister(&rbd_bus_type); 2603 bus_unregister(&rbd_bus_type);
2604 device_unregister(&rbd_root_dev);
2438} 2605}
2439 2606
2440int __init rbd_init(void) 2607int __init rbd_init(void)
@@ -2444,8 +2611,7 @@ int __init rbd_init(void)
2444 rc = rbd_sysfs_init(); 2611 rc = rbd_sysfs_init();
2445 if (rc) 2612 if (rc)
2446 return rc; 2613 return rc;
2447 spin_lock_init(&node_lock); 2614 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2448 pr_info("loaded " DRV_NAME_LONG "\n");
2449 return 0; 2615 return 0;
2450} 2616}
2451 2617
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index fc6c678aa2cb..950708688f17 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -41,10 +41,6 @@
41#define RBD_HEADER_SIGNATURE "RBD" 41#define RBD_HEADER_SIGNATURE "RBD"
42#define RBD_HEADER_VERSION "001.005" 42#define RBD_HEADER_VERSION "001.005"
43 43
44struct rbd_info {
45 __le64 max_id;
46} __attribute__ ((packed));
47
48struct rbd_image_snap_ondisk { 44struct rbd_image_snap_ondisk {
49 __le64 id; 45 __le64 id;
50 __le64 image_size; 46 __le64 image_size;