aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-12-20 17:00:13 -0500
committerLinus Torvalds <torvalds@linux-foundation.org>2012-12-20 17:00:13 -0500
commit40889e8d9fc6355980cf2bc94ef4356c10dec4ec (patch)
treec03f4e218477052c665cd9b01352f83e32c4a593
parent1ca22254b32657d65315af261ae0e699b8427fb7 (diff)
parentc3e946ce7276faf0b302acd25c7b874edbeba661 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph update from Sage Weil: "There are a few different groups of commits here. The largest is Alex's ongoing work to enable the coming RBD features (cloning, striping). There is some cleanup in libceph that goes along with it. Cyril and David have fixed some problems with NFS reexport (leaking dentries and page locks), and there is a batch of patches from Yan fixing problems with the fs client when running against a clustered MDS. There are a few bug fixes mixed in for good measure, many of which will be going to the stable trees once they're upstream. My apologies for the late pull. There is still a gremlin in the rbd map/unmap code and I was hoping to include the fix for that as well, but we haven't been able to confirm the fix is correct yet; I'll send that in a separate pull once it's nailed down." * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (68 commits) rbd: get rid of rbd_{get,put}_dev() libceph: register request before unregister linger libceph: don't use rb_init_node() in ceph_osdc_alloc_request() libceph: init event->node in ceph_osdc_create_event() libceph: init osd->o_node in create_osd() libceph: report connection fault with warning libceph: socket can close in any connection state rbd: don't use ENOTSUPP rbd: remove linger unconditionally rbd: get rid of RBD_MAX_SEG_NAME_LEN libceph: avoid using freed osd in __kick_osd_requests() ceph: don't reference req after put rbd: do not allow remove of mounted-on image libceph: Unlock unprocessed pages in start_read() error path ceph: call handle_cap_grant() for cap import message ceph: Fix __ceph_do_pending_vmtruncate ceph: Don't add dirty inode to dirty list if caps is in migration ceph: Fix infinite loop in __wake_requests ceph: Don't update i_max_size when handling non-auth cap bdi_register: add __printf verification, fix arg mismatch ...
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd4
-rw-r--r--drivers/block/rbd.c1389
-rw-r--r--drivers/block/rbd_types.h2
-rw-r--r--fs/ceph/addr.c60
-rw-r--r--fs/ceph/caps.c18
-rw-r--r--fs/ceph/file.c73
-rw-r--r--fs/ceph/inode.c15
-rw-r--r--fs/ceph/mds_client.c11
-rw-r--r--fs/ceph/super.c4
-rw-r--r--include/linux/backing-dev.h1
-rw-r--r--include/linux/ceph/libceph.h2
-rw-r--r--include/linux/ceph/osdmap.h1
-rw-r--r--include/linux/ceph/rados.h2
-rw-r--r--net/ceph/ceph_common.c3
-rw-r--r--net/ceph/messenger.c107
-rw-r--r--net/ceph/osd_client.c59
-rw-r--r--net/ceph/osdmap.c47
17 files changed, 1192 insertions, 606 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 1cf2adf46b1..cd9213ccf3d 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -70,6 +70,10 @@ snap_*
70 70
71 A directory per each snapshot 71 A directory per each snapshot
72 72
73parent
74
75 Information identifying the pool, image, and snapshot id for
76 the parent image in a layered rbd image (format 2 only).
73 77
74Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name> 78Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
75------------------------------------------------------------- 79-------------------------------------------------------------
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bb3d9be3b1b..89576a0b3f2 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -61,15 +61,29 @@
61 61
62#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 62#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63 63
64#define RBD_MAX_SNAP_NAME_LEN 32 64#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65#define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
65#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 68#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66#define RBD_MAX_OPT_LEN 1024 69#define RBD_MAX_OPT_LEN 1024
67 70
68#define RBD_SNAP_HEAD_NAME "-" 71#define RBD_SNAP_HEAD_NAME "-"
69 72
73/* This allows a single page to hold an image name sent by OSD */
74#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
70#define RBD_IMAGE_ID_LEN_MAX 64 75#define RBD_IMAGE_ID_LEN_MAX 64
76
71#define RBD_OBJ_PREFIX_LEN_MAX 64 77#define RBD_OBJ_PREFIX_LEN_MAX 64
72 78
79/* Feature bits */
80
81#define RBD_FEATURE_LAYERING 1
82
83/* Features supported by this (client software) implementation. */
84
85#define RBD_FEATURES_ALL (0)
86
73/* 87/*
74 * An RBD device name will be "rbd#", where the "rbd" comes from 88 * An RBD device name will be "rbd#", where the "rbd" comes from
75 * RBD_DRV_NAME above, and # is a unique integer identifier. 89 * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -101,6 +115,27 @@ struct rbd_image_header {
101 u64 obj_version; 115 u64 obj_version;
102}; 116};
103 117
118/*
119 * An rbd image specification.
120 *
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image.
123 */
124struct rbd_spec {
125 u64 pool_id;
126 char *pool_name;
127
128 char *image_id;
129 size_t image_id_len;
130 char *image_name;
131 size_t image_name_len;
132
133 u64 snap_id;
134 char *snap_name;
135
136 struct kref kref;
137};
138
104struct rbd_options { 139struct rbd_options {
105 bool read_only; 140 bool read_only;
106}; 141};
@@ -155,11 +190,8 @@ struct rbd_snap {
155}; 190};
156 191
157struct rbd_mapping { 192struct rbd_mapping {
158 char *snap_name;
159 u64 snap_id;
160 u64 size; 193 u64 size;
161 u64 features; 194 u64 features;
162 bool snap_exists;
163 bool read_only; 195 bool read_only;
164}; 196};
165 197
@@ -173,7 +205,6 @@ struct rbd_device {
173 struct gendisk *disk; /* blkdev's gendisk and rq */ 205 struct gendisk *disk; /* blkdev's gendisk and rq */
174 206
175 u32 image_format; /* Either 1 or 2 */ 207 u32 image_format; /* Either 1 or 2 */
176 struct rbd_options rbd_opts;
177 struct rbd_client *rbd_client; 208 struct rbd_client *rbd_client;
178 209
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 210 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -181,17 +212,17 @@ struct rbd_device {
181 spinlock_t lock; /* queue lock */ 212 spinlock_t lock; /* queue lock */
182 213
183 struct rbd_image_header header; 214 struct rbd_image_header header;
184 char *image_id; 215 bool exists;
185 size_t image_id_len; 216 struct rbd_spec *spec;
186 char *image_name; 217
187 size_t image_name_len;
188 char *header_name; 218 char *header_name;
189 char *pool_name;
190 int pool_id;
191 219
192 struct ceph_osd_event *watch_event; 220 struct ceph_osd_event *watch_event;
193 struct ceph_osd_request *watch_request; 221 struct ceph_osd_request *watch_request;
194 222
223 struct rbd_spec *parent_spec;
224 u64 parent_overlap;
225
195 /* protects updating the header */ 226 /* protects updating the header */
196 struct rw_semaphore header_rwsem; 227 struct rw_semaphore header_rwsem;
197 228
@@ -204,6 +235,7 @@ struct rbd_device {
204 235
205 /* sysfs related */ 236 /* sysfs related */
206 struct device dev; 237 struct device dev;
238 unsigned long open_count;
207}; 239};
208 240
209static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 241static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
@@ -218,7 +250,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 250static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
219 251
220static void rbd_dev_release(struct device *dev); 252static void rbd_dev_release(struct device *dev);
221static void __rbd_remove_snap_dev(struct rbd_snap *snap); 253static void rbd_remove_snap_dev(struct rbd_snap *snap);
222 254
223static ssize_t rbd_add(struct bus_type *bus, const char *buf, 255static ssize_t rbd_add(struct bus_type *bus, const char *buf,
224 size_t count); 256 size_t count);
@@ -258,17 +290,8 @@ static struct device rbd_root_dev = {
258# define rbd_assert(expr) ((void) 0) 290# define rbd_assert(expr) ((void) 0)
259#endif /* !RBD_DEBUG */ 291#endif /* !RBD_DEBUG */
260 292
261static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 293static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
262{ 294static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
263 return get_device(&rbd_dev->dev);
264}
265
266static void rbd_put_dev(struct rbd_device *rbd_dev)
267{
268 put_device(&rbd_dev->dev);
269}
270
271static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
272 295
273static int rbd_open(struct block_device *bdev, fmode_t mode) 296static int rbd_open(struct block_device *bdev, fmode_t mode)
274{ 297{
@@ -277,8 +300,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
277 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 300 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
278 return -EROFS; 301 return -EROFS;
279 302
280 rbd_get_dev(rbd_dev); 303 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
304 (void) get_device(&rbd_dev->dev);
281 set_device_ro(bdev, rbd_dev->mapping.read_only); 305 set_device_ro(bdev, rbd_dev->mapping.read_only);
306 rbd_dev->open_count++;
307 mutex_unlock(&ctl_mutex);
282 308
283 return 0; 309 return 0;
284} 310}
@@ -287,7 +313,11 @@ static int rbd_release(struct gendisk *disk, fmode_t mode)
287{ 313{
288 struct rbd_device *rbd_dev = disk->private_data; 314 struct rbd_device *rbd_dev = disk->private_data;
289 315
290 rbd_put_dev(rbd_dev); 316 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
317 rbd_assert(rbd_dev->open_count > 0);
318 rbd_dev->open_count--;
319 put_device(&rbd_dev->dev);
320 mutex_unlock(&ctl_mutex);
291 321
292 return 0; 322 return 0;
293} 323}
@@ -388,7 +418,7 @@ enum {
388static match_table_t rbd_opts_tokens = { 418static match_table_t rbd_opts_tokens = {
389 /* int args above */ 419 /* int args above */
390 /* string args above */ 420 /* string args above */
391 {Opt_read_only, "mapping.read_only"}, 421 {Opt_read_only, "read_only"},
392 {Opt_read_only, "ro"}, /* Alternate spelling */ 422 {Opt_read_only, "ro"}, /* Alternate spelling */
393 {Opt_read_write, "read_write"}, 423 {Opt_read_write, "read_write"},
394 {Opt_read_write, "rw"}, /* Alternate spelling */ 424 {Opt_read_write, "rw"}, /* Alternate spelling */
@@ -441,33 +471,17 @@ static int parse_rbd_opts_token(char *c, void *private)
441 * Get a ceph client with specific addr and configuration, if one does 471 * Get a ceph client with specific addr and configuration, if one does
442 * not exist create it. 472 * not exist create it.
443 */ 473 */
444static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 474static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
445 size_t mon_addr_len, char *options)
446{ 475{
447 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
448 struct ceph_options *ceph_opts;
449 struct rbd_client *rbdc; 476 struct rbd_client *rbdc;
450 477
451 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
452
453 ceph_opts = ceph_parse_options(options, mon_addr,
454 mon_addr + mon_addr_len,
455 parse_rbd_opts_token, rbd_opts);
456 if (IS_ERR(ceph_opts))
457 return PTR_ERR(ceph_opts);
458
459 rbdc = rbd_client_find(ceph_opts); 478 rbdc = rbd_client_find(ceph_opts);
460 if (rbdc) { 479 if (rbdc) /* using an existing client */
461 /* using an existing client */
462 ceph_destroy_options(ceph_opts); 480 ceph_destroy_options(ceph_opts);
463 } else { 481 else
464 rbdc = rbd_client_create(ceph_opts); 482 rbdc = rbd_client_create(ceph_opts);
465 if (IS_ERR(rbdc))
466 return PTR_ERR(rbdc);
467 }
468 rbd_dev->rbd_client = rbdc;
469 483
470 return 0; 484 return rbdc;
471} 485}
472 486
473/* 487/*
@@ -492,10 +506,10 @@ static void rbd_client_release(struct kref *kref)
492 * Drop reference to ceph client node. If it's not referenced anymore, release 506 * Drop reference to ceph client node. If it's not referenced anymore, release
493 * it. 507 * it.
494 */ 508 */
495static void rbd_put_client(struct rbd_device *rbd_dev) 509static void rbd_put_client(struct rbd_client *rbdc)
496{ 510{
497 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 511 if (rbdc)
498 rbd_dev->rbd_client = NULL; 512 kref_put(&rbdc->kref, rbd_client_release);
499} 513}
500 514
501/* 515/*
@@ -524,6 +538,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
524 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 538 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
525 return false; 539 return false;
526 540
541 /* The bio layer requires at least sector-sized I/O */
542
543 if (ondisk->options.order < SECTOR_SHIFT)
544 return false;
545
546 /* If we use u64 in a few spots we may be able to loosen this */
547
548 if (ondisk->options.order > 8 * sizeof (int) - 1)
549 return false;
550
527 /* 551 /*
528 * The size of a snapshot header has to fit in a size_t, and 552 * The size of a snapshot header has to fit in a size_t, and
529 * that limits the number of snapshots. 553 * that limits the number of snapshots.
@@ -635,6 +659,20 @@ out_err:
635 return -ENOMEM; 659 return -ENOMEM;
636} 660}
637 661
662static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
663{
664 struct rbd_snap *snap;
665
666 if (snap_id == CEPH_NOSNAP)
667 return RBD_SNAP_HEAD_NAME;
668
669 list_for_each_entry(snap, &rbd_dev->snaps, node)
670 if (snap_id == snap->id)
671 return snap->name;
672
673 return NULL;
674}
675
638static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 676static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
639{ 677{
640 678
@@ -642,7 +680,7 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
642 680
643 list_for_each_entry(snap, &rbd_dev->snaps, node) { 681 list_for_each_entry(snap, &rbd_dev->snaps, node) {
644 if (!strcmp(snap_name, snap->name)) { 682 if (!strcmp(snap_name, snap->name)) {
645 rbd_dev->mapping.snap_id = snap->id; 683 rbd_dev->spec->snap_id = snap->id;
646 rbd_dev->mapping.size = snap->size; 684 rbd_dev->mapping.size = snap->size;
647 rbd_dev->mapping.features = snap->features; 685 rbd_dev->mapping.features = snap->features;
648 686
@@ -653,26 +691,23 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
653 return -ENOENT; 691 return -ENOENT;
654} 692}
655 693
656static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) 694static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
657{ 695{
658 int ret; 696 int ret;
659 697
660 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME, 698 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
661 sizeof (RBD_SNAP_HEAD_NAME))) { 699 sizeof (RBD_SNAP_HEAD_NAME))) {
662 rbd_dev->mapping.snap_id = CEPH_NOSNAP; 700 rbd_dev->spec->snap_id = CEPH_NOSNAP;
663 rbd_dev->mapping.size = rbd_dev->header.image_size; 701 rbd_dev->mapping.size = rbd_dev->header.image_size;
664 rbd_dev->mapping.features = rbd_dev->header.features; 702 rbd_dev->mapping.features = rbd_dev->header.features;
665 rbd_dev->mapping.snap_exists = false;
666 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
667 ret = 0; 703 ret = 0;
668 } else { 704 } else {
669 ret = snap_by_name(rbd_dev, snap_name); 705 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
670 if (ret < 0) 706 if (ret < 0)
671 goto done; 707 goto done;
672 rbd_dev->mapping.snap_exists = true;
673 rbd_dev->mapping.read_only = true; 708 rbd_dev->mapping.read_only = true;
674 } 709 }
675 rbd_dev->mapping.snap_name = snap_name; 710 rbd_dev->exists = true;
676done: 711done:
677 return ret; 712 return ret;
678} 713}
@@ -695,13 +730,13 @@ static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
695 u64 segment; 730 u64 segment;
696 int ret; 731 int ret;
697 732
698 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 733 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
699 if (!name) 734 if (!name)
700 return NULL; 735 return NULL;
701 segment = offset >> rbd_dev->header.obj_order; 736 segment = offset >> rbd_dev->header.obj_order;
702 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx", 737 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
703 rbd_dev->header.object_prefix, segment); 738 rbd_dev->header.object_prefix, segment);
704 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) { 739 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
705 pr_err("error formatting segment name for #%llu (%d)\n", 740 pr_err("error formatting segment name for #%llu (%d)\n",
706 segment, ret); 741 segment, ret);
707 kfree(name); 742 kfree(name);
@@ -800,77 +835,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
800} 835}
801 836
802/* 837/*
803 * bio_chain_clone - clone a chain of bios up to a certain length. 838 * Clone a portion of a bio, starting at the given byte offset
804 * might return a bio_pair that will need to be released. 839 * and continuing for the number of bytes indicated.
805 */ 840 */
806static struct bio *bio_chain_clone(struct bio **old, struct bio **next, 841static struct bio *bio_clone_range(struct bio *bio_src,
807 struct bio_pair **bp, 842 unsigned int offset,
808 int len, gfp_t gfpmask) 843 unsigned int len,
809{ 844 gfp_t gfpmask)
810 struct bio *old_chain = *old; 845{
811 struct bio *new_chain = NULL; 846 struct bio_vec *bv;
812 struct bio *tail; 847 unsigned int resid;
813 int total = 0; 848 unsigned short idx;
814 849 unsigned int voff;
815 if (*bp) { 850 unsigned short end_idx;
816 bio_pair_release(*bp); 851 unsigned short vcnt;
817 *bp = NULL; 852 struct bio *bio;
818 }
819 853
820 while (old_chain && (total < len)) { 854 /* Handle the easy case for the caller */
821 struct bio *tmp;
822 855
823 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); 856 if (!offset && len == bio_src->bi_size)
824 if (!tmp) 857 return bio_clone(bio_src, gfpmask);
825 goto err_out;
826 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
827 858
828 if (total + old_chain->bi_size > len) { 859 if (WARN_ON_ONCE(!len))
829 struct bio_pair *bp; 860 return NULL;
861 if (WARN_ON_ONCE(len > bio_src->bi_size))
862 return NULL;
863 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
864 return NULL;
830 865
831 /* 866 /* Find first affected segment... */
832 * this split can only happen with a single paged bio,
833 * split_bio will BUG_ON if this is not the case
834 */
835 dout("bio_chain_clone split! total=%d remaining=%d"
836 "bi_size=%u\n",
837 total, len - total, old_chain->bi_size);
838 867
839 /* split the bio. We'll release it either in the next 868 resid = offset;
840 call, or it will have to be released outside */ 869 __bio_for_each_segment(bv, bio_src, idx, 0) {
841 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE); 870 if (resid < bv->bv_len)
842 if (!bp) 871 break;
843 goto err_out; 872 resid -= bv->bv_len;
873 }
874 voff = resid;
844 875
845 __bio_clone(tmp, &bp->bio1); 876 /* ...and the last affected segment */
846 877
847 *next = &bp->bio2; 878 resid += len;
848 } else { 879 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
849 __bio_clone(tmp, old_chain); 880 if (resid <= bv->bv_len)
850 *next = old_chain->bi_next; 881 break;
851 } 882 resid -= bv->bv_len;
883 }
884 vcnt = end_idx - idx + 1;
885
886 /* Build the clone */
852 887
853 tmp->bi_bdev = NULL; 888 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
854 tmp->bi_next = NULL; 889 if (!bio)
855 if (new_chain) 890 return NULL; /* ENOMEM */
856 tail->bi_next = tmp;
857 else
858 new_chain = tmp;
859 tail = tmp;
860 old_chain = old_chain->bi_next;
861 891
862 total += tmp->bi_size; 892 bio->bi_bdev = bio_src->bi_bdev;
893 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
894 bio->bi_rw = bio_src->bi_rw;
895 bio->bi_flags |= 1 << BIO_CLONED;
896
897 /*
898 * Copy over our part of the bio_vec, then update the first
899 * and last (or only) entries.
900 */
901 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
902 vcnt * sizeof (struct bio_vec));
903 bio->bi_io_vec[0].bv_offset += voff;
904 if (vcnt > 1) {
905 bio->bi_io_vec[0].bv_len -= voff;
906 bio->bi_io_vec[vcnt - 1].bv_len = resid;
907 } else {
908 bio->bi_io_vec[0].bv_len = len;
863 } 909 }
864 910
865 rbd_assert(total == len); 911 bio->bi_vcnt = vcnt;
912 bio->bi_size = len;
913 bio->bi_idx = 0;
914
915 return bio;
916}
917
918/*
919 * Clone a portion of a bio chain, starting at the given byte offset
920 * into the first bio in the source chain and continuing for the
921 * number of bytes indicated. The result is another bio chain of
922 * exactly the given length, or a null pointer on error.
923 *
924 * The bio_src and offset parameters are both in-out. On entry they
925 * refer to the first source bio and the offset into that bio where
926 * the start of data to be cloned is located.
927 *
928 * On return, bio_src is updated to refer to the bio in the source
929 * chain that contains first un-cloned byte, and *offset will
930 * contain the offset of that byte within that bio.
931 */
932static struct bio *bio_chain_clone_range(struct bio **bio_src,
933 unsigned int *offset,
934 unsigned int len,
935 gfp_t gfpmask)
936{
937 struct bio *bi = *bio_src;
938 unsigned int off = *offset;
939 struct bio *chain = NULL;
940 struct bio **end;
941
942 /* Build up a chain of clone bios up to the limit */
943
944 if (!bi || off >= bi->bi_size || !len)
945 return NULL; /* Nothing to clone */
866 946
867 *old = old_chain; 947 end = &chain;
948 while (len) {
949 unsigned int bi_size;
950 struct bio *bio;
951
952 if (!bi)
953 goto out_err; /* EINVAL; ran out of bio's */
954 bi_size = min_t(unsigned int, bi->bi_size - off, len);
955 bio = bio_clone_range(bi, off, bi_size, gfpmask);
956 if (!bio)
957 goto out_err; /* ENOMEM */
958
959 *end = bio;
960 end = &bio->bi_next;
961
962 off += bi_size;
963 if (off == bi->bi_size) {
964 bi = bi->bi_next;
965 off = 0;
966 }
967 len -= bi_size;
968 }
969 *bio_src = bi;
970 *offset = off;
868 971
869 return new_chain; 972 return chain;
973out_err:
974 bio_chain_put(chain);
870 975
871err_out:
872 dout("bio_chain_clone with err\n");
873 bio_chain_put(new_chain);
874 return NULL; 976 return NULL;
875} 977}
876 978
@@ -988,8 +1090,9 @@ static int rbd_do_request(struct request *rq,
988 req_data->coll_index = coll_index; 1090 req_data->coll_index = coll_index;
989 } 1091 }
990 1092
991 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, 1093 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
992 (unsigned long long) ofs, (unsigned long long) len); 1094 object_name, (unsigned long long) ofs,
1095 (unsigned long long) len, coll, coll_index);
993 1096
994 osdc = &rbd_dev->rbd_client->client->osdc; 1097 osdc = &rbd_dev->rbd_client->client->osdc;
995 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, 1098 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1019,7 +1122,7 @@ static int rbd_do_request(struct request *rq,
1019 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1122 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 layout->fl_stripe_count = cpu_to_le32(1); 1123 layout->fl_stripe_count = cpu_to_le32(1);
1021 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1124 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1022 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id); 1125 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
1023 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 1126 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1024 req, ops); 1127 req, ops);
1025 rbd_assert(ret == 0); 1128 rbd_assert(ret == 0);
@@ -1154,8 +1257,6 @@ done:
1154static int rbd_do_op(struct request *rq, 1257static int rbd_do_op(struct request *rq,
1155 struct rbd_device *rbd_dev, 1258 struct rbd_device *rbd_dev,
1156 struct ceph_snap_context *snapc, 1259 struct ceph_snap_context *snapc,
1157 u64 snapid,
1158 int opcode, int flags,
1159 u64 ofs, u64 len, 1260 u64 ofs, u64 len,
1160 struct bio *bio, 1261 struct bio *bio,
1161 struct rbd_req_coll *coll, 1262 struct rbd_req_coll *coll,
@@ -1167,6 +1268,9 @@ static int rbd_do_op(struct request *rq,
1167 int ret; 1268 int ret;
1168 struct ceph_osd_req_op *ops; 1269 struct ceph_osd_req_op *ops;
1169 u32 payload_len; 1270 u32 payload_len;
1271 int opcode;
1272 int flags;
1273 u64 snapid;
1170 1274
1171 seg_name = rbd_segment_name(rbd_dev, ofs); 1275 seg_name = rbd_segment_name(rbd_dev, ofs);
1172 if (!seg_name) 1276 if (!seg_name)
@@ -1174,7 +1278,18 @@ static int rbd_do_op(struct request *rq,
1174 seg_len = rbd_segment_length(rbd_dev, ofs, len); 1278 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175 seg_ofs = rbd_segment_offset(rbd_dev, ofs); 1279 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1176 1280
1177 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 1281 if (rq_data_dir(rq) == WRITE) {
1282 opcode = CEPH_OSD_OP_WRITE;
1283 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1284 snapid = CEPH_NOSNAP;
1285 payload_len = seg_len;
1286 } else {
1287 opcode = CEPH_OSD_OP_READ;
1288 flags = CEPH_OSD_FLAG_READ;
1289 snapc = NULL;
1290 snapid = rbd_dev->spec->snap_id;
1291 payload_len = 0;
1292 }
1178 1293
1179 ret = -ENOMEM; 1294 ret = -ENOMEM;
1180 ops = rbd_create_rw_ops(1, opcode, payload_len); 1295 ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1202,41 +1317,6 @@ done:
1202} 1317}
1203 1318
1204/* 1319/*
1205 * Request async osd write
1206 */
1207static int rbd_req_write(struct request *rq,
1208 struct rbd_device *rbd_dev,
1209 struct ceph_snap_context *snapc,
1210 u64 ofs, u64 len,
1211 struct bio *bio,
1212 struct rbd_req_coll *coll,
1213 int coll_index)
1214{
1215 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1216 CEPH_OSD_OP_WRITE,
1217 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218 ofs, len, bio, coll, coll_index);
1219}
1220
1221/*
1222 * Request async osd read
1223 */
1224static int rbd_req_read(struct request *rq,
1225 struct rbd_device *rbd_dev,
1226 u64 snapid,
1227 u64 ofs, u64 len,
1228 struct bio *bio,
1229 struct rbd_req_coll *coll,
1230 int coll_index)
1231{
1232 return rbd_do_op(rq, rbd_dev, NULL,
1233 snapid,
1234 CEPH_OSD_OP_READ,
1235 CEPH_OSD_FLAG_READ,
1236 ofs, len, bio, coll, coll_index);
1237}
1238
1239/*
1240 * Request sync osd read 1320 * Request sync osd read
1241 */ 1321 */
1242static int rbd_req_sync_read(struct rbd_device *rbd_dev, 1322static int rbd_req_sync_read(struct rbd_device *rbd_dev,
@@ -1304,7 +1384,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1304 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n", 1384 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305 rbd_dev->header_name, (unsigned long long) notify_id, 1385 rbd_dev->header_name, (unsigned long long) notify_id,
1306 (unsigned int) opcode); 1386 (unsigned int) opcode);
1307 rc = rbd_refresh_header(rbd_dev, &hver); 1387 rc = rbd_dev_refresh(rbd_dev, &hver);
1308 if (rc) 1388 if (rc)
1309 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1389 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310 " update snaps: %d\n", rbd_dev->major, rc); 1390 " update snaps: %d\n", rbd_dev->major, rc);
@@ -1460,18 +1540,16 @@ static void rbd_rq_fn(struct request_queue *q)
1460{ 1540{
1461 struct rbd_device *rbd_dev = q->queuedata; 1541 struct rbd_device *rbd_dev = q->queuedata;
1462 struct request *rq; 1542 struct request *rq;
1463 struct bio_pair *bp = NULL;
1464 1543
1465 while ((rq = blk_fetch_request(q))) { 1544 while ((rq = blk_fetch_request(q))) {
1466 struct bio *bio; 1545 struct bio *bio;
1467 struct bio *rq_bio, *next_bio = NULL;
1468 bool do_write; 1546 bool do_write;
1469 unsigned int size; 1547 unsigned int size;
1470 u64 op_size = 0;
1471 u64 ofs; 1548 u64 ofs;
1472 int num_segs, cur_seg = 0; 1549 int num_segs, cur_seg = 0;
1473 struct rbd_req_coll *coll; 1550 struct rbd_req_coll *coll;
1474 struct ceph_snap_context *snapc; 1551 struct ceph_snap_context *snapc;
1552 unsigned int bio_offset;
1475 1553
1476 dout("fetched request\n"); 1554 dout("fetched request\n");
1477 1555
@@ -1483,10 +1561,6 @@ static void rbd_rq_fn(struct request_queue *q)
1483 1561
1484 /* deduce our operation (read, write) */ 1562 /* deduce our operation (read, write) */
1485 do_write = (rq_data_dir(rq) == WRITE); 1563 do_write = (rq_data_dir(rq) == WRITE);
1486
1487 size = blk_rq_bytes(rq);
1488 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1489 rq_bio = rq->bio;
1490 if (do_write && rbd_dev->mapping.read_only) { 1564 if (do_write && rbd_dev->mapping.read_only) {
1491 __blk_end_request_all(rq, -EROFS); 1565 __blk_end_request_all(rq, -EROFS);
1492 continue; 1566 continue;
@@ -1496,8 +1570,8 @@ static void rbd_rq_fn(struct request_queue *q)
1496 1570
1497 down_read(&rbd_dev->header_rwsem); 1571 down_read(&rbd_dev->header_rwsem);
1498 1572
1499 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP && 1573 if (!rbd_dev->exists) {
1500 !rbd_dev->mapping.snap_exists) { 1574 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1501 up_read(&rbd_dev->header_rwsem); 1575 up_read(&rbd_dev->header_rwsem);
1502 dout("request for non-existent snapshot"); 1576 dout("request for non-existent snapshot");
1503 spin_lock_irq(q->queue_lock); 1577 spin_lock_irq(q->queue_lock);
@@ -1509,6 +1583,10 @@ static void rbd_rq_fn(struct request_queue *q)
1509 1583
1510 up_read(&rbd_dev->header_rwsem); 1584 up_read(&rbd_dev->header_rwsem);
1511 1585
1586 size = blk_rq_bytes(rq);
1587 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1588 bio = rq->bio;
1589
1512 dout("%s 0x%x bytes at 0x%llx\n", 1590 dout("%s 0x%x bytes at 0x%llx\n",
1513 do_write ? "write" : "read", 1591 do_write ? "write" : "read",
1514 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); 1592 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1528,45 +1606,37 @@ static void rbd_rq_fn(struct request_queue *q)
1528 continue; 1606 continue;
1529 } 1607 }
1530 1608
1609 bio_offset = 0;
1531 do { 1610 do {
1532 /* a bio clone to be passed down to OSD req */ 1611 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1612 unsigned int chain_size;
1613 struct bio *bio_chain;
1614
1615 BUG_ON(limit > (u64) UINT_MAX);
1616 chain_size = (unsigned int) limit;
1533 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); 1617 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534 op_size = rbd_segment_length(rbd_dev, ofs, size); 1618
1535 kref_get(&coll->kref); 1619 kref_get(&coll->kref);
1536 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1537 op_size, GFP_ATOMIC);
1538 if (!bio) {
1539 rbd_coll_end_req_index(rq, coll, cur_seg,
1540 -ENOMEM, op_size);
1541 goto next_seg;
1542 }
1543 1620
1621 /* Pass a cloned bio chain via an osd request */
1544 1622
1545 /* init OSD command: write or read */ 1623 bio_chain = bio_chain_clone_range(&bio,
1546 if (do_write) 1624 &bio_offset, chain_size,
1547 rbd_req_write(rq, rbd_dev, 1625 GFP_ATOMIC);
1548 snapc, 1626 if (bio_chain)
1549 ofs, 1627 (void) rbd_do_op(rq, rbd_dev, snapc,
1550 op_size, bio, 1628 ofs, chain_size,
1551 coll, cur_seg); 1629 bio_chain, coll, cur_seg);
1552 else 1630 else
1553 rbd_req_read(rq, rbd_dev, 1631 rbd_coll_end_req_index(rq, coll, cur_seg,
1554 rbd_dev->mapping.snap_id, 1632 -ENOMEM, chain_size);
1555 ofs, 1633 size -= chain_size;
1556 op_size, bio, 1634 ofs += chain_size;
1557 coll, cur_seg);
1558
1559next_seg:
1560 size -= op_size;
1561 ofs += op_size;
1562 1635
1563 cur_seg++; 1636 cur_seg++;
1564 rq_bio = next_bio;
1565 } while (size > 0); 1637 } while (size > 0);
1566 kref_put(&coll->kref, rbd_coll_release); 1638 kref_put(&coll->kref, rbd_coll_release);
1567 1639
1568 if (bp)
1569 bio_pair_release(bp);
1570 spin_lock_irq(q->queue_lock); 1640 spin_lock_irq(q->queue_lock);
1571 1641
1572 ceph_put_snap_context(snapc); 1642 ceph_put_snap_context(snapc);
@@ -1576,28 +1646,47 @@ next_seg:
1576/* 1646/*
1577 * a queue callback. Makes sure that we don't create a bio that spans across 1647 * a queue callback. Makes sure that we don't create a bio that spans across
1578 * multiple osd objects. One exception would be with a single page bios, 1648 * multiple osd objects. One exception would be with a single page bios,
1579 * which we handle later at bio_chain_clone 1649 * which we handle later at bio_chain_clone_range()
1580 */ 1650 */
1581static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 1651static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1582 struct bio_vec *bvec) 1652 struct bio_vec *bvec)
1583{ 1653{
1584 struct rbd_device *rbd_dev = q->queuedata; 1654 struct rbd_device *rbd_dev = q->queuedata;
1585 unsigned int chunk_sectors; 1655 sector_t sector_offset;
1586 sector_t sector; 1656 sector_t sectors_per_obj;
1587 unsigned int bio_sectors; 1657 sector_t obj_sector_offset;
1588 int max; 1658 int ret;
1589 1659
1590 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 1660 /*
1591 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1661 * Find how far into its rbd object the partition-relative
1592 bio_sectors = bmd->bi_size >> SECTOR_SHIFT; 1662 * bio start sector is to offset relative to the enclosing
1663 * device.
1664 */
1665 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1666 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1667 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1668
1669 /*
1670 * Compute the number of bytes from that offset to the end
1671 * of the object. Account for what's already used by the bio.
1672 */
1673 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1674 if (ret > bmd->bi_size)
1675 ret -= bmd->bi_size;
1676 else
1677 ret = 0;
1593 1678
1594 max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1679 /*
1595 + bio_sectors)) << SECTOR_SHIFT; 1680 * Don't send back more than was asked for. And if the bio
1596 if (max < 0) 1681 * was empty, let the whole thing through because: "Note
1597 max = 0; /* bio_add cannot handle a negative return */ 1682 * that a block device *must* allow a single page to be
1598 if (max <= bvec->bv_len && bio_sectors == 0) 1683 * added to an empty bio."
1599 return bvec->bv_len; 1684 */
1600 return max; 1685 rbd_assert(bvec->bv_len <= PAGE_SIZE);
1686 if (ret > (int) bvec->bv_len || !bmd->bi_size)
1687 ret = (int) bvec->bv_len;
1688
1689 return ret;
1601} 1690}
1602 1691
1603static void rbd_free_disk(struct rbd_device *rbd_dev) 1692static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1663,13 +1752,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1663 ret = -ENXIO; 1752 ret = -ENXIO;
1664 pr_warning("short header read for image %s" 1753 pr_warning("short header read for image %s"
1665 " (want %zd got %d)\n", 1754 " (want %zd got %d)\n",
1666 rbd_dev->image_name, size, ret); 1755 rbd_dev->spec->image_name, size, ret);
1667 goto out_err; 1756 goto out_err;
1668 } 1757 }
1669 if (!rbd_dev_ondisk_valid(ondisk)) { 1758 if (!rbd_dev_ondisk_valid(ondisk)) {
1670 ret = -ENXIO; 1759 ret = -ENXIO;
1671 pr_warning("invalid header for image %s\n", 1760 pr_warning("invalid header for image %s\n",
1672 rbd_dev->image_name); 1761 rbd_dev->spec->image_name);
1673 goto out_err; 1762 goto out_err;
1674 } 1763 }
1675 1764
@@ -1707,19 +1796,32 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1707 return ret; 1796 return ret;
1708} 1797}
1709 1798
1710static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1799static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1711{ 1800{
1712 struct rbd_snap *snap; 1801 struct rbd_snap *snap;
1713 struct rbd_snap *next; 1802 struct rbd_snap *next;
1714 1803
1715 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) 1804 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1716 __rbd_remove_snap_dev(snap); 1805 rbd_remove_snap_dev(snap);
1806}
1807
1808static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1809{
1810 sector_t size;
1811
1812 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1813 return;
1814
1815 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1816 dout("setting size to %llu sectors", (unsigned long long) size);
1817 rbd_dev->mapping.size = (u64) size;
1818 set_capacity(rbd_dev->disk, size);
1717} 1819}
1718 1820
1719/* 1821/*
1720 * only read the first part of the ondisk header, without the snaps info 1822 * only read the first part of the ondisk header, without the snaps info
1721 */ 1823 */
1722static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) 1824static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1723{ 1825{
1724 int ret; 1826 int ret;
1725 struct rbd_image_header h; 1827 struct rbd_image_header h;
@@ -1730,17 +1832,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1730 1832
1731 down_write(&rbd_dev->header_rwsem); 1833 down_write(&rbd_dev->header_rwsem);
1732 1834
1733 /* resized? */ 1835 /* Update image size, and check for resize of mapped image */
1734 if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) { 1836 rbd_dev->header.image_size = h.image_size;
1735 sector_t size = (sector_t) h.image_size / SECTOR_SIZE; 1837 rbd_update_mapping_size(rbd_dev);
1736
1737 if (size != (sector_t) rbd_dev->mapping.size) {
1738 dout("setting size to %llu sectors",
1739 (unsigned long long) size);
1740 rbd_dev->mapping.size = (u64) size;
1741 set_capacity(rbd_dev->disk, size);
1742 }
1743 }
1744 1838
1745 /* rbd_dev->header.object_prefix shouldn't change */ 1839 /* rbd_dev->header.object_prefix shouldn't change */
1746 kfree(rbd_dev->header.snap_sizes); 1840 kfree(rbd_dev->header.snap_sizes);
@@ -1768,12 +1862,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1768 return ret; 1862 return ret;
1769} 1863}
1770 1864
1771static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) 1865static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1772{ 1866{
1773 int ret; 1867 int ret;
1774 1868
1869 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1775 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1870 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1776 ret = __rbd_refresh_header(rbd_dev, hver); 1871 if (rbd_dev->image_format == 1)
1872 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1873 else
1874 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1777 mutex_unlock(&ctl_mutex); 1875 mutex_unlock(&ctl_mutex);
1778 1876
1779 return ret; 1877 return ret;
@@ -1885,7 +1983,7 @@ static ssize_t rbd_pool_show(struct device *dev,
1885{ 1983{
1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1984 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887 1985
1888 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1986 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1889} 1987}
1890 1988
1891static ssize_t rbd_pool_id_show(struct device *dev, 1989static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1893,7 +1991,8 @@ static ssize_t rbd_pool_id_show(struct device *dev,
1893{ 1991{
1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1992 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 1993
1896 return sprintf(buf, "%d\n", rbd_dev->pool_id); 1994 return sprintf(buf, "%llu\n",
1995 (unsigned long long) rbd_dev->spec->pool_id);
1897} 1996}
1898 1997
1899static ssize_t rbd_name_show(struct device *dev, 1998static ssize_t rbd_name_show(struct device *dev,
@@ -1901,7 +2000,10 @@ static ssize_t rbd_name_show(struct device *dev,
1901{ 2000{
1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2001 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903 2002
1904 return sprintf(buf, "%s\n", rbd_dev->image_name); 2003 if (rbd_dev->spec->image_name)
2004 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2005
2006 return sprintf(buf, "(unknown)\n");
1905} 2007}
1906 2008
1907static ssize_t rbd_image_id_show(struct device *dev, 2009static ssize_t rbd_image_id_show(struct device *dev,
@@ -1909,7 +2011,7 @@ static ssize_t rbd_image_id_show(struct device *dev,
1909{ 2011{
1910 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2012 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1911 2013
1912 return sprintf(buf, "%s\n", rbd_dev->image_id); 2014 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
1913} 2015}
1914 2016
1915/* 2017/*
@@ -1922,7 +2024,50 @@ static ssize_t rbd_snap_show(struct device *dev,
1922{ 2024{
1923 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2025 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924 2026
1925 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name); 2027 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2028}
2029
2030/*
2031 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2032 * for the parent image. If there is no parent, simply shows
2033 * "(no parent image)".
2034 */
2035static ssize_t rbd_parent_show(struct device *dev,
2036 struct device_attribute *attr,
2037 char *buf)
2038{
2039 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2040 struct rbd_spec *spec = rbd_dev->parent_spec;
2041 int count;
2042 char *bufp = buf;
2043
2044 if (!spec)
2045 return sprintf(buf, "(no parent image)\n");
2046
2047 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2048 (unsigned long long) spec->pool_id, spec->pool_name);
2049 if (count < 0)
2050 return count;
2051 bufp += count;
2052
2053 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2054 spec->image_name ? spec->image_name : "(unknown)");
2055 if (count < 0)
2056 return count;
2057 bufp += count;
2058
2059 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2060 (unsigned long long) spec->snap_id, spec->snap_name);
2061 if (count < 0)
2062 return count;
2063 bufp += count;
2064
2065 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2066 if (count < 0)
2067 return count;
2068 bufp += count;
2069
2070 return (ssize_t) (bufp - buf);
1926} 2071}
1927 2072
1928static ssize_t rbd_image_refresh(struct device *dev, 2073static ssize_t rbd_image_refresh(struct device *dev,
@@ -1933,7 +2078,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934 int ret; 2079 int ret;
1935 2080
1936 ret = rbd_refresh_header(rbd_dev, NULL); 2081 ret = rbd_dev_refresh(rbd_dev, NULL);
1937 2082
1938 return ret < 0 ? ret : size; 2083 return ret < 0 ? ret : size;
1939} 2084}
@@ -1948,6 +2093,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1948static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 2093static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1949static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 2094static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1950static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 2095static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2096static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
1951 2097
1952static struct attribute *rbd_attrs[] = { 2098static struct attribute *rbd_attrs[] = {
1953 &dev_attr_size.attr, 2099 &dev_attr_size.attr,
@@ -1959,6 +2105,7 @@ static struct attribute *rbd_attrs[] = {
1959 &dev_attr_name.attr, 2105 &dev_attr_name.attr,
1960 &dev_attr_image_id.attr, 2106 &dev_attr_image_id.attr,
1961 &dev_attr_current_snap.attr, 2107 &dev_attr_current_snap.attr,
2108 &dev_attr_parent.attr,
1962 &dev_attr_refresh.attr, 2109 &dev_attr_refresh.attr,
1963 NULL 2110 NULL
1964}; 2111};
@@ -2047,6 +2194,74 @@ static struct device_type rbd_snap_device_type = {
2047 .release = rbd_snap_dev_release, 2194 .release = rbd_snap_dev_release,
2048}; 2195};
2049 2196
2197static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2198{
2199 kref_get(&spec->kref);
2200
2201 return spec;
2202}
2203
2204static void rbd_spec_free(struct kref *kref);
2205static void rbd_spec_put(struct rbd_spec *spec)
2206{
2207 if (spec)
2208 kref_put(&spec->kref, rbd_spec_free);
2209}
2210
2211static struct rbd_spec *rbd_spec_alloc(void)
2212{
2213 struct rbd_spec *spec;
2214
2215 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2216 if (!spec)
2217 return NULL;
2218 kref_init(&spec->kref);
2219
2220 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2221
2222 return spec;
2223}
2224
2225static void rbd_spec_free(struct kref *kref)
2226{
2227 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2228
2229 kfree(spec->pool_name);
2230 kfree(spec->image_id);
2231 kfree(spec->image_name);
2232 kfree(spec->snap_name);
2233 kfree(spec);
2234}
2235
2236struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2237 struct rbd_spec *spec)
2238{
2239 struct rbd_device *rbd_dev;
2240
2241 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2242 if (!rbd_dev)
2243 return NULL;
2244
2245 spin_lock_init(&rbd_dev->lock);
2246 INIT_LIST_HEAD(&rbd_dev->node);
2247 INIT_LIST_HEAD(&rbd_dev->snaps);
2248 init_rwsem(&rbd_dev->header_rwsem);
2249
2250 rbd_dev->spec = spec;
2251 rbd_dev->rbd_client = rbdc;
2252
2253 return rbd_dev;
2254}
2255
2256static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2257{
2258 rbd_spec_put(rbd_dev->parent_spec);
2259 kfree(rbd_dev->header_name);
2260 rbd_put_client(rbd_dev->rbd_client);
2261 rbd_spec_put(rbd_dev->spec);
2262 kfree(rbd_dev);
2263}
2264
2050static bool rbd_snap_registered(struct rbd_snap *snap) 2265static bool rbd_snap_registered(struct rbd_snap *snap)
2051{ 2266{
2052 bool ret = snap->dev.type == &rbd_snap_device_type; 2267 bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2057,7 +2272,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap)
2057 return ret; 2272 return ret;
2058} 2273}
2059 2274
2060static void __rbd_remove_snap_dev(struct rbd_snap *snap) 2275static void rbd_remove_snap_dev(struct rbd_snap *snap)
2061{ 2276{
2062 list_del(&snap->node); 2277 list_del(&snap->node);
2063 if (device_is_registered(&snap->dev)) 2278 if (device_is_registered(&snap->dev))
@@ -2073,7 +2288,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
2073 dev->type = &rbd_snap_device_type; 2288 dev->type = &rbd_snap_device_type;
2074 dev->parent = parent; 2289 dev->parent = parent;
2075 dev->release = rbd_snap_dev_release; 2290 dev->release = rbd_snap_dev_release;
2076 dev_set_name(dev, "snap_%s", snap->name); 2291 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2077 dout("%s: registering device for snapshot %s\n", __func__, snap->name); 2292 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2078 2293
2079 ret = device_register(dev); 2294 ret = device_register(dev);
@@ -2189,6 +2404,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2189 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2404 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2190 if (ret < 0) 2405 if (ret < 0)
2191 goto out; 2406 goto out;
2407 ret = 0; /* rbd_req_sync_exec() can return positive */
2192 2408
2193 p = reply_buf; 2409 p = reply_buf;
2194 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 2410 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2216,6 +2432,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2216 __le64 features; 2432 __le64 features;
2217 __le64 incompat; 2433 __le64 incompat;
2218 } features_buf = { 0 }; 2434 } features_buf = { 0 };
2435 u64 incompat;
2219 int ret; 2436 int ret;
2220 2437
2221 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2438 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
@@ -2226,6 +2443,11 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2226 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2443 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2227 if (ret < 0) 2444 if (ret < 0)
2228 return ret; 2445 return ret;
2446
2447 incompat = le64_to_cpu(features_buf.incompat);
2448 if (incompat & ~RBD_FEATURES_ALL)
2449 return -ENXIO;
2450
2229 *snap_features = le64_to_cpu(features_buf.features); 2451 *snap_features = le64_to_cpu(features_buf.features);
2230 2452
2231 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 2453 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
@@ -2242,6 +2464,183 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2242 &rbd_dev->header.features); 2464 &rbd_dev->header.features);
2243} 2465}
2244 2466
2467static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2468{
2469 struct rbd_spec *parent_spec;
2470 size_t size;
2471 void *reply_buf = NULL;
2472 __le64 snapid;
2473 void *p;
2474 void *end;
2475 char *image_id;
2476 u64 overlap;
2477 size_t len = 0;
2478 int ret;
2479
2480 parent_spec = rbd_spec_alloc();
2481 if (!parent_spec)
2482 return -ENOMEM;
2483
2484 size = sizeof (__le64) + /* pool_id */
2485 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2486 sizeof (__le64) + /* snap_id */
2487 sizeof (__le64); /* overlap */
2488 reply_buf = kmalloc(size, GFP_KERNEL);
2489 if (!reply_buf) {
2490 ret = -ENOMEM;
2491 goto out_err;
2492 }
2493
2494 snapid = cpu_to_le64(CEPH_NOSNAP);
2495 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2496 "rbd", "get_parent",
2497 (char *) &snapid, sizeof (snapid),
2498 (char *) reply_buf, size,
2499 CEPH_OSD_FLAG_READ, NULL);
2500 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2501 if (ret < 0)
2502 goto out_err;
2503
2504 ret = -ERANGE;
2505 p = reply_buf;
2506 end = (char *) reply_buf + size;
2507 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2508 if (parent_spec->pool_id == CEPH_NOPOOL)
2509 goto out; /* No parent? No problem. */
2510
2511 image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2512 if (IS_ERR(image_id)) {
2513 ret = PTR_ERR(image_id);
2514 goto out_err;
2515 }
2516 parent_spec->image_id = image_id;
2517 parent_spec->image_id_len = len;
2518 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2519 ceph_decode_64_safe(&p, end, overlap, out_err);
2520
2521 rbd_dev->parent_overlap = overlap;
2522 rbd_dev->parent_spec = parent_spec;
2523 parent_spec = NULL; /* rbd_dev now owns this */
2524out:
2525 ret = 0;
2526out_err:
2527 kfree(reply_buf);
2528 rbd_spec_put(parent_spec);
2529
2530 return ret;
2531}
2532
2533static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2534{
2535 size_t image_id_size;
2536 char *image_id;
2537 void *p;
2538 void *end;
2539 size_t size;
2540 void *reply_buf = NULL;
2541 size_t len = 0;
2542 char *image_name = NULL;
2543 int ret;
2544
2545 rbd_assert(!rbd_dev->spec->image_name);
2546
2547 image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
2548 image_id = kmalloc(image_id_size, GFP_KERNEL);
2549 if (!image_id)
2550 return NULL;
2551
2552 p = image_id;
2553 end = (char *) image_id + image_id_size;
2554 ceph_encode_string(&p, end, rbd_dev->spec->image_id,
2555 (u32) rbd_dev->spec->image_id_len);
2556
2557 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2558 reply_buf = kmalloc(size, GFP_KERNEL);
2559 if (!reply_buf)
2560 goto out;
2561
2562 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2563 "rbd", "dir_get_name",
2564 image_id, image_id_size,
2565 (char *) reply_buf, size,
2566 CEPH_OSD_FLAG_READ, NULL);
2567 if (ret < 0)
2568 goto out;
2569 p = reply_buf;
2570 end = (char *) reply_buf + size;
2571 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2572 if (IS_ERR(image_name))
2573 image_name = NULL;
2574 else
2575 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2576out:
2577 kfree(reply_buf);
2578 kfree(image_id);
2579
2580 return image_name;
2581}
2582
2583/*
2584 * When a parent image gets probed, we only have the pool, image,
2585 * and snapshot ids but not the names of any of them. This call
2586 * is made later to fill in those names. It has to be done after
2587 * rbd_dev_snaps_update() has completed because some of the
2588 * information (in particular, snapshot name) is not available
2589 * until then.
2590 */
2591static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2592{
2593 struct ceph_osd_client *osdc;
2594 const char *name;
2595 void *reply_buf = NULL;
2596 int ret;
2597
2598 if (rbd_dev->spec->pool_name)
2599 return 0; /* Already have the names */
2600
2601 /* Look up the pool name */
2602
2603 osdc = &rbd_dev->rbd_client->client->osdc;
2604 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605 if (!name)
2606 return -EIO; /* pool id too large (>= 2^31) */
2607
2608 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609 if (!rbd_dev->spec->pool_name)
2610 return -ENOMEM;
2611
2612 /* Fetch the image name; tolerate failure here */
2613
2614 name = rbd_dev_image_name(rbd_dev);
2615 if (name) {
2616 rbd_dev->spec->image_name_len = strlen(name);
2617 rbd_dev->spec->image_name = (char *) name;
2618 } else {
2619 pr_warning(RBD_DRV_NAME "%d "
2620 "unable to get image name for image id %s\n",
2621 rbd_dev->major, rbd_dev->spec->image_id);
2622 }
2623
2624 /* Look up the snapshot name. */
2625
2626 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627 if (!name) {
2628 ret = -EIO;
2629 goto out_err;
2630 }
2631 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2632 if(!rbd_dev->spec->snap_name)
2633 goto out_err;
2634
2635 return 0;
2636out_err:
2637 kfree(reply_buf);
2638 kfree(rbd_dev->spec->pool_name);
2639 rbd_dev->spec->pool_name = NULL;
2640
2641 return ret;
2642}
2643
2245static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 2644static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2246{ 2645{
2247 size_t size; 2646 size_t size;
@@ -2328,7 +2727,6 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2328 int ret; 2727 int ret;
2329 void *p; 2728 void *p;
2330 void *end; 2729 void *end;
2331 size_t snap_name_len;
2332 char *snap_name; 2730 char *snap_name;
2333 2731
2334 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 2732 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
@@ -2348,9 +2746,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2348 2746
2349 p = reply_buf; 2747 p = reply_buf;
2350 end = (char *) reply_buf + size; 2748 end = (char *) reply_buf + size;
2351 snap_name_len = 0; 2749 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2352 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2353 GFP_KERNEL);
2354 if (IS_ERR(snap_name)) { 2750 if (IS_ERR(snap_name)) {
2355 ret = PTR_ERR(snap_name); 2751 ret = PTR_ERR(snap_name);
2356 goto out; 2752 goto out;
@@ -2397,6 +2793,41 @@ static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2397 return ERR_PTR(-EINVAL); 2793 return ERR_PTR(-EINVAL);
2398} 2794}
2399 2795
2796static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2797{
2798 int ret;
2799 __u8 obj_order;
2800
2801 down_write(&rbd_dev->header_rwsem);
2802
2803 /* Grab old order first, to see if it changes */
2804
2805 obj_order = rbd_dev->header.obj_order,
2806 ret = rbd_dev_v2_image_size(rbd_dev);
2807 if (ret)
2808 goto out;
2809 if (rbd_dev->header.obj_order != obj_order) {
2810 ret = -EIO;
2811 goto out;
2812 }
2813 rbd_update_mapping_size(rbd_dev);
2814
2815 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2816 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2817 if (ret)
2818 goto out;
2819 ret = rbd_dev_snaps_update(rbd_dev);
2820 dout("rbd_dev_snaps_update returned %d\n", ret);
2821 if (ret)
2822 goto out;
2823 ret = rbd_dev_snaps_register(rbd_dev);
2824 dout("rbd_dev_snaps_register returned %d\n", ret);
2825out:
2826 up_write(&rbd_dev->header_rwsem);
2827
2828 return ret;
2829}
2830
2400/* 2831/*
2401 * Scan the rbd device's current snapshot list and compare it to the 2832 * Scan the rbd device's current snapshot list and compare it to the
2402 * newly-received snapshot context. Remove any existing snapshots 2833 * newly-received snapshot context. Remove any existing snapshots
@@ -2436,12 +2867,12 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2436 2867
2437 /* Existing snapshot not in the new snap context */ 2868 /* Existing snapshot not in the new snap context */
2438 2869
2439 if (rbd_dev->mapping.snap_id == snap->id) 2870 if (rbd_dev->spec->snap_id == snap->id)
2440 rbd_dev->mapping.snap_exists = false; 2871 rbd_dev->exists = false;
2441 __rbd_remove_snap_dev(snap); 2872 rbd_remove_snap_dev(snap);
2442 dout("%ssnap id %llu has been removed\n", 2873 dout("%ssnap id %llu has been removed\n",
2443 rbd_dev->mapping.snap_id == snap->id ? 2874 rbd_dev->spec->snap_id == snap->id ?
2444 "mapped " : "", 2875 "mapped " : "",
2445 (unsigned long long) snap->id); 2876 (unsigned long long) snap->id);
2446 2877
2447 /* Done with this list entry; advance */ 2878 /* Done with this list entry; advance */
@@ -2559,7 +2990,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2559 do { 2990 do {
2560 ret = rbd_req_sync_watch(rbd_dev); 2991 ret = rbd_req_sync_watch(rbd_dev);
2561 if (ret == -ERANGE) { 2992 if (ret == -ERANGE) {
2562 rc = rbd_refresh_header(rbd_dev, NULL); 2993 rc = rbd_dev_refresh(rbd_dev, NULL);
2563 if (rc < 0) 2994 if (rc < 0)
2564 return rc; 2995 return rc;
2565 } 2996 }
@@ -2621,8 +3052,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2621 struct rbd_device *rbd_dev; 3052 struct rbd_device *rbd_dev;
2622 3053
2623 rbd_dev = list_entry(tmp, struct rbd_device, node); 3054 rbd_dev = list_entry(tmp, struct rbd_device, node);
2624 if (rbd_id > max_id) 3055 if (rbd_dev->dev_id > max_id)
2625 max_id = rbd_id; 3056 max_id = rbd_dev->dev_id;
2626 } 3057 }
2627 spin_unlock(&rbd_dev_list_lock); 3058 spin_unlock(&rbd_dev_list_lock);
2628 3059
@@ -2722,73 +3153,140 @@ static inline char *dup_token(const char **buf, size_t *lenp)
2722} 3153}
2723 3154
2724/* 3155/*
2725 * This fills in the pool_name, image_name, image_name_len, rbd_dev, 3156 * Parse the options provided for an "rbd add" (i.e., rbd image
2726 * rbd_md_name, and name fields of the given rbd_dev, based on the 3157 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
2727 * list of monitor addresses and other options provided via 3158 * and the data written is passed here via a NUL-terminated buffer.
2728 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated 3159 * Returns 0 if successful or an error code otherwise.
2729 * copy of the snapshot name to map if successful, or a 3160 *
2730 * pointer-coded error otherwise. 3161 * The information extracted from these options is recorded in
3162 * the other parameters which return dynamically-allocated
3163 * structures:
3164 * ceph_opts
3165 * The address of a pointer that will refer to a ceph options
3166 * structure. Caller must release the returned pointer using
3167 * ceph_destroy_options() when it is no longer needed.
3168 * rbd_opts
3169 * Address of an rbd options pointer. Fully initialized by
3170 * this function; caller must release with kfree().
3171 * spec
3172 * Address of an rbd image specification pointer. Fully
3173 * initialized by this function based on parsed options.
3174 * Caller must release with rbd_spec_put().
2731 * 3175 *
2732 * Note: rbd_dev is assumed to have been initially zero-filled. 3176 * The options passed take this form:
3177 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3178 * where:
3179 * <mon_addrs>
3180 * A comma-separated list of one or more monitor addresses.
3181 * A monitor address is an ip address, optionally followed
3182 * by a port number (separated by a colon).
3183 * I.e.: ip1[:port1][,ip2[:port2]...]
3184 * <options>
3185 * A comma-separated list of ceph and/or rbd options.
3186 * <pool_name>
3187 * The name of the rados pool containing the rbd image.
3188 * <image_name>
3189 * The name of the image in that pool to map.
3190 * <snap_id>
3191 * An optional snapshot id. If provided, the mapping will
3192 * present data from the image at the time that snapshot was
3193 * created. The image head is used if no snapshot id is
3194 * provided. Snapshot mappings are always read-only.
2733 */ 3195 */
2734static char *rbd_add_parse_args(struct rbd_device *rbd_dev, 3196static int rbd_add_parse_args(const char *buf,
2735 const char *buf, 3197 struct ceph_options **ceph_opts,
2736 const char **mon_addrs, 3198 struct rbd_options **opts,
2737 size_t *mon_addrs_size, 3199 struct rbd_spec **rbd_spec)
2738 char *options,
2739 size_t options_size)
2740{ 3200{
2741 size_t len; 3201 size_t len;
2742 char *err_ptr = ERR_PTR(-EINVAL); 3202 char *options;
2743 char *snap_name; 3203 const char *mon_addrs;
3204 size_t mon_addrs_size;
3205 struct rbd_spec *spec = NULL;
3206 struct rbd_options *rbd_opts = NULL;
3207 struct ceph_options *copts;
3208 int ret;
2744 3209
2745 /* The first four tokens are required */ 3210 /* The first four tokens are required */
2746 3211
2747 len = next_token(&buf); 3212 len = next_token(&buf);
2748 if (!len) 3213 if (!len)
2749 return err_ptr; 3214 return -EINVAL; /* Missing monitor address(es) */
2750 *mon_addrs_size = len + 1; 3215 mon_addrs = buf;
2751 *mon_addrs = buf; 3216 mon_addrs_size = len + 1;
2752
2753 buf += len; 3217 buf += len;
2754 3218
2755 len = copy_token(&buf, options, options_size); 3219 ret = -EINVAL;
2756 if (!len || len >= options_size) 3220 options = dup_token(&buf, NULL);
2757 return err_ptr; 3221 if (!options)
3222 return -ENOMEM;
3223 if (!*options)
3224 goto out_err; /* Missing options */
2758 3225
2759 err_ptr = ERR_PTR(-ENOMEM); 3226 spec = rbd_spec_alloc();
2760 rbd_dev->pool_name = dup_token(&buf, NULL); 3227 if (!spec)
2761 if (!rbd_dev->pool_name) 3228 goto out_mem;
2762 goto out_err;
2763 3229
2764 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len); 3230 spec->pool_name = dup_token(&buf, NULL);
2765 if (!rbd_dev->image_name) 3231 if (!spec->pool_name)
2766 goto out_err; 3232 goto out_mem;
3233 if (!*spec->pool_name)
3234 goto out_err; /* Missing pool name */
2767 3235
2768 /* Snapshot name is optional */ 3236 spec->image_name = dup_token(&buf, &spec->image_name_len);
3237 if (!spec->image_name)
3238 goto out_mem;
3239 if (!*spec->image_name)
3240 goto out_err; /* Missing image name */
3241
3242 /*
3243 * Snapshot name is optional; default is to use "-"
3244 * (indicating the head/no snapshot).
3245 */
2769 len = next_token(&buf); 3246 len = next_token(&buf);
2770 if (!len) { 3247 if (!len) {
2771 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 3248 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2772 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 3249 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2773 } 3250 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
2774 snap_name = kmalloc(len + 1, GFP_KERNEL); 3251 ret = -ENAMETOOLONG;
2775 if (!snap_name)
2776 goto out_err; 3252 goto out_err;
2777 memcpy(snap_name, buf, len); 3253 }
2778 *(snap_name + len) = '\0'; 3254 spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
3255 if (!spec->snap_name)
3256 goto out_mem;
3257 memcpy(spec->snap_name, buf, len);
3258 *(spec->snap_name + len) = '\0';
2779 3259
2780dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len); 3260 /* Initialize all rbd options to the defaults */
2781 3261
2782 return snap_name; 3262 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3263 if (!rbd_opts)
3264 goto out_mem;
3265
3266 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3267
3268 copts = ceph_parse_options(options, mon_addrs,
3269 mon_addrs + mon_addrs_size - 1,
3270 parse_rbd_opts_token, rbd_opts);
3271 if (IS_ERR(copts)) {
3272 ret = PTR_ERR(copts);
3273 goto out_err;
3274 }
3275 kfree(options);
2783 3276
3277 *ceph_opts = copts;
3278 *opts = rbd_opts;
3279 *rbd_spec = spec;
3280
3281 return 0;
3282out_mem:
3283 ret = -ENOMEM;
2784out_err: 3284out_err:
2785 kfree(rbd_dev->image_name); 3285 kfree(rbd_opts);
2786 rbd_dev->image_name = NULL; 3286 rbd_spec_put(spec);
2787 rbd_dev->image_name_len = 0; 3287 kfree(options);
2788 kfree(rbd_dev->pool_name);
2789 rbd_dev->pool_name = NULL;
2790 3288
2791 return err_ptr; 3289 return ret;
2792} 3290}
2793 3291
2794/* 3292/*
@@ -2814,14 +3312,22 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2814 void *p; 3312 void *p;
2815 3313
2816 /* 3314 /*
3315 * When probing a parent image, the image id is already
3316 * known (and the image name likely is not). There's no
3317 * need to fetch the image id again in this case.
3318 */
3319 if (rbd_dev->spec->image_id)
3320 return 0;
3321
3322 /*
2817 * First, see if the format 2 image id file exists, and if 3323 * First, see if the format 2 image id file exists, and if
2818 * so, get the image's persistent id from it. 3324 * so, get the image's persistent id from it.
2819 */ 3325 */
2820 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len; 3326 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
2821 object_name = kmalloc(size, GFP_NOIO); 3327 object_name = kmalloc(size, GFP_NOIO);
2822 if (!object_name) 3328 if (!object_name)
2823 return -ENOMEM; 3329 return -ENOMEM;
2824 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name); 3330 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
2825 dout("rbd id object name is %s\n", object_name); 3331 dout("rbd id object name is %s\n", object_name);
2826 3332
2827 /* Response will be an encoded string, which includes a length */ 3333 /* Response will be an encoded string, which includes a length */
@@ -2841,17 +3347,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2841 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 3347 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2842 if (ret < 0) 3348 if (ret < 0)
2843 goto out; 3349 goto out;
3350 ret = 0; /* rbd_req_sync_exec() can return positive */
2844 3351
2845 p = response; 3352 p = response;
2846 rbd_dev->image_id = ceph_extract_encoded_string(&p, 3353 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
2847 p + RBD_IMAGE_ID_LEN_MAX, 3354 p + RBD_IMAGE_ID_LEN_MAX,
2848 &rbd_dev->image_id_len, 3355 &rbd_dev->spec->image_id_len,
2849 GFP_NOIO); 3356 GFP_NOIO);
2850 if (IS_ERR(rbd_dev->image_id)) { 3357 if (IS_ERR(rbd_dev->spec->image_id)) {
2851 ret = PTR_ERR(rbd_dev->image_id); 3358 ret = PTR_ERR(rbd_dev->spec->image_id);
2852 rbd_dev->image_id = NULL; 3359 rbd_dev->spec->image_id = NULL;
2853 } else { 3360 } else {
2854 dout("image_id is %s\n", rbd_dev->image_id); 3361 dout("image_id is %s\n", rbd_dev->spec->image_id);
2855 } 3362 }
2856out: 3363out:
2857 kfree(response); 3364 kfree(response);
@@ -2867,26 +3374,33 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2867 3374
2868 /* Version 1 images have no id; empty string is used */ 3375 /* Version 1 images have no id; empty string is used */
2869 3376
2870 rbd_dev->image_id = kstrdup("", GFP_KERNEL); 3377 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
2871 if (!rbd_dev->image_id) 3378 if (!rbd_dev->spec->image_id)
2872 return -ENOMEM; 3379 return -ENOMEM;
2873 rbd_dev->image_id_len = 0; 3380 rbd_dev->spec->image_id_len = 0;
2874 3381
2875 /* Record the header object name for this rbd image. */ 3382 /* Record the header object name for this rbd image. */
2876 3383
2877 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX); 3384 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
2878 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3385 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2879 if (!rbd_dev->header_name) { 3386 if (!rbd_dev->header_name) {
2880 ret = -ENOMEM; 3387 ret = -ENOMEM;
2881 goto out_err; 3388 goto out_err;
2882 } 3389 }
2883 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX); 3390 sprintf(rbd_dev->header_name, "%s%s",
3391 rbd_dev->spec->image_name, RBD_SUFFIX);
2884 3392
2885 /* Populate rbd image metadata */ 3393 /* Populate rbd image metadata */
2886 3394
2887 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 3395 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2888 if (ret < 0) 3396 if (ret < 0)
2889 goto out_err; 3397 goto out_err;
3398
3399 /* Version 1 images have no parent (no layering) */
3400
3401 rbd_dev->parent_spec = NULL;
3402 rbd_dev->parent_overlap = 0;
3403
2890 rbd_dev->image_format = 1; 3404 rbd_dev->image_format = 1;
2891 3405
2892 dout("discovered version 1 image, header name is %s\n", 3406 dout("discovered version 1 image, header name is %s\n",
@@ -2897,8 +3411,8 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2897out_err: 3411out_err:
2898 kfree(rbd_dev->header_name); 3412 kfree(rbd_dev->header_name);
2899 rbd_dev->header_name = NULL; 3413 rbd_dev->header_name = NULL;
2900 kfree(rbd_dev->image_id); 3414 kfree(rbd_dev->spec->image_id);
2901 rbd_dev->image_id = NULL; 3415 rbd_dev->spec->image_id = NULL;
2902 3416
2903 return ret; 3417 return ret;
2904} 3418}
@@ -2913,12 +3427,12 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2913 * Image id was filled in by the caller. Record the header 3427 * Image id was filled in by the caller. Record the header
2914 * object name for this rbd image. 3428 * object name for this rbd image.
2915 */ 3429 */
2916 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len; 3430 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
2917 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3431 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918 if (!rbd_dev->header_name) 3432 if (!rbd_dev->header_name)
2919 return -ENOMEM; 3433 return -ENOMEM;
2920 sprintf(rbd_dev->header_name, "%s%s", 3434 sprintf(rbd_dev->header_name, "%s%s",
2921 RBD_HEADER_PREFIX, rbd_dev->image_id); 3435 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
2922 3436
2923 /* Get the size and object order for the image */ 3437 /* Get the size and object order for the image */
2924 3438
@@ -2932,12 +3446,20 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2932 if (ret < 0) 3446 if (ret < 0)
2933 goto out_err; 3447 goto out_err;
2934 3448
2935 /* Get the features for the image */ 3449 /* Get the and check features for the image */
2936 3450
2937 ret = rbd_dev_v2_features(rbd_dev); 3451 ret = rbd_dev_v2_features(rbd_dev);
2938 if (ret < 0) 3452 if (ret < 0)
2939 goto out_err; 3453 goto out_err;
2940 3454
3455 /* If the image supports layering, get the parent info */
3456
3457 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3458 ret = rbd_dev_v2_parent_info(rbd_dev);
3459 if (ret < 0)
3460 goto out_err;
3461 }
3462
2941 /* crypto and compression type aren't (yet) supported for v2 images */ 3463 /* crypto and compression type aren't (yet) supported for v2 images */
2942 3464
2943 rbd_dev->header.crypt_type = 0; 3465 rbd_dev->header.crypt_type = 0;
@@ -2955,8 +3477,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2955 dout("discovered version 2 image, header name is %s\n", 3477 dout("discovered version 2 image, header name is %s\n",
2956 rbd_dev->header_name); 3478 rbd_dev->header_name);
2957 3479
2958 return -ENOTSUPP; 3480 return 0;
2959out_err: 3481out_err:
3482 rbd_dev->parent_overlap = 0;
3483 rbd_spec_put(rbd_dev->parent_spec);
3484 rbd_dev->parent_spec = NULL;
2960 kfree(rbd_dev->header_name); 3485 kfree(rbd_dev->header_name);
2961 rbd_dev->header_name = NULL; 3486 rbd_dev->header_name = NULL;
2962 kfree(rbd_dev->header.object_prefix); 3487 kfree(rbd_dev->header.object_prefix);
@@ -2965,91 +3490,22 @@ out_err:
2965 return ret; 3490 return ret;
2966} 3491}
2967 3492
2968/* 3493static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
2969 * Probe for the existence of the header object for the given rbd
2970 * device. For format 2 images this includes determining the image
2971 * id.
2972 */
2973static int rbd_dev_probe(struct rbd_device *rbd_dev)
2974{ 3494{
2975 int ret; 3495 int ret;
2976 3496
2977 /* 3497 /* no need to lock here, as rbd_dev is not registered yet */
2978 * Get the id from the image id object. If it's not a 3498 ret = rbd_dev_snaps_update(rbd_dev);
2979 * format 2 image, we'll get ENOENT back, and we'll assume
2980 * it's a format 1 image.
2981 */
2982 ret = rbd_dev_image_id(rbd_dev);
2983 if (ret)
2984 ret = rbd_dev_v1_probe(rbd_dev);
2985 else
2986 ret = rbd_dev_v2_probe(rbd_dev);
2987 if (ret) 3499 if (ret)
2988 dout("probe failed, returning %d\n", ret); 3500 return ret;
2989
2990 return ret;
2991}
2992
2993static ssize_t rbd_add(struct bus_type *bus,
2994 const char *buf,
2995 size_t count)
2996{
2997 char *options;
2998 struct rbd_device *rbd_dev = NULL;
2999 const char *mon_addrs = NULL;
3000 size_t mon_addrs_size = 0;
3001 struct ceph_osd_client *osdc;
3002 int rc = -ENOMEM;
3003 char *snap_name;
3004
3005 if (!try_module_get(THIS_MODULE))
3006 return -ENODEV;
3007
3008 options = kmalloc(count, GFP_KERNEL);
3009 if (!options)
3010 goto err_out_mem;
3011 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3012 if (!rbd_dev)
3013 goto err_out_mem;
3014
3015 /* static rbd_device initialization */
3016 spin_lock_init(&rbd_dev->lock);
3017 INIT_LIST_HEAD(&rbd_dev->node);
3018 INIT_LIST_HEAD(&rbd_dev->snaps);
3019 init_rwsem(&rbd_dev->header_rwsem);
3020
3021 /* parse add command */
3022 snap_name = rbd_add_parse_args(rbd_dev, buf,
3023 &mon_addrs, &mon_addrs_size, options, count);
3024 if (IS_ERR(snap_name)) {
3025 rc = PTR_ERR(snap_name);
3026 goto err_out_mem;
3027 }
3028
3029 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3030 if (rc < 0)
3031 goto err_out_args;
3032
3033 /* pick the pool */
3034 osdc = &rbd_dev->rbd_client->client->osdc;
3035 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3036 if (rc < 0)
3037 goto err_out_client;
3038 rbd_dev->pool_id = rc;
3039
3040 rc = rbd_dev_probe(rbd_dev);
3041 if (rc < 0)
3042 goto err_out_client;
3043 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3044 3501
3045 /* no need to lock here, as rbd_dev is not registered yet */ 3502 ret = rbd_dev_probe_update_spec(rbd_dev);
3046 rc = rbd_dev_snaps_update(rbd_dev); 3503 if (ret)
3047 if (rc) 3504 goto err_out_snaps;
3048 goto err_out_header;
3049 3505
3050 rc = rbd_dev_set_mapping(rbd_dev, snap_name); 3506 ret = rbd_dev_set_mapping(rbd_dev);
3051 if (rc) 3507 if (ret)
3052 goto err_out_header; 3508 goto err_out_snaps;
3053 3509
3054 /* generate unique id: find highest unique id, add one */ 3510 /* generate unique id: find highest unique id, add one */
3055 rbd_dev_id_get(rbd_dev); 3511 rbd_dev_id_get(rbd_dev);
@@ -3061,34 +3517,33 @@ static ssize_t rbd_add(struct bus_type *bus,
3061 3517
3062 /* Get our block major device number. */ 3518 /* Get our block major device number. */
3063 3519
3064 rc = register_blkdev(0, rbd_dev->name); 3520 ret = register_blkdev(0, rbd_dev->name);
3065 if (rc < 0) 3521 if (ret < 0)
3066 goto err_out_id; 3522 goto err_out_id;
3067 rbd_dev->major = rc; 3523 rbd_dev->major = ret;
3068 3524
3069 /* Set up the blkdev mapping. */ 3525 /* Set up the blkdev mapping. */
3070 3526
3071 rc = rbd_init_disk(rbd_dev); 3527 ret = rbd_init_disk(rbd_dev);
3072 if (rc) 3528 if (ret)
3073 goto err_out_blkdev; 3529 goto err_out_blkdev;
3074 3530
3075 rc = rbd_bus_add_dev(rbd_dev); 3531 ret = rbd_bus_add_dev(rbd_dev);
3076 if (rc) 3532 if (ret)
3077 goto err_out_disk; 3533 goto err_out_disk;
3078 3534
3079 /* 3535 /*
3080 * At this point cleanup in the event of an error is the job 3536 * At this point cleanup in the event of an error is the job
3081 * of the sysfs code (initiated by rbd_bus_del_dev()). 3537 * of the sysfs code (initiated by rbd_bus_del_dev()).
3082 */ 3538 */
3083
3084 down_write(&rbd_dev->header_rwsem); 3539 down_write(&rbd_dev->header_rwsem);
3085 rc = rbd_dev_snaps_register(rbd_dev); 3540 ret = rbd_dev_snaps_register(rbd_dev);
3086 up_write(&rbd_dev->header_rwsem); 3541 up_write(&rbd_dev->header_rwsem);
3087 if (rc) 3542 if (ret)
3088 goto err_out_bus; 3543 goto err_out_bus;
3089 3544
3090 rc = rbd_init_watch_dev(rbd_dev); 3545 ret = rbd_init_watch_dev(rbd_dev);
3091 if (rc) 3546 if (ret)
3092 goto err_out_bus; 3547 goto err_out_bus;
3093 3548
3094 /* Everything's ready. Announce the disk to the world. */ 3549 /* Everything's ready. Announce the disk to the world. */
@@ -3098,37 +3553,119 @@ static ssize_t rbd_add(struct bus_type *bus,
3098 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 3553 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3099 (unsigned long long) rbd_dev->mapping.size); 3554 (unsigned long long) rbd_dev->mapping.size);
3100 3555
3101 return count; 3556 return ret;
3102
3103err_out_bus: 3557err_out_bus:
3104 /* this will also clean up rest of rbd_dev stuff */ 3558 /* this will also clean up rest of rbd_dev stuff */
3105 3559
3106 rbd_bus_del_dev(rbd_dev); 3560 rbd_bus_del_dev(rbd_dev);
3107 kfree(options);
3108 return rc;
3109 3561
3562 return ret;
3110err_out_disk: 3563err_out_disk:
3111 rbd_free_disk(rbd_dev); 3564 rbd_free_disk(rbd_dev);
3112err_out_blkdev: 3565err_out_blkdev:
3113 unregister_blkdev(rbd_dev->major, rbd_dev->name); 3566 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3114err_out_id: 3567err_out_id:
3115 rbd_dev_id_put(rbd_dev); 3568 rbd_dev_id_put(rbd_dev);
3116err_out_header: 3569err_out_snaps:
3117 rbd_header_free(&rbd_dev->header); 3570 rbd_remove_all_snaps(rbd_dev);
3571
3572 return ret;
3573}
3574
3575/*
3576 * Probe for the existence of the header object for the given rbd
3577 * device. For format 2 images this includes determining the image
3578 * id.
3579 */
3580static int rbd_dev_probe(struct rbd_device *rbd_dev)
3581{
3582 int ret;
3583
3584 /*
3585 * Get the id from the image id object. If it's not a
3586 * format 2 image, we'll get ENOENT back, and we'll assume
3587 * it's a format 1 image.
3588 */
3589 ret = rbd_dev_image_id(rbd_dev);
3590 if (ret)
3591 ret = rbd_dev_v1_probe(rbd_dev);
3592 else
3593 ret = rbd_dev_v2_probe(rbd_dev);
3594 if (ret) {
3595 dout("probe failed, returning %d\n", ret);
3596
3597 return ret;
3598 }
3599
3600 ret = rbd_dev_probe_finish(rbd_dev);
3601 if (ret)
3602 rbd_header_free(&rbd_dev->header);
3603
3604 return ret;
3605}
3606
3607static ssize_t rbd_add(struct bus_type *bus,
3608 const char *buf,
3609 size_t count)
3610{
3611 struct rbd_device *rbd_dev = NULL;
3612 struct ceph_options *ceph_opts = NULL;
3613 struct rbd_options *rbd_opts = NULL;
3614 struct rbd_spec *spec = NULL;
3615 struct rbd_client *rbdc;
3616 struct ceph_osd_client *osdc;
3617 int rc = -ENOMEM;
3618
3619 if (!try_module_get(THIS_MODULE))
3620 return -ENODEV;
3621
3622 /* parse add command */
3623 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3624 if (rc < 0)
3625 goto err_out_module;
3626
3627 rbdc = rbd_get_client(ceph_opts);
3628 if (IS_ERR(rbdc)) {
3629 rc = PTR_ERR(rbdc);
3630 goto err_out_args;
3631 }
3632 ceph_opts = NULL; /* rbd_dev client now owns this */
3633
3634 /* pick the pool */
3635 osdc = &rbdc->client->osdc;
3636 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3637 if (rc < 0)
3638 goto err_out_client;
3639 spec->pool_id = (u64) rc;
3640
3641 rbd_dev = rbd_dev_create(rbdc, spec);
3642 if (!rbd_dev)
3643 goto err_out_client;
3644 rbdc = NULL; /* rbd_dev now owns this */
3645 spec = NULL; /* rbd_dev now owns this */
3646
3647 rbd_dev->mapping.read_only = rbd_opts->read_only;
3648 kfree(rbd_opts);
3649 rbd_opts = NULL; /* done with this */
3650
3651 rc = rbd_dev_probe(rbd_dev);
3652 if (rc < 0)
3653 goto err_out_rbd_dev;
3654
3655 return count;
3656err_out_rbd_dev:
3657 rbd_dev_destroy(rbd_dev);
3118err_out_client: 3658err_out_client:
3119 kfree(rbd_dev->header_name); 3659 rbd_put_client(rbdc);
3120 rbd_put_client(rbd_dev);
3121 kfree(rbd_dev->image_id);
3122err_out_args: 3660err_out_args:
3123 kfree(rbd_dev->mapping.snap_name); 3661 if (ceph_opts)
3124 kfree(rbd_dev->image_name); 3662 ceph_destroy_options(ceph_opts);
3125 kfree(rbd_dev->pool_name); 3663 kfree(rbd_opts);
3126err_out_mem: 3664 rbd_spec_put(spec);
3127 kfree(rbd_dev); 3665err_out_module:
3128 kfree(options); 3666 module_put(THIS_MODULE);
3129 3667
3130 dout("Error adding device %s\n", buf); 3668 dout("Error adding device %s\n", buf);
3131 module_put(THIS_MODULE);
3132 3669
3133 return (ssize_t) rc; 3670 return (ssize_t) rc;
3134} 3671}
@@ -3163,7 +3700,6 @@ static void rbd_dev_release(struct device *dev)
3163 if (rbd_dev->watch_event) 3700 if (rbd_dev->watch_event)
3164 rbd_req_sync_unwatch(rbd_dev); 3701 rbd_req_sync_unwatch(rbd_dev);
3165 3702
3166 rbd_put_client(rbd_dev);
3167 3703
3168 /* clean up and free blkdev */ 3704 /* clean up and free blkdev */
3169 rbd_free_disk(rbd_dev); 3705 rbd_free_disk(rbd_dev);
@@ -3173,13 +3709,9 @@ static void rbd_dev_release(struct device *dev)
3173 rbd_header_free(&rbd_dev->header); 3709 rbd_header_free(&rbd_dev->header);
3174 3710
3175 /* done with the id, and with the rbd_dev */ 3711 /* done with the id, and with the rbd_dev */
3176 kfree(rbd_dev->mapping.snap_name);
3177 kfree(rbd_dev->image_id);
3178 kfree(rbd_dev->header_name);
3179 kfree(rbd_dev->pool_name);
3180 kfree(rbd_dev->image_name);
3181 rbd_dev_id_put(rbd_dev); 3712 rbd_dev_id_put(rbd_dev);
3182 kfree(rbd_dev); 3713 rbd_assert(rbd_dev->rbd_client != NULL);
3714 rbd_dev_destroy(rbd_dev);
3183 3715
3184 /* release module ref */ 3716 /* release module ref */
3185 module_put(THIS_MODULE); 3717 module_put(THIS_MODULE);
@@ -3211,7 +3743,12 @@ static ssize_t rbd_remove(struct bus_type *bus,
3211 goto done; 3743 goto done;
3212 } 3744 }
3213 3745
3214 __rbd_remove_all_snaps(rbd_dev); 3746 if (rbd_dev->open_count) {
3747 ret = -EBUSY;
3748 goto done;
3749 }
3750
3751 rbd_remove_all_snaps(rbd_dev);
3215 rbd_bus_del_dev(rbd_dev); 3752 rbd_bus_del_dev(rbd_dev);
3216 3753
3217done: 3754done:
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index cbe77fa105b..49d77cbcf8b 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -46,8 +46,6 @@
46#define RBD_MIN_OBJ_ORDER 16 46#define RBD_MIN_OBJ_ORDER 16
47#define RBD_MAX_OBJ_ORDER 30 47#define RBD_MAX_OBJ_ORDER 30
48 48
49#define RBD_MAX_SEG_NAME_LEN 128
50
51#define RBD_COMP_NONE 0 49#define RBD_COMP_NONE 0
52#define RBD_CRYPT_NONE 0 50#define RBD_CRYPT_NONE 0
53 51
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6690269f5dd..064d1a68d2c 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -267,6 +267,14 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
267 kfree(req->r_pages); 267 kfree(req->r_pages);
268} 268}
269 269
270static void ceph_unlock_page_vector(struct page **pages, int num_pages)
271{
272 int i;
273
274 for (i = 0; i < num_pages; i++)
275 unlock_page(pages[i]);
276}
277
270/* 278/*
271 * start an async read(ahead) operation. return nr_pages we submitted 279 * start an async read(ahead) operation. return nr_pages we submitted
272 * a read for on success, or negative error code. 280 * a read for on success, or negative error code.
@@ -347,6 +355,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
347 return nr_pages; 355 return nr_pages;
348 356
349out_pages: 357out_pages:
358 ceph_unlock_page_vector(pages, nr_pages);
350 ceph_release_page_vector(pages, nr_pages); 359 ceph_release_page_vector(pages, nr_pages);
351out: 360out:
352 ceph_osdc_put_request(req); 361 ceph_osdc_put_request(req);
@@ -1078,23 +1087,51 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
1078 struct page **pagep, void **fsdata) 1087 struct page **pagep, void **fsdata)
1079{ 1088{
1080 struct inode *inode = file->f_dentry->d_inode; 1089 struct inode *inode = file->f_dentry->d_inode;
1090 struct ceph_inode_info *ci = ceph_inode(inode);
1091 struct ceph_file_info *fi = file->private_data;
1081 struct page *page; 1092 struct page *page;
1082 pgoff_t index = pos >> PAGE_CACHE_SHIFT; 1093 pgoff_t index = pos >> PAGE_CACHE_SHIFT;
1083 int r; 1094 int r, want, got = 0;
1095
1096 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1097 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1098 else
1099 want = CEPH_CAP_FILE_BUFFER;
1100
1101 dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
1102 inode, ceph_vinop(inode), pos, len, inode->i_size);
1103 r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
1104 if (r < 0)
1105 return r;
1106 dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n",
1107 inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
1108 if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
1109 ceph_put_cap_refs(ci, got);
1110 return -EAGAIN;
1111 }
1084 1112
1085 do { 1113 do {
1086 /* get a page */ 1114 /* get a page */
1087 page = grab_cache_page_write_begin(mapping, index, 0); 1115 page = grab_cache_page_write_begin(mapping, index, 0);
1088 if (!page) 1116 if (!page) {
1089 return -ENOMEM; 1117 r = -ENOMEM;
1090 *pagep = page; 1118 break;
1119 }
1091 1120
1092 dout("write_begin file %p inode %p page %p %d~%d\n", file, 1121 dout("write_begin file %p inode %p page %p %d~%d\n", file,
1093 inode, page, (int)pos, (int)len); 1122 inode, page, (int)pos, (int)len);
1094 1123
1095 r = ceph_update_writeable_page(file, pos, len, page); 1124 r = ceph_update_writeable_page(file, pos, len, page);
1125 if (r)
1126 page_cache_release(page);
1096 } while (r == -EAGAIN); 1127 } while (r == -EAGAIN);
1097 1128
1129 if (r) {
1130 ceph_put_cap_refs(ci, got);
1131 } else {
1132 *pagep = page;
1133 *(int *)fsdata = got;
1134 }
1098 return r; 1135 return r;
1099} 1136}
1100 1137
@@ -1108,10 +1145,12 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1108 struct page *page, void *fsdata) 1145 struct page *page, void *fsdata)
1109{ 1146{
1110 struct inode *inode = file->f_dentry->d_inode; 1147 struct inode *inode = file->f_dentry->d_inode;
1148 struct ceph_inode_info *ci = ceph_inode(inode);
1111 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1149 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1112 struct ceph_mds_client *mdsc = fsc->mdsc; 1150 struct ceph_mds_client *mdsc = fsc->mdsc;
1113 unsigned from = pos & (PAGE_CACHE_SIZE - 1); 1151 unsigned from = pos & (PAGE_CACHE_SIZE - 1);
1114 int check_cap = 0; 1152 int check_cap = 0;
1153 int got = (unsigned long)fsdata;
1115 1154
1116 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, 1155 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1117 inode, page, (int)pos, (int)copied, (int)len); 1156 inode, page, (int)pos, (int)copied, (int)len);
@@ -1134,6 +1173,19 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1134 up_read(&mdsc->snap_rwsem); 1173 up_read(&mdsc->snap_rwsem);
1135 page_cache_release(page); 1174 page_cache_release(page);
1136 1175
1176 if (copied > 0) {
1177 int dirty;
1178 spin_lock(&ci->i_ceph_lock);
1179 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1180 spin_unlock(&ci->i_ceph_lock);
1181 if (dirty)
1182 __mark_inode_dirty(inode, dirty);
1183 }
1184
1185 dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n",
1186 inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
1187 ceph_put_cap_refs(ci, got);
1188
1137 if (check_cap) 1189 if (check_cap)
1138 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); 1190 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
1139 1191
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3251e9cc640..a1d9bb30c1b 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -236,8 +236,10 @@ static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
236 if (!ctx) { 236 if (!ctx) {
237 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 237 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
238 if (cap) { 238 if (cap) {
239 spin_lock(&mdsc->caps_list_lock);
239 mdsc->caps_use_count++; 240 mdsc->caps_use_count++;
240 mdsc->caps_total_count++; 241 mdsc->caps_total_count++;
242 spin_unlock(&mdsc->caps_list_lock);
241 } 243 }
242 return cap; 244 return cap;
243 } 245 }
@@ -1349,11 +1351,15 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1349 if (!ci->i_head_snapc) 1351 if (!ci->i_head_snapc)
1350 ci->i_head_snapc = ceph_get_snap_context( 1352 ci->i_head_snapc = ceph_get_snap_context(
1351 ci->i_snap_realm->cached_context); 1353 ci->i_snap_realm->cached_context);
1352 dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode, 1354 dout(" inode %p now dirty snapc %p auth cap %p\n",
1353 ci->i_head_snapc); 1355 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
1354 BUG_ON(!list_empty(&ci->i_dirty_item)); 1356 BUG_ON(!list_empty(&ci->i_dirty_item));
1355 spin_lock(&mdsc->cap_dirty_lock); 1357 spin_lock(&mdsc->cap_dirty_lock);
1356 list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1358 if (ci->i_auth_cap)
1359 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1360 else
1361 list_add(&ci->i_dirty_item,
1362 &mdsc->cap_dirty_migrating);
1357 spin_unlock(&mdsc->cap_dirty_lock); 1363 spin_unlock(&mdsc->cap_dirty_lock);
1358 if (ci->i_flushing_caps == 0) { 1364 if (ci->i_flushing_caps == 0) {
1359 ihold(inode); 1365 ihold(inode);
@@ -2388,7 +2394,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2388 &atime); 2394 &atime);
2389 2395
2390 /* max size increase? */ 2396 /* max size increase? */
2391 if (max_size != ci->i_max_size) { 2397 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
2392 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); 2398 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
2393 ci->i_max_size = max_size; 2399 ci->i_max_size = max_size;
2394 if (max_size >= ci->i_wanted_max_size) { 2400 if (max_size >= ci->i_wanted_max_size) {
@@ -2745,6 +2751,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2745 2751
2746 /* make sure we re-request max_size, if necessary */ 2752 /* make sure we re-request max_size, if necessary */
2747 spin_lock(&ci->i_ceph_lock); 2753 spin_lock(&ci->i_ceph_lock);
2754 ci->i_wanted_max_size = 0; /* reset */
2748 ci->i_requested_max_size = 0; 2755 ci->i_requested_max_size = 0;
2749 spin_unlock(&ci->i_ceph_lock); 2756 spin_unlock(&ci->i_ceph_lock);
2750} 2757}
@@ -2840,8 +2847,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2840 case CEPH_CAP_OP_IMPORT: 2847 case CEPH_CAP_OP_IMPORT:
2841 handle_cap_import(mdsc, inode, h, session, 2848 handle_cap_import(mdsc, inode, h, session,
2842 snaptrace, snaptrace_len); 2849 snaptrace, snaptrace_len);
2843 ceph_check_caps(ceph_inode(inode), 0, session);
2844 goto done_unlocked;
2845 } 2850 }
2846 2851
2847 /* the rest require a cap */ 2852 /* the rest require a cap */
@@ -2858,6 +2863,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2858 switch (op) { 2863 switch (op) {
2859 case CEPH_CAP_OP_REVOKE: 2864 case CEPH_CAP_OP_REVOKE:
2860 case CEPH_CAP_OP_GRANT: 2865 case CEPH_CAP_OP_GRANT:
2866 case CEPH_CAP_OP_IMPORT:
2861 handle_cap_grant(inode, h, session, cap, msg->middle); 2867 handle_cap_grant(inode, h, session, cap, msg->middle);
2862 goto done_unlocked; 2868 goto done_unlocked;
2863 2869
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d4dfdcf76d7..e51558fca3a 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -712,63 +712,53 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
712 struct ceph_osd_client *osdc = 712 struct ceph_osd_client *osdc =
713 &ceph_sb_to_client(inode->i_sb)->client->osdc; 713 &ceph_sb_to_client(inode->i_sb)->client->osdc;
714 loff_t endoff = pos + iov->iov_len; 714 loff_t endoff = pos + iov->iov_len;
715 int want, got = 0; 715 int got = 0;
716 int ret, err; 716 int ret, err, written;
717 717
718 if (ceph_snap(inode) != CEPH_NOSNAP) 718 if (ceph_snap(inode) != CEPH_NOSNAP)
719 return -EROFS; 719 return -EROFS;
720 720
721retry_snap: 721retry_snap:
722 written = 0;
722 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 723 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
723 return -ENOSPC; 724 return -ENOSPC;
724 __ceph_do_pending_vmtruncate(inode); 725 __ceph_do_pending_vmtruncate(inode);
725 dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
726 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
727 inode->i_size);
728 if (fi->fmode & CEPH_FILE_MODE_LAZY)
729 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
730 else
731 want = CEPH_CAP_FILE_BUFFER;
732 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
733 if (ret < 0)
734 goto out_put;
735
736 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
737 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
738 ceph_cap_string(got));
739
740 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
741 (iocb->ki_filp->f_flags & O_DIRECT) ||
742 (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
743 (fi->flags & CEPH_F_SYNC)) {
744 ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
745 &iocb->ki_pos);
746 } else {
747 /*
748 * buffered write; drop Fw early to avoid slow
749 * revocation if we get stuck on balance_dirty_pages
750 */
751 int dirty;
752
753 spin_lock(&ci->i_ceph_lock);
754 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
755 spin_unlock(&ci->i_ceph_lock);
756 ceph_put_cap_refs(ci, got);
757 726
727 /*
728 * try to do a buffered write. if we don't have sufficient
729 * caps, we'll get -EAGAIN from generic_file_aio_write, or a
730 * short write if we only get caps for some pages.
731 */
732 if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
733 !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
734 !(fi->flags & CEPH_F_SYNC)) {
758 ret = generic_file_aio_write(iocb, iov, nr_segs, pos); 735 ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
736 if (ret >= 0)
737 written = ret;
738
759 if ((ret >= 0 || ret == -EIOCBQUEUED) && 739 if ((ret >= 0 || ret == -EIOCBQUEUED) &&
760 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) 740 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
761 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { 741 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
762 err = vfs_fsync_range(file, pos, pos + ret - 1, 1); 742 err = vfs_fsync_range(file, pos, pos + written - 1, 1);
763 if (err < 0) 743 if (err < 0)
764 ret = err; 744 ret = err;
765 } 745 }
746 if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
747 goto out;
748 }
766 749
767 if (dirty) 750 dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
768 __mark_inode_dirty(inode, dirty); 751 inode, ceph_vinop(inode), pos + written,
752 (unsigned)iov->iov_len - written, inode->i_size);
753 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
754 if (ret < 0)
769 goto out; 755 goto out;
770 }
771 756
757 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
758 inode, ceph_vinop(inode), pos + written,
759 (unsigned)iov->iov_len - written, ceph_cap_string(got));
760 ret = ceph_sync_write(file, iov->iov_base + written,
761 iov->iov_len - written, &iocb->ki_pos);
772 if (ret >= 0) { 762 if (ret >= 0) {
773 int dirty; 763 int dirty;
774 spin_lock(&ci->i_ceph_lock); 764 spin_lock(&ci->i_ceph_lock);
@@ -777,13 +767,10 @@ retry_snap:
777 if (dirty) 767 if (dirty)
778 __mark_inode_dirty(inode, dirty); 768 __mark_inode_dirty(inode, dirty);
779 } 769 }
780
781out_put:
782 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", 770 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
783 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 771 inode, ceph_vinop(inode), pos + written,
784 ceph_cap_string(got)); 772 (unsigned)iov->iov_len - written, ceph_cap_string(got));
785 ceph_put_cap_refs(ci, got); 773 ceph_put_cap_refs(ci, got);
786
787out: 774out:
788 if (ret == -EOLDSNAPC) { 775 if (ret == -EOLDSNAPC) {
789 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", 776 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ba95eea201b..2971eaa65cd 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1466,7 +1466,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
1466{ 1466{
1467 struct ceph_inode_info *ci = ceph_inode(inode); 1467 struct ceph_inode_info *ci = ceph_inode(inode);
1468 u64 to; 1468 u64 to;
1469 int wrbuffer_refs, wake = 0; 1469 int wrbuffer_refs, finish = 0;
1470 1470
1471retry: 1471retry:
1472 spin_lock(&ci->i_ceph_lock); 1472 spin_lock(&ci->i_ceph_lock);
@@ -1498,15 +1498,18 @@ retry:
1498 truncate_inode_pages(inode->i_mapping, to); 1498 truncate_inode_pages(inode->i_mapping, to);
1499 1499
1500 spin_lock(&ci->i_ceph_lock); 1500 spin_lock(&ci->i_ceph_lock);
1501 ci->i_truncate_pending--; 1501 if (to == ci->i_truncate_size) {
1502 if (ci->i_truncate_pending == 0) 1502 ci->i_truncate_pending = 0;
1503 wake = 1; 1503 finish = 1;
1504 }
1504 spin_unlock(&ci->i_ceph_lock); 1505 spin_unlock(&ci->i_ceph_lock);
1506 if (!finish)
1507 goto retry;
1505 1508
1506 if (wrbuffer_refs == 0) 1509 if (wrbuffer_refs == 0)
1507 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 1510 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1508 if (wake) 1511
1509 wake_up_all(&ci->i_cap_wq); 1512 wake_up_all(&ci->i_cap_wq);
1510} 1513}
1511 1514
1512 1515
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1bcf712655d..9165eb8309e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1590,7 +1590,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1590 } else if (rpath || rino) { 1590 } else if (rpath || rino) {
1591 *ino = rino; 1591 *ino = rino;
1592 *ppath = rpath; 1592 *ppath = rpath;
1593 *pathlen = strlen(rpath); 1593 *pathlen = rpath ? strlen(rpath) : 0;
1594 dout(" path %.*s\n", *pathlen, rpath); 1594 dout(" path %.*s\n", *pathlen, rpath);
1595 } 1595 }
1596 1596
@@ -1876,9 +1876,14 @@ finish:
1876static void __wake_requests(struct ceph_mds_client *mdsc, 1876static void __wake_requests(struct ceph_mds_client *mdsc,
1877 struct list_head *head) 1877 struct list_head *head)
1878{ 1878{
1879 struct ceph_mds_request *req, *nreq; 1879 struct ceph_mds_request *req;
1880 LIST_HEAD(tmp_list);
1881
1882 list_splice_init(head, &tmp_list);
1880 1883
1881 list_for_each_entry_safe(req, nreq, head, r_wait) { 1884 while (!list_empty(&tmp_list)) {
1885 req = list_entry(tmp_list.next,
1886 struct ceph_mds_request, r_wait);
1882 list_del_init(&req->r_wait); 1887 list_del_init(&req->r_wait);
1883 __do_request(mdsc, req); 1888 __do_request(mdsc, req);
1884 } 1889 }
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 2eb43f21132..e86aa994812 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -403,8 +403,6 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
403 seq_printf(m, ",mount_timeout=%d", opt->mount_timeout); 403 seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
404 if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) 404 if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
405 seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl); 405 seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
406 if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
407 seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
408 if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) 406 if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
409 seq_printf(m, ",osdkeepalivetimeout=%d", 407 seq_printf(m, ",osdkeepalivetimeout=%d",
410 opt->osd_keepalive_timeout); 408 opt->osd_keepalive_timeout);
@@ -849,7 +847,7 @@ static int ceph_register_bdi(struct super_block *sb,
849 fsc->backing_dev_info.ra_pages = 847 fsc->backing_dev_info.ra_pages =
850 default_backing_dev_info.ra_pages; 848 default_backing_dev_info.ra_pages;
851 849
852 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d", 850 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
853 atomic_long_inc_return(&bdi_seq)); 851 atomic_long_inc_return(&bdi_seq));
854 if (!err) 852 if (!err)
855 sb->s_bdi = &fsc->backing_dev_info; 853 sb->s_bdi = &fsc->backing_dev_info;
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index 2a9a9abc912..12731a19ef0 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -114,6 +114,7 @@ struct backing_dev_info {
114int bdi_init(struct backing_dev_info *bdi); 114int bdi_init(struct backing_dev_info *bdi);
115void bdi_destroy(struct backing_dev_info *bdi); 115void bdi_destroy(struct backing_dev_info *bdi);
116 116
117__printf(3, 4)
117int bdi_register(struct backing_dev_info *bdi, struct device *parent, 118int bdi_register(struct backing_dev_info *bdi, struct device *parent,
118 const char *fmt, ...); 119 const char *fmt, ...);
119int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev); 120int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 6470792b13d..084d3c622b1 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -43,7 +43,6 @@ struct ceph_options {
43 struct ceph_entity_addr my_addr; 43 struct ceph_entity_addr my_addr;
44 int mount_timeout; 44 int mount_timeout;
45 int osd_idle_ttl; 45 int osd_idle_ttl;
46 int osd_timeout;
47 int osd_keepalive_timeout; 46 int osd_keepalive_timeout;
48 47
49 /* 48 /*
@@ -63,7 +62,6 @@ struct ceph_options {
63 * defaults 62 * defaults
64 */ 63 */
65#define CEPH_MOUNT_TIMEOUT_DEFAULT 60 64#define CEPH_MOUNT_TIMEOUT_DEFAULT 60
66#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
67#define CEPH_OSD_KEEPALIVE_DEFAULT 5 65#define CEPH_OSD_KEEPALIVE_DEFAULT 5
68#define CEPH_OSD_IDLE_TTL_DEFAULT 60 66#define CEPH_OSD_IDLE_TTL_DEFAULT 60
69 67
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index e37acbe989a..10a417f9f76 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -123,6 +123,7 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
123extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, 123extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
124 struct ceph_pg pgid); 124 struct ceph_pg pgid);
125 125
126extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
126extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); 127extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
127 128
128#endif 129#endif
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index de91fbdf127..2c04afeead1 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -87,6 +87,8 @@ struct ceph_pg {
87 * 87 *
88 * lpgp_num -- as above. 88 * lpgp_num -- as above.
89 */ 89 */
90#define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */
91
90#define CEPH_PG_TYPE_REP 1 92#define CEPH_PG_TYPE_REP 1
91#define CEPH_PG_TYPE_RAID4 2 93#define CEPH_PG_TYPE_RAID4 2
92#define CEPH_PG_POOL_VERSION 2 94#define CEPH_PG_POOL_VERSION 2
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index a8020293f34..ee71ea26777 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -305,7 +305,6 @@ ceph_parse_options(char *options, const char *dev_name,
305 305
306 /* start with defaults */ 306 /* start with defaults */
307 opt->flags = CEPH_OPT_DEFAULT; 307 opt->flags = CEPH_OPT_DEFAULT;
308 opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
309 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; 308 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
310 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ 309 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
311 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ 310 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
@@ -391,7 +390,7 @@ ceph_parse_options(char *options, const char *dev_name,
391 390
392 /* misc */ 391 /* misc */
393 case Opt_osdtimeout: 392 case Opt_osdtimeout:
394 opt->osd_timeout = intval; 393 pr_warning("ignoring deprecated osdtimeout option\n");
395 break; 394 break;
396 case Opt_osdkeepalivetimeout: 395 case Opt_osdkeepalivetimeout:
397 opt->osd_keepalive_timeout = intval; 396 opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3ef1759403b..4d111fd2b49 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2244,22 +2244,62 @@ bad_tag:
2244 2244
2245 2245
2246/* 2246/*
2247 * Atomically queue work on a connection. Bump @con reference to 2247 * Atomically queue work on a connection after the specified delay.
2248 * avoid races with connection teardown. 2248 * Bump @con reference to avoid races with connection teardown.
2249 * Returns 0 if work was queued, or an error code otherwise.
2249 */ 2250 */
2250static void queue_con(struct ceph_connection *con) 2251static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
2251{ 2252{
2252 if (!con->ops->get(con)) { 2253 if (!con->ops->get(con)) {
2253 dout("queue_con %p ref count 0\n", con); 2254 dout("%s %p ref count 0\n", __func__, con);
2254 return; 2255
2256 return -ENOENT;
2255 } 2257 }
2256 2258
2257 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 2259 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
2258 dout("queue_con %p - already queued\n", con); 2260 dout("%s %p - already queued\n", __func__, con);
2259 con->ops->put(con); 2261 con->ops->put(con);
2260 } else { 2262
2261 dout("queue_con %p\n", con); 2263 return -EBUSY;
2262 } 2264 }
2265
2266 dout("%s %p %lu\n", __func__, con, delay);
2267
2268 return 0;
2269}
2270
2271static void queue_con(struct ceph_connection *con)
2272{
2273 (void) queue_con_delay(con, 0);
2274}
2275
2276static bool con_sock_closed(struct ceph_connection *con)
2277{
2278 if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
2279 return false;
2280
2281#define CASE(x) \
2282 case CON_STATE_ ## x: \
2283 con->error_msg = "socket closed (con state " #x ")"; \
2284 break;
2285
2286 switch (con->state) {
2287 CASE(CLOSED);
2288 CASE(PREOPEN);
2289 CASE(CONNECTING);
2290 CASE(NEGOTIATING);
2291 CASE(OPEN);
2292 CASE(STANDBY);
2293 default:
2294 pr_warning("%s con %p unrecognized state %lu\n",
2295 __func__, con, con->state);
2296 con->error_msg = "unrecognized con state";
2297 BUG();
2298 break;
2299 }
2300#undef CASE
2301
2302 return true;
2263} 2303}
2264 2304
2265/* 2305/*
@@ -2273,35 +2313,16 @@ static void con_work(struct work_struct *work)
2273 2313
2274 mutex_lock(&con->mutex); 2314 mutex_lock(&con->mutex);
2275restart: 2315restart:
2276 if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { 2316 if (con_sock_closed(con))
2277 switch (con->state) {
2278 case CON_STATE_CONNECTING:
2279 con->error_msg = "connection failed";
2280 break;
2281 case CON_STATE_NEGOTIATING:
2282 con->error_msg = "negotiation failed";
2283 break;
2284 case CON_STATE_OPEN:
2285 con->error_msg = "socket closed";
2286 break;
2287 default:
2288 dout("unrecognized con state %d\n", (int)con->state);
2289 con->error_msg = "unrecognized con state";
2290 BUG();
2291 }
2292 goto fault; 2317 goto fault;
2293 }
2294 2318
2295 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { 2319 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
2296 dout("con_work %p backing off\n", con); 2320 dout("con_work %p backing off\n", con);
2297 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2321 ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2298 round_jiffies_relative(con->delay))) { 2322 if (ret) {
2299 dout("con_work %p backoff %lu\n", con, con->delay);
2300 mutex_unlock(&con->mutex);
2301 return;
2302 } else {
2303 dout("con_work %p FAILED to back off %lu\n", con, 2323 dout("con_work %p FAILED to back off %lu\n", con,
2304 con->delay); 2324 con->delay);
2325 BUG_ON(ret == -ENOENT);
2305 set_bit(CON_FLAG_BACKOFF, &con->flags); 2326 set_bit(CON_FLAG_BACKOFF, &con->flags);
2306 } 2327 }
2307 goto done; 2328 goto done;
@@ -2356,7 +2377,7 @@ fault:
2356static void ceph_fault(struct ceph_connection *con) 2377static void ceph_fault(struct ceph_connection *con)
2357 __releases(con->mutex) 2378 __releases(con->mutex)
2358{ 2379{
2359 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2380 pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2360 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2381 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2361 dout("fault %p state %lu to peer %s\n", 2382 dout("fault %p state %lu to peer %s\n",
2362 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2383 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@ static void ceph_fault(struct ceph_connection *con)
2398 con->delay = BASE_DELAY_INTERVAL; 2419 con->delay = BASE_DELAY_INTERVAL;
2399 else if (con->delay < MAX_DELAY_INTERVAL) 2420 else if (con->delay < MAX_DELAY_INTERVAL)
2400 con->delay *= 2; 2421 con->delay *= 2;
2401 con->ops->get(con); 2422 set_bit(CON_FLAG_BACKOFF, &con->flags);
2402 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2423 queue_con(con);
2403 round_jiffies_relative(con->delay))) {
2404 dout("fault queued %p delay %lu\n", con, con->delay);
2405 } else {
2406 con->ops->put(con);
2407 dout("fault failed to queue %p delay %lu, backoff\n",
2408 con, con->delay);
2409 /*
2410 * In many cases we see a socket state change
2411 * while con_work is running and end up
2412 * queuing (non-delayed) work, such that we
2413 * can't backoff with a delay. Set a flag so
2414 * that when con_work restarts we schedule the
2415 * delay then.
2416 */
2417 set_bit(CON_FLAG_BACKOFF, &con->flags);
2418 }
2419 } 2424 }
2420 2425
2421out_unlock: 2426out_unlock:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756cc744..780caf6b049 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
221 kref_init(&req->r_kref); 221 kref_init(&req->r_kref);
222 init_completion(&req->r_completion); 222 init_completion(&req->r_completion);
223 init_completion(&req->r_safe_completion); 223 init_completion(&req->r_safe_completion);
224 RB_CLEAR_NODE(&req->r_node);
224 INIT_LIST_HEAD(&req->r_unsafe_item); 225 INIT_LIST_HEAD(&req->r_unsafe_item);
225 INIT_LIST_HEAD(&req->r_linger_item); 226 INIT_LIST_HEAD(&req->r_linger_item);
226 INIT_LIST_HEAD(&req->r_linger_osd); 227 INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
580 581
581 dout("__kick_osd_requests osd%d\n", osd->o_osd); 582 dout("__kick_osd_requests osd%d\n", osd->o_osd);
582 err = __reset_osd(osdc, osd); 583 err = __reset_osd(osdc, osd);
583 if (err == -EAGAIN) 584 if (err)
584 return; 585 return;
585 586
586 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 587 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
607 } 608 }
608} 609}
609 610
610static void kick_osd_requests(struct ceph_osd_client *osdc,
611 struct ceph_osd *kickosd)
612{
613 mutex_lock(&osdc->request_mutex);
614 __kick_osd_requests(osdc, kickosd);
615 mutex_unlock(&osdc->request_mutex);
616}
617
618/* 611/*
619 * If the osd connection drops, we need to resubmit all requests. 612 * If the osd connection drops, we need to resubmit all requests.
620 */ 613 */
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
628 dout("osd_reset osd%d\n", osd->o_osd); 621 dout("osd_reset osd%d\n", osd->o_osd);
629 osdc = osd->o_osdc; 622 osdc = osd->o_osdc;
630 down_read(&osdc->map_sem); 623 down_read(&osdc->map_sem);
631 kick_osd_requests(osdc, osd); 624 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd);
626 mutex_unlock(&osdc->request_mutex);
632 send_queued(osdc); 627 send_queued(osdc);
633 up_read(&osdc->map_sem); 628 up_read(&osdc->map_sem);
634} 629}
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
647 atomic_set(&osd->o_ref, 1); 642 atomic_set(&osd->o_ref, 1);
648 osd->o_osdc = osdc; 643 osd->o_osdc = osdc;
649 osd->o_osd = onum; 644 osd->o_osd = onum;
645 RB_CLEAR_NODE(&osd->o_node);
650 INIT_LIST_HEAD(&osd->o_requests); 646 INIT_LIST_HEAD(&osd->o_requests);
651 INIT_LIST_HEAD(&osd->o_linger_requests); 647 INIT_LIST_HEAD(&osd->o_linger_requests);
652 INIT_LIST_HEAD(&osd->o_osd_lru); 648 INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
750 if (list_empty(&osd->o_requests) && 746 if (list_empty(&osd->o_requests) &&
751 list_empty(&osd->o_linger_requests)) { 747 list_empty(&osd->o_linger_requests)) {
752 __remove_osd(osdc, osd); 748 __remove_osd(osdc, osd);
749 ret = -ENODEV;
753 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
754 &osd->o_con.peer_addr, 751 &osd->o_con.peer_addr,
755 sizeof(osd->o_con.peer_addr)) == 0 && 752 sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
876 req->r_osd = NULL; 873 req->r_osd = NULL;
877 } 874 }
878 875
876 list_del_init(&req->r_req_lru_item);
879 ceph_osdc_put_request(req); 877 ceph_osdc_put_request(req);
880 878
881 list_del_init(&req->r_req_lru_item);
882 if (osdc->num_requests == 0) { 879 if (osdc->num_requests == 0) {
883 dout(" no requests, canceling timeout\n"); 880 dout(" no requests, canceling timeout\n");
884 __cancel_osd_timeout(osdc); 881 __cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
910 struct ceph_osd_request *req) 907 struct ceph_osd_request *req)
911{ 908{
912 dout("__unregister_linger_request %p\n", req); 909 dout("__unregister_linger_request %p\n", req);
910 list_del_init(&req->r_linger_item);
913 if (req->r_osd) { 911 if (req->r_osd) {
914 list_del_init(&req->r_linger_item);
915 list_del_init(&req->r_linger_osd); 912 list_del_init(&req->r_linger_osd);
916 913
917 if (list_empty(&req->r_osd->o_requests) && 914 if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
1090{ 1087{
1091 struct ceph_osd_client *osdc = 1088 struct ceph_osd_client *osdc =
1092 container_of(work, struct ceph_osd_client, timeout_work.work); 1089 container_of(work, struct ceph_osd_client, timeout_work.work);
1093 struct ceph_osd_request *req, *last_req = NULL; 1090 struct ceph_osd_request *req;
1094 struct ceph_osd *osd; 1091 struct ceph_osd *osd;
1095 unsigned long timeout = osdc->client->options->osd_timeout * HZ;
1096 unsigned long keepalive = 1092 unsigned long keepalive =
1097 osdc->client->options->osd_keepalive_timeout * HZ; 1093 osdc->client->options->osd_keepalive_timeout * HZ;
1098 unsigned long last_stamp = 0;
1099 struct list_head slow_osds; 1094 struct list_head slow_osds;
1100 dout("timeout\n"); 1095 dout("timeout\n");
1101 down_read(&osdc->map_sem); 1096 down_read(&osdc->map_sem);
@@ -1105,37 +1100,6 @@ static void handle_timeout(struct work_struct *work)
1105 mutex_lock(&osdc->request_mutex); 1100 mutex_lock(&osdc->request_mutex);
1106 1101
1107 /* 1102 /*
1108 * reset osds that appear to be _really_ unresponsive. this
1109 * is a failsafe measure.. we really shouldn't be getting to
1110 * this point if the system is working properly. the monitors
1111 * should mark the osd as failed and we should find out about
1112 * it from an updated osd map.
1113 */
1114 while (timeout && !list_empty(&osdc->req_lru)) {
1115 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1116 r_req_lru_item);
1117
1118 /* hasn't been long enough since we sent it? */
1119 if (time_before(jiffies, req->r_stamp + timeout))
1120 break;
1121
1122 /* hasn't been long enough since it was acked? */
1123 if (req->r_request->ack_stamp == 0 ||
1124 time_before(jiffies, req->r_request->ack_stamp + timeout))
1125 break;
1126
1127 BUG_ON(req == last_req && req->r_stamp == last_stamp);
1128 last_req = req;
1129 last_stamp = req->r_stamp;
1130
1131 osd = req->r_osd;
1132 BUG_ON(!osd);
1133 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
1134 req->r_tid, osd->o_osd);
1135 __kick_osd_requests(osdc, osd);
1136 }
1137
1138 /*
1139 * ping osds that are a bit slow. this ensures that if there 1103 * ping osds that are a bit slow. this ensures that if there
1140 * is a break in the TCP connection we will notice, and reopen 1104 * is a break in the TCP connection we will notice, and reopen
1141 * a connection with that osd (from the fault callback). 1105 * a connection with that osd (from the fault callback).
@@ -1364,8 +1328,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1364 1328
1365 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1329 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1366 req->r_osd ? req->r_osd->o_osd : -1); 1330 req->r_osd ? req->r_osd->o_osd : -1);
1367 __unregister_linger_request(osdc, req);
1368 __register_request(osdc, req); 1331 __register_request(osdc, req);
1332 __unregister_linger_request(osdc, req);
1369 } 1333 }
1370 mutex_unlock(&osdc->request_mutex); 1334 mutex_unlock(&osdc->request_mutex);
1371 1335
@@ -1599,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1599 event->data = data; 1563 event->data = data;
1600 event->osdc = osdc; 1564 event->osdc = osdc;
1601 INIT_LIST_HEAD(&event->osd_node); 1565 INIT_LIST_HEAD(&event->osd_node);
1566 RB_CLEAR_NODE(&event->node);
1602 kref_init(&event->kref); /* one ref for us */ 1567 kref_init(&event->kref); /* one ref for us */
1603 kref_get(&event->kref); /* one ref for the caller */ 1568 kref_get(&event->kref); /* one ref for the caller */
1604 init_completion(&event->completion); 1569 init_completion(&event->completion);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5433fb0eb3c..de73214b5d2 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -469,6 +469,22 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
469 return NULL; 469 return NULL;
470} 470}
471 471
472const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
473{
474 struct ceph_pg_pool_info *pi;
475
476 if (id == CEPH_NOPOOL)
477 return NULL;
478
479 if (WARN_ON_ONCE(id > (u64) INT_MAX))
480 return NULL;
481
482 pi = __lookup_pg_pool(&map->pg_pools, (int) id);
483
484 return pi ? pi->name : NULL;
485}
486EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
487
472int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) 488int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
473{ 489{
474 struct rb_node *rbp; 490 struct rb_node *rbp;
@@ -645,10 +661,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
645 ceph_decode_32_safe(p, end, max, bad); 661 ceph_decode_32_safe(p, end, max, bad);
646 while (max--) { 662 while (max--) {
647 ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); 663 ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
664 err = -ENOMEM;
648 pi = kzalloc(sizeof(*pi), GFP_NOFS); 665 pi = kzalloc(sizeof(*pi), GFP_NOFS);
649 if (!pi) 666 if (!pi)
650 goto bad; 667 goto bad;
651 pi->id = ceph_decode_32(p); 668 pi->id = ceph_decode_32(p);
669 err = -EINVAL;
652 ev = ceph_decode_8(p); /* encoding version */ 670 ev = ceph_decode_8(p); /* encoding version */
653 if (ev > CEPH_PG_POOL_VERSION) { 671 if (ev > CEPH_PG_POOL_VERSION) {
654 pr_warning("got unknown v %d > %d of ceph_pg_pool\n", 672 pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
664 __insert_pg_pool(&map->pg_pools, pi); 682 __insert_pg_pool(&map->pg_pools, pi);
665 } 683 }
666 684
667 if (version >= 5 && __decode_pool_names(p, end, map) < 0) 685 if (version >= 5) {
668 goto bad; 686 err = __decode_pool_names(p, end, map);
687 if (err < 0) {
688 dout("fail to decode pool names");
689 goto bad;
690 }
691 }
669 692
670 ceph_decode_32_safe(p, end, map->pool_max, bad); 693 ceph_decode_32_safe(p, end, map->pool_max, bad);
671 694
@@ -745,7 +768,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
745 return map; 768 return map;
746 769
747bad: 770bad:
748 dout("osdmap_decode fail\n"); 771 dout("osdmap_decode fail err %d\n", err);
749 ceph_osdmap_destroy(map); 772 ceph_osdmap_destroy(map);
750 return ERR_PTR(err); 773 return ERR_PTR(err);
751} 774}
@@ -839,6 +862,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
839 if (ev > CEPH_PG_POOL_VERSION) { 862 if (ev > CEPH_PG_POOL_VERSION) {
840 pr_warning("got unknown v %d > %d of ceph_pg_pool\n", 863 pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
841 ev, CEPH_PG_POOL_VERSION); 864 ev, CEPH_PG_POOL_VERSION);
865 err = -EINVAL;
842 goto bad; 866 goto bad;
843 } 867 }
844 pi = __lookup_pg_pool(&map->pg_pools, pool); 868 pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
855 if (err < 0) 879 if (err < 0)
856 goto bad; 880 goto bad;
857 } 881 }
858 if (version >= 5 && __decode_pool_names(p, end, map) < 0) 882 if (version >= 5) {
859 goto bad; 883 err = __decode_pool_names(p, end, map);
884 if (err < 0)
885 goto bad;
886 }
860 887
861 /* old_pool */ 888 /* old_pool */
862 ceph_decode_32_safe(p, end, len, bad); 889 ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
932 (void) __remove_pg_mapping(&map->pg_temp, pgid); 959 (void) __remove_pg_mapping(&map->pg_temp, pgid);
933 960
934 /* insert */ 961 /* insert */
935 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) { 962 err = -EINVAL;
936 err = -EINVAL; 963 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
937 goto bad; 964 goto bad;
938 } 965 err = -ENOMEM;
939 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 966 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
940 if (!pg) { 967 if (!pg)
941 err = -ENOMEM;
942 goto bad; 968 goto bad;
943 }
944 pg->pgid = pgid; 969 pg->pgid = pgid;
945 pg->len = pglen; 970 pg->len = pglen;
946 for (j = 0; j < pglen; j++) 971 for (j = 0; j < pglen; j++)