aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd4
-rw-r--r--drivers/block/rbd.c1389
-rw-r--r--drivers/block/rbd_types.h2
-rw-r--r--fs/ceph/addr.c60
-rw-r--r--fs/ceph/caps.c18
-rw-r--r--fs/ceph/file.c73
-rw-r--r--fs/ceph/inode.c15
-rw-r--r--fs/ceph/mds_client.c11
-rw-r--r--fs/ceph/super.c4
-rw-r--r--include/linux/backing-dev.h1
-rw-r--r--include/linux/ceph/libceph.h2
-rw-r--r--include/linux/ceph/osdmap.h1
-rw-r--r--include/linux/ceph/rados.h2
-rw-r--r--net/ceph/ceph_common.c3
-rw-r--r--net/ceph/messenger.c107
-rw-r--r--net/ceph/osd_client.c59
-rw-r--r--net/ceph/osdmap.c47
17 files changed, 1192 insertions, 606 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 1cf2adf46b11..cd9213ccf3dc 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -70,6 +70,10 @@ snap_*
70 70
71 A directory per each snapshot 71 A directory per each snapshot
72 72
73parent
74
75 Information identifying the pool, image, and snapshot id for
76 the parent image in a layered rbd image (format 2 only).
73 77
74Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name> 78Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
75------------------------------------------------------------- 79-------------------------------------------------------------
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bb3d9be3b1b4..89576a0b3f2e 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -61,15 +61,29 @@
61 61
62#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 62#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63 63
64#define RBD_MAX_SNAP_NAME_LEN 32 64#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65#define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
65#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ 68#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66#define RBD_MAX_OPT_LEN 1024 69#define RBD_MAX_OPT_LEN 1024
67 70
68#define RBD_SNAP_HEAD_NAME "-" 71#define RBD_SNAP_HEAD_NAME "-"
69 72
73/* This allows a single page to hold an image name sent by OSD */
74#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
70#define RBD_IMAGE_ID_LEN_MAX 64 75#define RBD_IMAGE_ID_LEN_MAX 64
76
71#define RBD_OBJ_PREFIX_LEN_MAX 64 77#define RBD_OBJ_PREFIX_LEN_MAX 64
72 78
79/* Feature bits */
80
81#define RBD_FEATURE_LAYERING 1
82
83/* Features supported by this (client software) implementation. */
84
85#define RBD_FEATURES_ALL (0)
86
73/* 87/*
74 * An RBD device name will be "rbd#", where the "rbd" comes from 88 * An RBD device name will be "rbd#", where the "rbd" comes from
75 * RBD_DRV_NAME above, and # is a unique integer identifier. 89 * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -101,6 +115,27 @@ struct rbd_image_header {
101 u64 obj_version; 115 u64 obj_version;
102}; 116};
103 117
118/*
119 * An rbd image specification.
120 *
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image.
123 */
124struct rbd_spec {
125 u64 pool_id;
126 char *pool_name;
127
128 char *image_id;
129 size_t image_id_len;
130 char *image_name;
131 size_t image_name_len;
132
133 u64 snap_id;
134 char *snap_name;
135
136 struct kref kref;
137};
138
104struct rbd_options { 139struct rbd_options {
105 bool read_only; 140 bool read_only;
106}; 141};
@@ -155,11 +190,8 @@ struct rbd_snap {
155}; 190};
156 191
157struct rbd_mapping { 192struct rbd_mapping {
158 char *snap_name;
159 u64 snap_id;
160 u64 size; 193 u64 size;
161 u64 features; 194 u64 features;
162 bool snap_exists;
163 bool read_only; 195 bool read_only;
164}; 196};
165 197
@@ -173,7 +205,6 @@ struct rbd_device {
173 struct gendisk *disk; /* blkdev's gendisk and rq */ 205 struct gendisk *disk; /* blkdev's gendisk and rq */
174 206
175 u32 image_format; /* Either 1 or 2 */ 207 u32 image_format; /* Either 1 or 2 */
176 struct rbd_options rbd_opts;
177 struct rbd_client *rbd_client; 208 struct rbd_client *rbd_client;
178 209
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 210 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -181,17 +212,17 @@ struct rbd_device {
181 spinlock_t lock; /* queue lock */ 212 spinlock_t lock; /* queue lock */
182 213
183 struct rbd_image_header header; 214 struct rbd_image_header header;
184 char *image_id; 215 bool exists;
185 size_t image_id_len; 216 struct rbd_spec *spec;
186 char *image_name; 217
187 size_t image_name_len;
188 char *header_name; 218 char *header_name;
189 char *pool_name;
190 int pool_id;
191 219
192 struct ceph_osd_event *watch_event; 220 struct ceph_osd_event *watch_event;
193 struct ceph_osd_request *watch_request; 221 struct ceph_osd_request *watch_request;
194 222
223 struct rbd_spec *parent_spec;
224 u64 parent_overlap;
225
195 /* protects updating the header */ 226 /* protects updating the header */
196 struct rw_semaphore header_rwsem; 227 struct rw_semaphore header_rwsem;
197 228
@@ -204,6 +235,7 @@ struct rbd_device {
204 235
205 /* sysfs related */ 236 /* sysfs related */
206 struct device dev; 237 struct device dev;
238 unsigned long open_count;
207}; 239};
208 240
209static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 241static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
@@ -218,7 +250,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 250static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
219 251
220static void rbd_dev_release(struct device *dev); 252static void rbd_dev_release(struct device *dev);
221static void __rbd_remove_snap_dev(struct rbd_snap *snap); 253static void rbd_remove_snap_dev(struct rbd_snap *snap);
222 254
223static ssize_t rbd_add(struct bus_type *bus, const char *buf, 255static ssize_t rbd_add(struct bus_type *bus, const char *buf,
224 size_t count); 256 size_t count);
@@ -258,17 +290,8 @@ static struct device rbd_root_dev = {
258# define rbd_assert(expr) ((void) 0) 290# define rbd_assert(expr) ((void) 0)
259#endif /* !RBD_DEBUG */ 291#endif /* !RBD_DEBUG */
260 292
261static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 293static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
262{ 294static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
263 return get_device(&rbd_dev->dev);
264}
265
266static void rbd_put_dev(struct rbd_device *rbd_dev)
267{
268 put_device(&rbd_dev->dev);
269}
270
271static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
272 295
273static int rbd_open(struct block_device *bdev, fmode_t mode) 296static int rbd_open(struct block_device *bdev, fmode_t mode)
274{ 297{
@@ -277,8 +300,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
277 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 300 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
278 return -EROFS; 301 return -EROFS;
279 302
280 rbd_get_dev(rbd_dev); 303 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
304 (void) get_device(&rbd_dev->dev);
281 set_device_ro(bdev, rbd_dev->mapping.read_only); 305 set_device_ro(bdev, rbd_dev->mapping.read_only);
306 rbd_dev->open_count++;
307 mutex_unlock(&ctl_mutex);
282 308
283 return 0; 309 return 0;
284} 310}
@@ -287,7 +313,11 @@ static int rbd_release(struct gendisk *disk, fmode_t mode)
287{ 313{
288 struct rbd_device *rbd_dev = disk->private_data; 314 struct rbd_device *rbd_dev = disk->private_data;
289 315
290 rbd_put_dev(rbd_dev); 316 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
317 rbd_assert(rbd_dev->open_count > 0);
318 rbd_dev->open_count--;
319 put_device(&rbd_dev->dev);
320 mutex_unlock(&ctl_mutex);
291 321
292 return 0; 322 return 0;
293} 323}
@@ -388,7 +418,7 @@ enum {
388static match_table_t rbd_opts_tokens = { 418static match_table_t rbd_opts_tokens = {
389 /* int args above */ 419 /* int args above */
390 /* string args above */ 420 /* string args above */
391 {Opt_read_only, "mapping.read_only"}, 421 {Opt_read_only, "read_only"},
392 {Opt_read_only, "ro"}, /* Alternate spelling */ 422 {Opt_read_only, "ro"}, /* Alternate spelling */
393 {Opt_read_write, "read_write"}, 423 {Opt_read_write, "read_write"},
394 {Opt_read_write, "rw"}, /* Alternate spelling */ 424 {Opt_read_write, "rw"}, /* Alternate spelling */
@@ -441,33 +471,17 @@ static int parse_rbd_opts_token(char *c, void *private)
441 * Get a ceph client with specific addr and configuration, if one does 471 * Get a ceph client with specific addr and configuration, if one does
442 * not exist create it. 472 * not exist create it.
443 */ 473 */
444static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 474static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
445 size_t mon_addr_len, char *options)
446{ 475{
447 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
448 struct ceph_options *ceph_opts;
449 struct rbd_client *rbdc; 476 struct rbd_client *rbdc;
450 477
451 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
452
453 ceph_opts = ceph_parse_options(options, mon_addr,
454 mon_addr + mon_addr_len,
455 parse_rbd_opts_token, rbd_opts);
456 if (IS_ERR(ceph_opts))
457 return PTR_ERR(ceph_opts);
458
459 rbdc = rbd_client_find(ceph_opts); 478 rbdc = rbd_client_find(ceph_opts);
460 if (rbdc) { 479 if (rbdc) /* using an existing client */
461 /* using an existing client */
462 ceph_destroy_options(ceph_opts); 480 ceph_destroy_options(ceph_opts);
463 } else { 481 else
464 rbdc = rbd_client_create(ceph_opts); 482 rbdc = rbd_client_create(ceph_opts);
465 if (IS_ERR(rbdc))
466 return PTR_ERR(rbdc);
467 }
468 rbd_dev->rbd_client = rbdc;
469 483
470 return 0; 484 return rbdc;
471} 485}
472 486
473/* 487/*
@@ -492,10 +506,10 @@ static void rbd_client_release(struct kref *kref)
492 * Drop reference to ceph client node. If it's not referenced anymore, release 506 * Drop reference to ceph client node. If it's not referenced anymore, release
493 * it. 507 * it.
494 */ 508 */
495static void rbd_put_client(struct rbd_device *rbd_dev) 509static void rbd_put_client(struct rbd_client *rbdc)
496{ 510{
497 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 511 if (rbdc)
498 rbd_dev->rbd_client = NULL; 512 kref_put(&rbdc->kref, rbd_client_release);
499} 513}
500 514
501/* 515/*
@@ -524,6 +538,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
524 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 538 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
525 return false; 539 return false;
526 540
541 /* The bio layer requires at least sector-sized I/O */
542
543 if (ondisk->options.order < SECTOR_SHIFT)
544 return false;
545
546 /* If we use u64 in a few spots we may be able to loosen this */
547
548 if (ondisk->options.order > 8 * sizeof (int) - 1)
549 return false;
550
527 /* 551 /*
528 * The size of a snapshot header has to fit in a size_t, and 552 * The size of a snapshot header has to fit in a size_t, and
529 * that limits the number of snapshots. 553 * that limits the number of snapshots.
@@ -635,6 +659,20 @@ out_err:
635 return -ENOMEM; 659 return -ENOMEM;
636} 660}
637 661
662static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
663{
664 struct rbd_snap *snap;
665
666 if (snap_id == CEPH_NOSNAP)
667 return RBD_SNAP_HEAD_NAME;
668
669 list_for_each_entry(snap, &rbd_dev->snaps, node)
670 if (snap_id == snap->id)
671 return snap->name;
672
673 return NULL;
674}
675
638static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 676static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
639{ 677{
640 678
@@ -642,7 +680,7 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
642 680
643 list_for_each_entry(snap, &rbd_dev->snaps, node) { 681 list_for_each_entry(snap, &rbd_dev->snaps, node) {
644 if (!strcmp(snap_name, snap->name)) { 682 if (!strcmp(snap_name, snap->name)) {
645 rbd_dev->mapping.snap_id = snap->id; 683 rbd_dev->spec->snap_id = snap->id;
646 rbd_dev->mapping.size = snap->size; 684 rbd_dev->mapping.size = snap->size;
647 rbd_dev->mapping.features = snap->features; 685 rbd_dev->mapping.features = snap->features;
648 686
@@ -653,26 +691,23 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
653 return -ENOENT; 691 return -ENOENT;
654} 692}
655 693
656static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) 694static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
657{ 695{
658 int ret; 696 int ret;
659 697
660 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME, 698 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
661 sizeof (RBD_SNAP_HEAD_NAME))) { 699 sizeof (RBD_SNAP_HEAD_NAME))) {
662 rbd_dev->mapping.snap_id = CEPH_NOSNAP; 700 rbd_dev->spec->snap_id = CEPH_NOSNAP;
663 rbd_dev->mapping.size = rbd_dev->header.image_size; 701 rbd_dev->mapping.size = rbd_dev->header.image_size;
664 rbd_dev->mapping.features = rbd_dev->header.features; 702 rbd_dev->mapping.features = rbd_dev->header.features;
665 rbd_dev->mapping.snap_exists = false;
666 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
667 ret = 0; 703 ret = 0;
668 } else { 704 } else {
669 ret = snap_by_name(rbd_dev, snap_name); 705 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
670 if (ret < 0) 706 if (ret < 0)
671 goto done; 707 goto done;
672 rbd_dev->mapping.snap_exists = true;
673 rbd_dev->mapping.read_only = true; 708 rbd_dev->mapping.read_only = true;
674 } 709 }
675 rbd_dev->mapping.snap_name = snap_name; 710 rbd_dev->exists = true;
676done: 711done:
677 return ret; 712 return ret;
678} 713}
@@ -695,13 +730,13 @@ static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
695 u64 segment; 730 u64 segment;
696 int ret; 731 int ret;
697 732
698 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); 733 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
699 if (!name) 734 if (!name)
700 return NULL; 735 return NULL;
701 segment = offset >> rbd_dev->header.obj_order; 736 segment = offset >> rbd_dev->header.obj_order;
702 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx", 737 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
703 rbd_dev->header.object_prefix, segment); 738 rbd_dev->header.object_prefix, segment);
704 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) { 739 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
705 pr_err("error formatting segment name for #%llu (%d)\n", 740 pr_err("error formatting segment name for #%llu (%d)\n",
706 segment, ret); 741 segment, ret);
707 kfree(name); 742 kfree(name);
@@ -800,77 +835,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
800} 835}
801 836
802/* 837/*
803 * bio_chain_clone - clone a chain of bios up to a certain length. 838 * Clone a portion of a bio, starting at the given byte offset
804 * might return a bio_pair that will need to be released. 839 * and continuing for the number of bytes indicated.
805 */ 840 */
806static struct bio *bio_chain_clone(struct bio **old, struct bio **next, 841static struct bio *bio_clone_range(struct bio *bio_src,
807 struct bio_pair **bp, 842 unsigned int offset,
808 int len, gfp_t gfpmask) 843 unsigned int len,
809{ 844 gfp_t gfpmask)
810 struct bio *old_chain = *old; 845{
811 struct bio *new_chain = NULL; 846 struct bio_vec *bv;
812 struct bio *tail; 847 unsigned int resid;
813 int total = 0; 848 unsigned short idx;
814 849 unsigned int voff;
815 if (*bp) { 850 unsigned short end_idx;
816 bio_pair_release(*bp); 851 unsigned short vcnt;
817 *bp = NULL; 852 struct bio *bio;
818 }
819 853
820 while (old_chain && (total < len)) { 854 /* Handle the easy case for the caller */
821 struct bio *tmp;
822 855
823 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); 856 if (!offset && len == bio_src->bi_size)
824 if (!tmp) 857 return bio_clone(bio_src, gfpmask);
825 goto err_out;
826 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
827 858
828 if (total + old_chain->bi_size > len) { 859 if (WARN_ON_ONCE(!len))
829 struct bio_pair *bp; 860 return NULL;
861 if (WARN_ON_ONCE(len > bio_src->bi_size))
862 return NULL;
863 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
864 return NULL;
830 865
831 /* 866 /* Find first affected segment... */
832 * this split can only happen with a single paged bio,
833 * split_bio will BUG_ON if this is not the case
834 */
835 dout("bio_chain_clone split! total=%d remaining=%d"
836 "bi_size=%u\n",
837 total, len - total, old_chain->bi_size);
838 867
839 /* split the bio. We'll release it either in the next 868 resid = offset;
840 call, or it will have to be released outside */ 869 __bio_for_each_segment(bv, bio_src, idx, 0) {
841 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE); 870 if (resid < bv->bv_len)
842 if (!bp) 871 break;
843 goto err_out; 872 resid -= bv->bv_len;
873 }
874 voff = resid;
844 875
845 __bio_clone(tmp, &bp->bio1); 876 /* ...and the last affected segment */
846 877
847 *next = &bp->bio2; 878 resid += len;
848 } else { 879 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
849 __bio_clone(tmp, old_chain); 880 if (resid <= bv->bv_len)
850 *next = old_chain->bi_next; 881 break;
851 } 882 resid -= bv->bv_len;
883 }
884 vcnt = end_idx - idx + 1;
885
886 /* Build the clone */
852 887
853 tmp->bi_bdev = NULL; 888 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
854 tmp->bi_next = NULL; 889 if (!bio)
855 if (new_chain) 890 return NULL; /* ENOMEM */
856 tail->bi_next = tmp;
857 else
858 new_chain = tmp;
859 tail = tmp;
860 old_chain = old_chain->bi_next;
861 891
862 total += tmp->bi_size; 892 bio->bi_bdev = bio_src->bi_bdev;
893 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
894 bio->bi_rw = bio_src->bi_rw;
895 bio->bi_flags |= 1 << BIO_CLONED;
896
897 /*
898 * Copy over our part of the bio_vec, then update the first
899 * and last (or only) entries.
900 */
901 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
902 vcnt * sizeof (struct bio_vec));
903 bio->bi_io_vec[0].bv_offset += voff;
904 if (vcnt > 1) {
905 bio->bi_io_vec[0].bv_len -= voff;
906 bio->bi_io_vec[vcnt - 1].bv_len = resid;
907 } else {
908 bio->bi_io_vec[0].bv_len = len;
863 } 909 }
864 910
865 rbd_assert(total == len); 911 bio->bi_vcnt = vcnt;
912 bio->bi_size = len;
913 bio->bi_idx = 0;
914
915 return bio;
916}
917
918/*
919 * Clone a portion of a bio chain, starting at the given byte offset
920 * into the first bio in the source chain and continuing for the
921 * number of bytes indicated. The result is another bio chain of
922 * exactly the given length, or a null pointer on error.
923 *
924 * The bio_src and offset parameters are both in-out. On entry they
925 * refer to the first source bio and the offset into that bio where
926 * the start of data to be cloned is located.
927 *
928 * On return, bio_src is updated to refer to the bio in the source
929 * chain that contains first un-cloned byte, and *offset will
930 * contain the offset of that byte within that bio.
931 */
932static struct bio *bio_chain_clone_range(struct bio **bio_src,
933 unsigned int *offset,
934 unsigned int len,
935 gfp_t gfpmask)
936{
937 struct bio *bi = *bio_src;
938 unsigned int off = *offset;
939 struct bio *chain = NULL;
940 struct bio **end;
941
942 /* Build up a chain of clone bios up to the limit */
943
944 if (!bi || off >= bi->bi_size || !len)
945 return NULL; /* Nothing to clone */
866 946
867 *old = old_chain; 947 end = &chain;
948 while (len) {
949 unsigned int bi_size;
950 struct bio *bio;
951
952 if (!bi)
953 goto out_err; /* EINVAL; ran out of bio's */
954 bi_size = min_t(unsigned int, bi->bi_size - off, len);
955 bio = bio_clone_range(bi, off, bi_size, gfpmask);
956 if (!bio)
957 goto out_err; /* ENOMEM */
958
959 *end = bio;
960 end = &bio->bi_next;
961
962 off += bi_size;
963 if (off == bi->bi_size) {
964 bi = bi->bi_next;
965 off = 0;
966 }
967 len -= bi_size;
968 }
969 *bio_src = bi;
970 *offset = off;
868 971
869 return new_chain; 972 return chain;
973out_err:
974 bio_chain_put(chain);
870 975
871err_out:
872 dout("bio_chain_clone with err\n");
873 bio_chain_put(new_chain);
874 return NULL; 976 return NULL;
875} 977}
876 978
@@ -988,8 +1090,9 @@ static int rbd_do_request(struct request *rq,
988 req_data->coll_index = coll_index; 1090 req_data->coll_index = coll_index;
989 } 1091 }
990 1092
991 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, 1093 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
992 (unsigned long long) ofs, (unsigned long long) len); 1094 object_name, (unsigned long long) ofs,
1095 (unsigned long long) len, coll, coll_index);
993 1096
994 osdc = &rbd_dev->rbd_client->client->osdc; 1097 osdc = &rbd_dev->rbd_client->client->osdc;
995 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, 1098 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1019,7 +1122,7 @@ static int rbd_do_request(struct request *rq,
1019 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1122 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 layout->fl_stripe_count = cpu_to_le32(1); 1123 layout->fl_stripe_count = cpu_to_le32(1);
1021 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 1124 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1022 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id); 1125 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
1023 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 1126 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1024 req, ops); 1127 req, ops);
1025 rbd_assert(ret == 0); 1128 rbd_assert(ret == 0);
@@ -1154,8 +1257,6 @@ done:
1154static int rbd_do_op(struct request *rq, 1257static int rbd_do_op(struct request *rq,
1155 struct rbd_device *rbd_dev, 1258 struct rbd_device *rbd_dev,
1156 struct ceph_snap_context *snapc, 1259 struct ceph_snap_context *snapc,
1157 u64 snapid,
1158 int opcode, int flags,
1159 u64 ofs, u64 len, 1260 u64 ofs, u64 len,
1160 struct bio *bio, 1261 struct bio *bio,
1161 struct rbd_req_coll *coll, 1262 struct rbd_req_coll *coll,
@@ -1167,6 +1268,9 @@ static int rbd_do_op(struct request *rq,
1167 int ret; 1268 int ret;
1168 struct ceph_osd_req_op *ops; 1269 struct ceph_osd_req_op *ops;
1169 u32 payload_len; 1270 u32 payload_len;
1271 int opcode;
1272 int flags;
1273 u64 snapid;
1170 1274
1171 seg_name = rbd_segment_name(rbd_dev, ofs); 1275 seg_name = rbd_segment_name(rbd_dev, ofs);
1172 if (!seg_name) 1276 if (!seg_name)
@@ -1174,7 +1278,18 @@ static int rbd_do_op(struct request *rq,
1174 seg_len = rbd_segment_length(rbd_dev, ofs, len); 1278 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175 seg_ofs = rbd_segment_offset(rbd_dev, ofs); 1279 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1176 1280
1177 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 1281 if (rq_data_dir(rq) == WRITE) {
1282 opcode = CEPH_OSD_OP_WRITE;
1283 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1284 snapid = CEPH_NOSNAP;
1285 payload_len = seg_len;
1286 } else {
1287 opcode = CEPH_OSD_OP_READ;
1288 flags = CEPH_OSD_FLAG_READ;
1289 snapc = NULL;
1290 snapid = rbd_dev->spec->snap_id;
1291 payload_len = 0;
1292 }
1178 1293
1179 ret = -ENOMEM; 1294 ret = -ENOMEM;
1180 ops = rbd_create_rw_ops(1, opcode, payload_len); 1295 ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1202,41 +1317,6 @@ done:
1202} 1317}
1203 1318
1204/* 1319/*
1205 * Request async osd write
1206 */
1207static int rbd_req_write(struct request *rq,
1208 struct rbd_device *rbd_dev,
1209 struct ceph_snap_context *snapc,
1210 u64 ofs, u64 len,
1211 struct bio *bio,
1212 struct rbd_req_coll *coll,
1213 int coll_index)
1214{
1215 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1216 CEPH_OSD_OP_WRITE,
1217 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218 ofs, len, bio, coll, coll_index);
1219}
1220
1221/*
1222 * Request async osd read
1223 */
1224static int rbd_req_read(struct request *rq,
1225 struct rbd_device *rbd_dev,
1226 u64 snapid,
1227 u64 ofs, u64 len,
1228 struct bio *bio,
1229 struct rbd_req_coll *coll,
1230 int coll_index)
1231{
1232 return rbd_do_op(rq, rbd_dev, NULL,
1233 snapid,
1234 CEPH_OSD_OP_READ,
1235 CEPH_OSD_FLAG_READ,
1236 ofs, len, bio, coll, coll_index);
1237}
1238
1239/*
1240 * Request sync osd read 1320 * Request sync osd read
1241 */ 1321 */
1242static int rbd_req_sync_read(struct rbd_device *rbd_dev, 1322static int rbd_req_sync_read(struct rbd_device *rbd_dev,
@@ -1304,7 +1384,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1304 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n", 1384 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305 rbd_dev->header_name, (unsigned long long) notify_id, 1385 rbd_dev->header_name, (unsigned long long) notify_id,
1306 (unsigned int) opcode); 1386 (unsigned int) opcode);
1307 rc = rbd_refresh_header(rbd_dev, &hver); 1387 rc = rbd_dev_refresh(rbd_dev, &hver);
1308 if (rc) 1388 if (rc)
1309 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1389 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310 " update snaps: %d\n", rbd_dev->major, rc); 1390 " update snaps: %d\n", rbd_dev->major, rc);
@@ -1460,18 +1540,16 @@ static void rbd_rq_fn(struct request_queue *q)
1460{ 1540{
1461 struct rbd_device *rbd_dev = q->queuedata; 1541 struct rbd_device *rbd_dev = q->queuedata;
1462 struct request *rq; 1542 struct request *rq;
1463 struct bio_pair *bp = NULL;
1464 1543
1465 while ((rq = blk_fetch_request(q))) { 1544 while ((rq = blk_fetch_request(q))) {
1466 struct bio *bio; 1545 struct bio *bio;
1467 struct bio *rq_bio, *next_bio = NULL;
1468 bool do_write; 1546 bool do_write;
1469 unsigned int size; 1547 unsigned int size;
1470 u64 op_size = 0;
1471 u64 ofs; 1548 u64 ofs;
1472 int num_segs, cur_seg = 0; 1549 int num_segs, cur_seg = 0;
1473 struct rbd_req_coll *coll; 1550 struct rbd_req_coll *coll;
1474 struct ceph_snap_context *snapc; 1551 struct ceph_snap_context *snapc;
1552 unsigned int bio_offset;
1475 1553
1476 dout("fetched request\n"); 1554 dout("fetched request\n");
1477 1555
@@ -1483,10 +1561,6 @@ static void rbd_rq_fn(struct request_queue *q)
1483 1561
1484 /* deduce our operation (read, write) */ 1562 /* deduce our operation (read, write) */
1485 do_write = (rq_data_dir(rq) == WRITE); 1563 do_write = (rq_data_dir(rq) == WRITE);
1486
1487 size = blk_rq_bytes(rq);
1488 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1489 rq_bio = rq->bio;
1490 if (do_write && rbd_dev->mapping.read_only) { 1564 if (do_write && rbd_dev->mapping.read_only) {
1491 __blk_end_request_all(rq, -EROFS); 1565 __blk_end_request_all(rq, -EROFS);
1492 continue; 1566 continue;
@@ -1496,8 +1570,8 @@ static void rbd_rq_fn(struct request_queue *q)
1496 1570
1497 down_read(&rbd_dev->header_rwsem); 1571 down_read(&rbd_dev->header_rwsem);
1498 1572
1499 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP && 1573 if (!rbd_dev->exists) {
1500 !rbd_dev->mapping.snap_exists) { 1574 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1501 up_read(&rbd_dev->header_rwsem); 1575 up_read(&rbd_dev->header_rwsem);
1502 dout("request for non-existent snapshot"); 1576 dout("request for non-existent snapshot");
1503 spin_lock_irq(q->queue_lock); 1577 spin_lock_irq(q->queue_lock);
@@ -1509,6 +1583,10 @@ static void rbd_rq_fn(struct request_queue *q)
1509 1583
1510 up_read(&rbd_dev->header_rwsem); 1584 up_read(&rbd_dev->header_rwsem);
1511 1585
1586 size = blk_rq_bytes(rq);
1587 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1588 bio = rq->bio;
1589
1512 dout("%s 0x%x bytes at 0x%llx\n", 1590 dout("%s 0x%x bytes at 0x%llx\n",
1513 do_write ? "write" : "read", 1591 do_write ? "write" : "read",
1514 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); 1592 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1528,45 +1606,37 @@ static void rbd_rq_fn(struct request_queue *q)
1528 continue; 1606 continue;
1529 } 1607 }
1530 1608
1609 bio_offset = 0;
1531 do { 1610 do {
1532 /* a bio clone to be passed down to OSD req */ 1611 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1612 unsigned int chain_size;
1613 struct bio *bio_chain;
1614
1615 BUG_ON(limit > (u64) UINT_MAX);
1616 chain_size = (unsigned int) limit;
1533 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); 1617 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534 op_size = rbd_segment_length(rbd_dev, ofs, size); 1618
1535 kref_get(&coll->kref); 1619 kref_get(&coll->kref);
1536 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1537 op_size, GFP_ATOMIC);
1538 if (!bio) {
1539 rbd_coll_end_req_index(rq, coll, cur_seg,
1540 -ENOMEM, op_size);
1541 goto next_seg;
1542 }
1543 1620
1621 /* Pass a cloned bio chain via an osd request */
1544 1622
1545 /* init OSD command: write or read */ 1623 bio_chain = bio_chain_clone_range(&bio,
1546 if (do_write) 1624 &bio_offset, chain_size,
1547 rbd_req_write(rq, rbd_dev, 1625 GFP_ATOMIC);
1548 snapc, 1626 if (bio_chain)
1549 ofs, 1627 (void) rbd_do_op(rq, rbd_dev, snapc,
1550 op_size, bio, 1628 ofs, chain_size,
1551 coll, cur_seg); 1629 bio_chain, coll, cur_seg);
1552 else 1630 else
1553 rbd_req_read(rq, rbd_dev, 1631 rbd_coll_end_req_index(rq, coll, cur_seg,
1554 rbd_dev->mapping.snap_id, 1632 -ENOMEM, chain_size);
1555 ofs, 1633 size -= chain_size;
1556 op_size, bio, 1634 ofs += chain_size;
1557 coll, cur_seg);
1558
1559next_seg:
1560 size -= op_size;
1561 ofs += op_size;
1562 1635
1563 cur_seg++; 1636 cur_seg++;
1564 rq_bio = next_bio;
1565 } while (size > 0); 1637 } while (size > 0);
1566 kref_put(&coll->kref, rbd_coll_release); 1638 kref_put(&coll->kref, rbd_coll_release);
1567 1639
1568 if (bp)
1569 bio_pair_release(bp);
1570 spin_lock_irq(q->queue_lock); 1640 spin_lock_irq(q->queue_lock);
1571 1641
1572 ceph_put_snap_context(snapc); 1642 ceph_put_snap_context(snapc);
@@ -1576,28 +1646,47 @@ next_seg:
1576/* 1646/*
1577 * a queue callback. Makes sure that we don't create a bio that spans across 1647 * a queue callback. Makes sure that we don't create a bio that spans across
1578 * multiple osd objects. One exception would be with a single page bios, 1648 * multiple osd objects. One exception would be with a single page bios,
1579 * which we handle later at bio_chain_clone 1649 * which we handle later at bio_chain_clone_range()
1580 */ 1650 */
1581static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 1651static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1582 struct bio_vec *bvec) 1652 struct bio_vec *bvec)
1583{ 1653{
1584 struct rbd_device *rbd_dev = q->queuedata; 1654 struct rbd_device *rbd_dev = q->queuedata;
1585 unsigned int chunk_sectors; 1655 sector_t sector_offset;
1586 sector_t sector; 1656 sector_t sectors_per_obj;
1587 unsigned int bio_sectors; 1657 sector_t obj_sector_offset;
1588 int max; 1658 int ret;
1589 1659
1590 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 1660 /*
1591 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1661 * Find how far into its rbd object the partition-relative
1592 bio_sectors = bmd->bi_size >> SECTOR_SHIFT; 1662 * bio start sector is to offset relative to the enclosing
1663 * device.
1664 */
1665 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1666 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1667 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1668
1669 /*
1670 * Compute the number of bytes from that offset to the end
1671 * of the object. Account for what's already used by the bio.
1672 */
1673 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1674 if (ret > bmd->bi_size)
1675 ret -= bmd->bi_size;
1676 else
1677 ret = 0;
1593 1678
1594 max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1679 /*
1595 + bio_sectors)) << SECTOR_SHIFT; 1680 * Don't send back more than was asked for. And if the bio
1596 if (max < 0) 1681 * was empty, let the whole thing through because: "Note
1597 max = 0; /* bio_add cannot handle a negative return */ 1682 * that a block device *must* allow a single page to be
1598 if (max <= bvec->bv_len && bio_sectors == 0) 1683 * added to an empty bio."
1599 return bvec->bv_len; 1684 */
1600 return max; 1685 rbd_assert(bvec->bv_len <= PAGE_SIZE);
1686 if (ret > (int) bvec->bv_len || !bmd->bi_size)
1687 ret = (int) bvec->bv_len;
1688
1689 return ret;
1601} 1690}
1602 1691
1603static void rbd_free_disk(struct rbd_device *rbd_dev) 1692static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1663,13 +1752,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1663 ret = -ENXIO; 1752 ret = -ENXIO;
1664 pr_warning("short header read for image %s" 1753 pr_warning("short header read for image %s"
1665 " (want %zd got %d)\n", 1754 " (want %zd got %d)\n",
1666 rbd_dev->image_name, size, ret); 1755 rbd_dev->spec->image_name, size, ret);
1667 goto out_err; 1756 goto out_err;
1668 } 1757 }
1669 if (!rbd_dev_ondisk_valid(ondisk)) { 1758 if (!rbd_dev_ondisk_valid(ondisk)) {
1670 ret = -ENXIO; 1759 ret = -ENXIO;
1671 pr_warning("invalid header for image %s\n", 1760 pr_warning("invalid header for image %s\n",
1672 rbd_dev->image_name); 1761 rbd_dev->spec->image_name);
1673 goto out_err; 1762 goto out_err;
1674 } 1763 }
1675 1764
@@ -1707,19 +1796,32 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1707 return ret; 1796 return ret;
1708} 1797}
1709 1798
1710static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1799static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1711{ 1800{
1712 struct rbd_snap *snap; 1801 struct rbd_snap *snap;
1713 struct rbd_snap *next; 1802 struct rbd_snap *next;
1714 1803
1715 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) 1804 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1716 __rbd_remove_snap_dev(snap); 1805 rbd_remove_snap_dev(snap);
1806}
1807
1808static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1809{
1810 sector_t size;
1811
1812 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1813 return;
1814
1815 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1816 dout("setting size to %llu sectors", (unsigned long long) size);
1817 rbd_dev->mapping.size = (u64) size;
1818 set_capacity(rbd_dev->disk, size);
1717} 1819}
1718 1820
1719/* 1821/*
1720 * only read the first part of the ondisk header, without the snaps info 1822 * only read the first part of the ondisk header, without the snaps info
1721 */ 1823 */
1722static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) 1824static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1723{ 1825{
1724 int ret; 1826 int ret;
1725 struct rbd_image_header h; 1827 struct rbd_image_header h;
@@ -1730,17 +1832,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1730 1832
1731 down_write(&rbd_dev->header_rwsem); 1833 down_write(&rbd_dev->header_rwsem);
1732 1834
1733 /* resized? */ 1835 /* Update image size, and check for resize of mapped image */
1734 if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) { 1836 rbd_dev->header.image_size = h.image_size;
1735 sector_t size = (sector_t) h.image_size / SECTOR_SIZE; 1837 rbd_update_mapping_size(rbd_dev);
1736
1737 if (size != (sector_t) rbd_dev->mapping.size) {
1738 dout("setting size to %llu sectors",
1739 (unsigned long long) size);
1740 rbd_dev->mapping.size = (u64) size;
1741 set_capacity(rbd_dev->disk, size);
1742 }
1743 }
1744 1838
1745 /* rbd_dev->header.object_prefix shouldn't change */ 1839 /* rbd_dev->header.object_prefix shouldn't change */
1746 kfree(rbd_dev->header.snap_sizes); 1840 kfree(rbd_dev->header.snap_sizes);
@@ -1768,12 +1862,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1768 return ret; 1862 return ret;
1769} 1863}
1770 1864
1771static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver) 1865static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1772{ 1866{
1773 int ret; 1867 int ret;
1774 1868
1869 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1775 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1870 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1776 ret = __rbd_refresh_header(rbd_dev, hver); 1871 if (rbd_dev->image_format == 1)
1872 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1873 else
1874 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1777 mutex_unlock(&ctl_mutex); 1875 mutex_unlock(&ctl_mutex);
1778 1876
1779 return ret; 1877 return ret;
@@ -1885,7 +1983,7 @@ static ssize_t rbd_pool_show(struct device *dev,
1885{ 1983{
1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1984 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887 1985
1888 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1986 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1889} 1987}
1890 1988
1891static ssize_t rbd_pool_id_show(struct device *dev, 1989static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1893,7 +1991,8 @@ static ssize_t rbd_pool_id_show(struct device *dev,
1893{ 1991{
1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1992 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 1993
1896 return sprintf(buf, "%d\n", rbd_dev->pool_id); 1994 return sprintf(buf, "%llu\n",
1995 (unsigned long long) rbd_dev->spec->pool_id);
1897} 1996}
1898 1997
1899static ssize_t rbd_name_show(struct device *dev, 1998static ssize_t rbd_name_show(struct device *dev,
@@ -1901,7 +2000,10 @@ static ssize_t rbd_name_show(struct device *dev,
1901{ 2000{
1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2001 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903 2002
1904 return sprintf(buf, "%s\n", rbd_dev->image_name); 2003 if (rbd_dev->spec->image_name)
2004 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2005
2006 return sprintf(buf, "(unknown)\n");
1905} 2007}
1906 2008
1907static ssize_t rbd_image_id_show(struct device *dev, 2009static ssize_t rbd_image_id_show(struct device *dev,
@@ -1909,7 +2011,7 @@ static ssize_t rbd_image_id_show(struct device *dev,
1909{ 2011{
1910 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2012 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1911 2013
1912 return sprintf(buf, "%s\n", rbd_dev->image_id); 2014 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
1913} 2015}
1914 2016
1915/* 2017/*
@@ -1922,7 +2024,50 @@ static ssize_t rbd_snap_show(struct device *dev,
1922{ 2024{
1923 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2025 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924 2026
1925 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name); 2027 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2028}
2029
2030/*
2031 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2032 * for the parent image. If there is no parent, simply shows
2033 * "(no parent image)".
2034 */
2035static ssize_t rbd_parent_show(struct device *dev,
2036 struct device_attribute *attr,
2037 char *buf)
2038{
2039 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2040 struct rbd_spec *spec = rbd_dev->parent_spec;
2041 int count;
2042 char *bufp = buf;
2043
2044 if (!spec)
2045 return sprintf(buf, "(no parent image)\n");
2046
2047 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2048 (unsigned long long) spec->pool_id, spec->pool_name);
2049 if (count < 0)
2050 return count;
2051 bufp += count;
2052
2053 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2054 spec->image_name ? spec->image_name : "(unknown)");
2055 if (count < 0)
2056 return count;
2057 bufp += count;
2058
2059 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2060 (unsigned long long) spec->snap_id, spec->snap_name);
2061 if (count < 0)
2062 return count;
2063 bufp += count;
2064
2065 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2066 if (count < 0)
2067 return count;
2068 bufp += count;
2069
2070 return (ssize_t) (bufp - buf);
1926} 2071}
1927 2072
1928static ssize_t rbd_image_refresh(struct device *dev, 2073static ssize_t rbd_image_refresh(struct device *dev,
@@ -1933,7 +2078,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934 int ret; 2079 int ret;
1935 2080
1936 ret = rbd_refresh_header(rbd_dev, NULL); 2081 ret = rbd_dev_refresh(rbd_dev, NULL);
1937 2082
1938 return ret < 0 ? ret : size; 2083 return ret < 0 ? ret : size;
1939} 2084}
@@ -1948,6 +2093,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1948static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 2093static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1949static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 2094static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1950static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 2095static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2096static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
1951 2097
1952static struct attribute *rbd_attrs[] = { 2098static struct attribute *rbd_attrs[] = {
1953 &dev_attr_size.attr, 2099 &dev_attr_size.attr,
@@ -1959,6 +2105,7 @@ static struct attribute *rbd_attrs[] = {
1959 &dev_attr_name.attr, 2105 &dev_attr_name.attr,
1960 &dev_attr_image_id.attr, 2106 &dev_attr_image_id.attr,
1961 &dev_attr_current_snap.attr, 2107 &dev_attr_current_snap.attr,
2108 &dev_attr_parent.attr,
1962 &dev_attr_refresh.attr, 2109 &dev_attr_refresh.attr,
1963 NULL 2110 NULL
1964}; 2111};
@@ -2047,6 +2194,74 @@ static struct device_type rbd_snap_device_type = {
2047 .release = rbd_snap_dev_release, 2194 .release = rbd_snap_dev_release,
2048}; 2195};
2049 2196
2197static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2198{
2199 kref_get(&spec->kref);
2200
2201 return spec;
2202}
2203
2204static void rbd_spec_free(struct kref *kref);
2205static void rbd_spec_put(struct rbd_spec *spec)
2206{
2207 if (spec)
2208 kref_put(&spec->kref, rbd_spec_free);
2209}
2210
2211static struct rbd_spec *rbd_spec_alloc(void)
2212{
2213 struct rbd_spec *spec;
2214
2215 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2216 if (!spec)
2217 return NULL;
2218 kref_init(&spec->kref);
2219
2220 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2221
2222 return spec;
2223}
2224
2225static void rbd_spec_free(struct kref *kref)
2226{
2227 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2228
2229 kfree(spec->pool_name);
2230 kfree(spec->image_id);
2231 kfree(spec->image_name);
2232 kfree(spec->snap_name);
2233 kfree(spec);
2234}
2235
2236struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2237 struct rbd_spec *spec)
2238{
2239 struct rbd_device *rbd_dev;
2240
2241 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2242 if (!rbd_dev)
2243 return NULL;
2244
2245 spin_lock_init(&rbd_dev->lock);
2246 INIT_LIST_HEAD(&rbd_dev->node);
2247 INIT_LIST_HEAD(&rbd_dev->snaps);
2248 init_rwsem(&rbd_dev->header_rwsem);
2249
2250 rbd_dev->spec = spec;
2251 rbd_dev->rbd_client = rbdc;
2252
2253 return rbd_dev;
2254}
2255
2256static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2257{
2258 rbd_spec_put(rbd_dev->parent_spec);
2259 kfree(rbd_dev->header_name);
2260 rbd_put_client(rbd_dev->rbd_client);
2261 rbd_spec_put(rbd_dev->spec);
2262 kfree(rbd_dev);
2263}
2264
2050static bool rbd_snap_registered(struct rbd_snap *snap) 2265static bool rbd_snap_registered(struct rbd_snap *snap)
2051{ 2266{
2052 bool ret = snap->dev.type == &rbd_snap_device_type; 2267 bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2057,7 +2272,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap)
2057 return ret; 2272 return ret;
2058} 2273}
2059 2274
2060static void __rbd_remove_snap_dev(struct rbd_snap *snap) 2275static void rbd_remove_snap_dev(struct rbd_snap *snap)
2061{ 2276{
2062 list_del(&snap->node); 2277 list_del(&snap->node);
2063 if (device_is_registered(&snap->dev)) 2278 if (device_is_registered(&snap->dev))
@@ -2073,7 +2288,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
2073 dev->type = &rbd_snap_device_type; 2288 dev->type = &rbd_snap_device_type;
2074 dev->parent = parent; 2289 dev->parent = parent;
2075 dev->release = rbd_snap_dev_release; 2290 dev->release = rbd_snap_dev_release;
2076 dev_set_name(dev, "snap_%s", snap->name); 2291 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2077 dout("%s: registering device for snapshot %s\n", __func__, snap->name); 2292 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2078 2293
2079 ret = device_register(dev); 2294 ret = device_register(dev);
@@ -2189,6 +2404,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2189 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2404 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2190 if (ret < 0) 2405 if (ret < 0)
2191 goto out; 2406 goto out;
2407 ret = 0; /* rbd_req_sync_exec() can return positive */
2192 2408
2193 p = reply_buf; 2409 p = reply_buf;
2194 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 2410 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2216,6 +2432,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2216 __le64 features; 2432 __le64 features;
2217 __le64 incompat; 2433 __le64 incompat;
2218 } features_buf = { 0 }; 2434 } features_buf = { 0 };
2435 u64 incompat;
2219 int ret; 2436 int ret;
2220 2437
2221 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, 2438 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
@@ -2226,6 +2443,11 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2226 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 2443 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2227 if (ret < 0) 2444 if (ret < 0)
2228 return ret; 2445 return ret;
2446
2447 incompat = le64_to_cpu(features_buf.incompat);
2448 if (incompat & ~RBD_FEATURES_ALL)
2449 return -ENXIO;
2450
2229 *snap_features = le64_to_cpu(features_buf.features); 2451 *snap_features = le64_to_cpu(features_buf.features);
2230 2452
2231 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 2453 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
@@ -2242,6 +2464,183 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2242 &rbd_dev->header.features); 2464 &rbd_dev->header.features);
2243} 2465}
2244 2466
2467static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2468{
2469 struct rbd_spec *parent_spec;
2470 size_t size;
2471 void *reply_buf = NULL;
2472 __le64 snapid;
2473 void *p;
2474 void *end;
2475 char *image_id;
2476 u64 overlap;
2477 size_t len = 0;
2478 int ret;
2479
2480 parent_spec = rbd_spec_alloc();
2481 if (!parent_spec)
2482 return -ENOMEM;
2483
2484 size = sizeof (__le64) + /* pool_id */
2485 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2486 sizeof (__le64) + /* snap_id */
2487 sizeof (__le64); /* overlap */
2488 reply_buf = kmalloc(size, GFP_KERNEL);
2489 if (!reply_buf) {
2490 ret = -ENOMEM;
2491 goto out_err;
2492 }
2493
2494 snapid = cpu_to_le64(CEPH_NOSNAP);
2495 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2496 "rbd", "get_parent",
2497 (char *) &snapid, sizeof (snapid),
2498 (char *) reply_buf, size,
2499 CEPH_OSD_FLAG_READ, NULL);
2500 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2501 if (ret < 0)
2502 goto out_err;
2503
2504 ret = -ERANGE;
2505 p = reply_buf;
2506 end = (char *) reply_buf + size;
2507 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2508 if (parent_spec->pool_id == CEPH_NOPOOL)
2509 goto out; /* No parent? No problem. */
2510
2511 image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2512 if (IS_ERR(image_id)) {
2513 ret = PTR_ERR(image_id);
2514 goto out_err;
2515 }
2516 parent_spec->image_id = image_id;
2517 parent_spec->image_id_len = len;
2518 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2519 ceph_decode_64_safe(&p, end, overlap, out_err);
2520
2521 rbd_dev->parent_overlap = overlap;
2522 rbd_dev->parent_spec = parent_spec;
2523 parent_spec = NULL; /* rbd_dev now owns this */
2524out:
2525 ret = 0;
2526out_err:
2527 kfree(reply_buf);
2528 rbd_spec_put(parent_spec);
2529
2530 return ret;
2531}
2532
2533static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2534{
2535 size_t image_id_size;
2536 char *image_id;
2537 void *p;
2538 void *end;
2539 size_t size;
2540 void *reply_buf = NULL;
2541 size_t len = 0;
2542 char *image_name = NULL;
2543 int ret;
2544
2545 rbd_assert(!rbd_dev->spec->image_name);
2546
2547 image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
2548 image_id = kmalloc(image_id_size, GFP_KERNEL);
2549 if (!image_id)
2550 return NULL;
2551
2552 p = image_id;
2553 end = (char *) image_id + image_id_size;
2554 ceph_encode_string(&p, end, rbd_dev->spec->image_id,
2555 (u32) rbd_dev->spec->image_id_len);
2556
2557 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2558 reply_buf = kmalloc(size, GFP_KERNEL);
2559 if (!reply_buf)
2560 goto out;
2561
2562 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2563 "rbd", "dir_get_name",
2564 image_id, image_id_size,
2565 (char *) reply_buf, size,
2566 CEPH_OSD_FLAG_READ, NULL);
2567 if (ret < 0)
2568 goto out;
2569 p = reply_buf;
2570 end = (char *) reply_buf + size;
2571 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2572 if (IS_ERR(image_name))
2573 image_name = NULL;
2574 else
2575 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2576out:
2577 kfree(reply_buf);
2578 kfree(image_id);
2579
2580 return image_name;
2581}
2582
2583/*
2584 * When a parent image gets probed, we only have the pool, image,
2585 * and snapshot ids but not the names of any of them. This call
2586 * is made later to fill in those names. It has to be done after
2587 * rbd_dev_snaps_update() has completed because some of the
2588 * information (in particular, snapshot name) is not available
2589 * until then.
2590 */
2591static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2592{
2593 struct ceph_osd_client *osdc;
2594 const char *name;
2595 void *reply_buf = NULL;
2596 int ret;
2597
2598 if (rbd_dev->spec->pool_name)
2599 return 0; /* Already have the names */
2600
2601 /* Look up the pool name */
2602
2603 osdc = &rbd_dev->rbd_client->client->osdc;
2604 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605 if (!name)
2606 return -EIO; /* pool id too large (>= 2^31) */
2607
2608 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609 if (!rbd_dev->spec->pool_name)
2610 return -ENOMEM;
2611
2612 /* Fetch the image name; tolerate failure here */
2613
2614 name = rbd_dev_image_name(rbd_dev);
2615 if (name) {
2616 rbd_dev->spec->image_name_len = strlen(name);
2617 rbd_dev->spec->image_name = (char *) name;
2618 } else {
2619 pr_warning(RBD_DRV_NAME "%d "
2620 "unable to get image name for image id %s\n",
2621 rbd_dev->major, rbd_dev->spec->image_id);
2622 }
2623
2624 /* Look up the snapshot name. */
2625
2626 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627 if (!name) {
2628 ret = -EIO;
2629 goto out_err;
2630 }
2631 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2632 if(!rbd_dev->spec->snap_name)
2633 goto out_err;
2634
2635 return 0;
2636out_err:
2637 kfree(reply_buf);
2638 kfree(rbd_dev->spec->pool_name);
2639 rbd_dev->spec->pool_name = NULL;
2640
2641 return ret;
2642}
2643
2245static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 2644static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2246{ 2645{
2247 size_t size; 2646 size_t size;
@@ -2328,7 +2727,6 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2328 int ret; 2727 int ret;
2329 void *p; 2728 void *p;
2330 void *end; 2729 void *end;
2331 size_t snap_name_len;
2332 char *snap_name; 2730 char *snap_name;
2333 2731
2334 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 2732 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
@@ -2348,9 +2746,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2348 2746
2349 p = reply_buf; 2747 p = reply_buf;
2350 end = (char *) reply_buf + size; 2748 end = (char *) reply_buf + size;
2351 snap_name_len = 0; 2749 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2352 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2353 GFP_KERNEL);
2354 if (IS_ERR(snap_name)) { 2750 if (IS_ERR(snap_name)) {
2355 ret = PTR_ERR(snap_name); 2751 ret = PTR_ERR(snap_name);
2356 goto out; 2752 goto out;
@@ -2397,6 +2793,41 @@ static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2397 return ERR_PTR(-EINVAL); 2793 return ERR_PTR(-EINVAL);
2398} 2794}
2399 2795
2796static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2797{
2798 int ret;
2799 __u8 obj_order;
2800
2801 down_write(&rbd_dev->header_rwsem);
2802
2803 /* Grab old order first, to see if it changes */
2804
2805 obj_order = rbd_dev->header.obj_order,
2806 ret = rbd_dev_v2_image_size(rbd_dev);
2807 if (ret)
2808 goto out;
2809 if (rbd_dev->header.obj_order != obj_order) {
2810 ret = -EIO;
2811 goto out;
2812 }
2813 rbd_update_mapping_size(rbd_dev);
2814
2815 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2816 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2817 if (ret)
2818 goto out;
2819 ret = rbd_dev_snaps_update(rbd_dev);
2820 dout("rbd_dev_snaps_update returned %d\n", ret);
2821 if (ret)
2822 goto out;
2823 ret = rbd_dev_snaps_register(rbd_dev);
2824 dout("rbd_dev_snaps_register returned %d\n", ret);
2825out:
2826 up_write(&rbd_dev->header_rwsem);
2827
2828 return ret;
2829}
2830
2400/* 2831/*
2401 * Scan the rbd device's current snapshot list and compare it to the 2832 * Scan the rbd device's current snapshot list and compare it to the
2402 * newly-received snapshot context. Remove any existing snapshots 2833 * newly-received snapshot context. Remove any existing snapshots
@@ -2436,12 +2867,12 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2436 2867
2437 /* Existing snapshot not in the new snap context */ 2868 /* Existing snapshot not in the new snap context */
2438 2869
2439 if (rbd_dev->mapping.snap_id == snap->id) 2870 if (rbd_dev->spec->snap_id == snap->id)
2440 rbd_dev->mapping.snap_exists = false; 2871 rbd_dev->exists = false;
2441 __rbd_remove_snap_dev(snap); 2872 rbd_remove_snap_dev(snap);
2442 dout("%ssnap id %llu has been removed\n", 2873 dout("%ssnap id %llu has been removed\n",
2443 rbd_dev->mapping.snap_id == snap->id ? 2874 rbd_dev->spec->snap_id == snap->id ?
2444 "mapped " : "", 2875 "mapped " : "",
2445 (unsigned long long) snap->id); 2876 (unsigned long long) snap->id);
2446 2877
2447 /* Done with this list entry; advance */ 2878 /* Done with this list entry; advance */
@@ -2559,7 +2990,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2559 do { 2990 do {
2560 ret = rbd_req_sync_watch(rbd_dev); 2991 ret = rbd_req_sync_watch(rbd_dev);
2561 if (ret == -ERANGE) { 2992 if (ret == -ERANGE) {
2562 rc = rbd_refresh_header(rbd_dev, NULL); 2993 rc = rbd_dev_refresh(rbd_dev, NULL);
2563 if (rc < 0) 2994 if (rc < 0)
2564 return rc; 2995 return rc;
2565 } 2996 }
@@ -2621,8 +3052,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2621 struct rbd_device *rbd_dev; 3052 struct rbd_device *rbd_dev;
2622 3053
2623 rbd_dev = list_entry(tmp, struct rbd_device, node); 3054 rbd_dev = list_entry(tmp, struct rbd_device, node);
2624 if (rbd_id > max_id) 3055 if (rbd_dev->dev_id > max_id)
2625 max_id = rbd_id; 3056 max_id = rbd_dev->dev_id;
2626 } 3057 }
2627 spin_unlock(&rbd_dev_list_lock); 3058 spin_unlock(&rbd_dev_list_lock);
2628 3059
@@ -2722,73 +3153,140 @@ static inline char *dup_token(const char **buf, size_t *lenp)
2722} 3153}
2723 3154
2724/* 3155/*
2725 * This fills in the pool_name, image_name, image_name_len, rbd_dev, 3156 * Parse the options provided for an "rbd add" (i.e., rbd image
2726 * rbd_md_name, and name fields of the given rbd_dev, based on the 3157 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
2727 * list of monitor addresses and other options provided via 3158 * and the data written is passed here via a NUL-terminated buffer.
2728 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated 3159 * Returns 0 if successful or an error code otherwise.
2729 * copy of the snapshot name to map if successful, or a 3160 *
2730 * pointer-coded error otherwise. 3161 * The information extracted from these options is recorded in
3162 * the other parameters which return dynamically-allocated
3163 * structures:
3164 * ceph_opts
3165 * The address of a pointer that will refer to a ceph options
3166 * structure. Caller must release the returned pointer using
3167 * ceph_destroy_options() when it is no longer needed.
3168 * rbd_opts
3169 * Address of an rbd options pointer. Fully initialized by
3170 * this function; caller must release with kfree().
3171 * spec
3172 * Address of an rbd image specification pointer. Fully
3173 * initialized by this function based on parsed options.
3174 * Caller must release with rbd_spec_put().
2731 * 3175 *
2732 * Note: rbd_dev is assumed to have been initially zero-filled. 3176 * The options passed take this form:
3177 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3178 * where:
3179 * <mon_addrs>
3180 * A comma-separated list of one or more monitor addresses.
3181 * A monitor address is an ip address, optionally followed
3182 * by a port number (separated by a colon).
3183 * I.e.: ip1[:port1][,ip2[:port2]...]
3184 * <options>
3185 * A comma-separated list of ceph and/or rbd options.
3186 * <pool_name>
3187 * The name of the rados pool containing the rbd image.
3188 * <image_name>
3189 * The name of the image in that pool to map.
3190 * <snap_id>
3191 * An optional snapshot id. If provided, the mapping will
3192 * present data from the image at the time that snapshot was
3193 * created. The image head is used if no snapshot id is
3194 * provided. Snapshot mappings are always read-only.
2733 */ 3195 */
2734static char *rbd_add_parse_args(struct rbd_device *rbd_dev, 3196static int rbd_add_parse_args(const char *buf,
2735 const char *buf, 3197 struct ceph_options **ceph_opts,
2736 const char **mon_addrs, 3198 struct rbd_options **opts,
2737 size_t *mon_addrs_size, 3199 struct rbd_spec **rbd_spec)
2738 char *options,
2739 size_t options_size)
2740{ 3200{
2741 size_t len; 3201 size_t len;
2742 char *err_ptr = ERR_PTR(-EINVAL); 3202 char *options;
2743 char *snap_name; 3203 const char *mon_addrs;
3204 size_t mon_addrs_size;
3205 struct rbd_spec *spec = NULL;
3206 struct rbd_options *rbd_opts = NULL;
3207 struct ceph_options *copts;
3208 int ret;
2744 3209
2745 /* The first four tokens are required */ 3210 /* The first four tokens are required */
2746 3211
2747 len = next_token(&buf); 3212 len = next_token(&buf);
2748 if (!len) 3213 if (!len)
2749 return err_ptr; 3214 return -EINVAL; /* Missing monitor address(es) */
2750 *mon_addrs_size = len + 1; 3215 mon_addrs = buf;
2751 *mon_addrs = buf; 3216 mon_addrs_size = len + 1;
2752
2753 buf += len; 3217 buf += len;
2754 3218
2755 len = copy_token(&buf, options, options_size); 3219 ret = -EINVAL;
2756 if (!len || len >= options_size) 3220 options = dup_token(&buf, NULL);
2757 return err_ptr; 3221 if (!options)
3222 return -ENOMEM;
3223 if (!*options)
3224 goto out_err; /* Missing options */
2758 3225
2759 err_ptr = ERR_PTR(-ENOMEM); 3226 spec = rbd_spec_alloc();
2760 rbd_dev->pool_name = dup_token(&buf, NULL); 3227 if (!spec)
2761 if (!rbd_dev->pool_name) 3228 goto out_mem;
2762 goto out_err;
2763 3229
2764 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len); 3230 spec->pool_name = dup_token(&buf, NULL);
2765 if (!rbd_dev->image_name) 3231 if (!spec->pool_name)
2766 goto out_err; 3232 goto out_mem;
3233 if (!*spec->pool_name)
3234 goto out_err; /* Missing pool name */
2767 3235
2768 /* Snapshot name is optional */ 3236 spec->image_name = dup_token(&buf, &spec->image_name_len);
3237 if (!spec->image_name)
3238 goto out_mem;
3239 if (!*spec->image_name)
3240 goto out_err; /* Missing image name */
3241
3242 /*
3243 * Snapshot name is optional; default is to use "-"
3244 * (indicating the head/no snapshot).
3245 */
2769 len = next_token(&buf); 3246 len = next_token(&buf);
2770 if (!len) { 3247 if (!len) {
2771 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 3248 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2772 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 3249 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2773 } 3250 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
2774 snap_name = kmalloc(len + 1, GFP_KERNEL); 3251 ret = -ENAMETOOLONG;
2775 if (!snap_name)
2776 goto out_err; 3252 goto out_err;
2777 memcpy(snap_name, buf, len); 3253 }
2778 *(snap_name + len) = '\0'; 3254 spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
3255 if (!spec->snap_name)
3256 goto out_mem;
3257 memcpy(spec->snap_name, buf, len);
3258 *(spec->snap_name + len) = '\0';
2779 3259
2780dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len); 3260 /* Initialize all rbd options to the defaults */
2781 3261
2782 return snap_name; 3262 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3263 if (!rbd_opts)
3264 goto out_mem;
3265
3266 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3267
3268 copts = ceph_parse_options(options, mon_addrs,
3269 mon_addrs + mon_addrs_size - 1,
3270 parse_rbd_opts_token, rbd_opts);
3271 if (IS_ERR(copts)) {
3272 ret = PTR_ERR(copts);
3273 goto out_err;
3274 }
3275 kfree(options);
2783 3276
3277 *ceph_opts = copts;
3278 *opts = rbd_opts;
3279 *rbd_spec = spec;
3280
3281 return 0;
3282out_mem:
3283 ret = -ENOMEM;
2784out_err: 3284out_err:
2785 kfree(rbd_dev->image_name); 3285 kfree(rbd_opts);
2786 rbd_dev->image_name = NULL; 3286 rbd_spec_put(spec);
2787 rbd_dev->image_name_len = 0; 3287 kfree(options);
2788 kfree(rbd_dev->pool_name);
2789 rbd_dev->pool_name = NULL;
2790 3288
2791 return err_ptr; 3289 return ret;
2792} 3290}
2793 3291
2794/* 3292/*
@@ -2814,14 +3312,22 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2814 void *p; 3312 void *p;
2815 3313
2816 /* 3314 /*
3315 * When probing a parent image, the image id is already
3316 * known (and the image name likely is not). There's no
3317 * need to fetch the image id again in this case.
3318 */
3319 if (rbd_dev->spec->image_id)
3320 return 0;
3321
3322 /*
2817 * First, see if the format 2 image id file exists, and if 3323 * First, see if the format 2 image id file exists, and if
2818 * so, get the image's persistent id from it. 3324 * so, get the image's persistent id from it.
2819 */ 3325 */
2820 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len; 3326 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
2821 object_name = kmalloc(size, GFP_NOIO); 3327 object_name = kmalloc(size, GFP_NOIO);
2822 if (!object_name) 3328 if (!object_name)
2823 return -ENOMEM; 3329 return -ENOMEM;
2824 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name); 3330 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
2825 dout("rbd id object name is %s\n", object_name); 3331 dout("rbd id object name is %s\n", object_name);
2826 3332
2827 /* Response will be an encoded string, which includes a length */ 3333 /* Response will be an encoded string, which includes a length */
@@ -2841,17 +3347,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2841 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); 3347 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2842 if (ret < 0) 3348 if (ret < 0)
2843 goto out; 3349 goto out;
3350 ret = 0; /* rbd_req_sync_exec() can return positive */
2844 3351
2845 p = response; 3352 p = response;
2846 rbd_dev->image_id = ceph_extract_encoded_string(&p, 3353 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
2847 p + RBD_IMAGE_ID_LEN_MAX, 3354 p + RBD_IMAGE_ID_LEN_MAX,
2848 &rbd_dev->image_id_len, 3355 &rbd_dev->spec->image_id_len,
2849 GFP_NOIO); 3356 GFP_NOIO);
2850 if (IS_ERR(rbd_dev->image_id)) { 3357 if (IS_ERR(rbd_dev->spec->image_id)) {
2851 ret = PTR_ERR(rbd_dev->image_id); 3358 ret = PTR_ERR(rbd_dev->spec->image_id);
2852 rbd_dev->image_id = NULL; 3359 rbd_dev->spec->image_id = NULL;
2853 } else { 3360 } else {
2854 dout("image_id is %s\n", rbd_dev->image_id); 3361 dout("image_id is %s\n", rbd_dev->spec->image_id);
2855 } 3362 }
2856out: 3363out:
2857 kfree(response); 3364 kfree(response);
@@ -2867,26 +3374,33 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2867 3374
2868 /* Version 1 images have no id; empty string is used */ 3375 /* Version 1 images have no id; empty string is used */
2869 3376
2870 rbd_dev->image_id = kstrdup("", GFP_KERNEL); 3377 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
2871 if (!rbd_dev->image_id) 3378 if (!rbd_dev->spec->image_id)
2872 return -ENOMEM; 3379 return -ENOMEM;
2873 rbd_dev->image_id_len = 0; 3380 rbd_dev->spec->image_id_len = 0;
2874 3381
2875 /* Record the header object name for this rbd image. */ 3382 /* Record the header object name for this rbd image. */
2876 3383
2877 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX); 3384 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
2878 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3385 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2879 if (!rbd_dev->header_name) { 3386 if (!rbd_dev->header_name) {
2880 ret = -ENOMEM; 3387 ret = -ENOMEM;
2881 goto out_err; 3388 goto out_err;
2882 } 3389 }
2883 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX); 3390 sprintf(rbd_dev->header_name, "%s%s",
3391 rbd_dev->spec->image_name, RBD_SUFFIX);
2884 3392
2885 /* Populate rbd image metadata */ 3393 /* Populate rbd image metadata */
2886 3394
2887 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 3395 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2888 if (ret < 0) 3396 if (ret < 0)
2889 goto out_err; 3397 goto out_err;
3398
3399 /* Version 1 images have no parent (no layering) */
3400
3401 rbd_dev->parent_spec = NULL;
3402 rbd_dev->parent_overlap = 0;
3403
2890 rbd_dev->image_format = 1; 3404 rbd_dev->image_format = 1;
2891 3405
2892 dout("discovered version 1 image, header name is %s\n", 3406 dout("discovered version 1 image, header name is %s\n",
@@ -2897,8 +3411,8 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2897out_err: 3411out_err:
2898 kfree(rbd_dev->header_name); 3412 kfree(rbd_dev->header_name);
2899 rbd_dev->header_name = NULL; 3413 rbd_dev->header_name = NULL;
2900 kfree(rbd_dev->image_id); 3414 kfree(rbd_dev->spec->image_id);
2901 rbd_dev->image_id = NULL; 3415 rbd_dev->spec->image_id = NULL;
2902 3416
2903 return ret; 3417 return ret;
2904} 3418}
@@ -2913,12 +3427,12 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2913 * Image id was filled in by the caller. Record the header 3427 * Image id was filled in by the caller. Record the header
2914 * object name for this rbd image. 3428 * object name for this rbd image.
2915 */ 3429 */
2916 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len; 3430 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
2917 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 3431 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918 if (!rbd_dev->header_name) 3432 if (!rbd_dev->header_name)
2919 return -ENOMEM; 3433 return -ENOMEM;
2920 sprintf(rbd_dev->header_name, "%s%s", 3434 sprintf(rbd_dev->header_name, "%s%s",
2921 RBD_HEADER_PREFIX, rbd_dev->image_id); 3435 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
2922 3436
2923 /* Get the size and object order for the image */ 3437 /* Get the size and object order for the image */
2924 3438
@@ -2932,12 +3446,20 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2932 if (ret < 0) 3446 if (ret < 0)
2933 goto out_err; 3447 goto out_err;
2934 3448
2935 /* Get the features for the image */ 3449 /* Get the and check features for the image */
2936 3450
2937 ret = rbd_dev_v2_features(rbd_dev); 3451 ret = rbd_dev_v2_features(rbd_dev);
2938 if (ret < 0) 3452 if (ret < 0)
2939 goto out_err; 3453 goto out_err;
2940 3454
3455 /* If the image supports layering, get the parent info */
3456
3457 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3458 ret = rbd_dev_v2_parent_info(rbd_dev);
3459 if (ret < 0)
3460 goto out_err;
3461 }
3462
2941 /* crypto and compression type aren't (yet) supported for v2 images */ 3463 /* crypto and compression type aren't (yet) supported for v2 images */
2942 3464
2943 rbd_dev->header.crypt_type = 0; 3465 rbd_dev->header.crypt_type = 0;
@@ -2955,8 +3477,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2955 dout("discovered version 2 image, header name is %s\n", 3477 dout("discovered version 2 image, header name is %s\n",
2956 rbd_dev->header_name); 3478 rbd_dev->header_name);
2957 3479
2958 return -ENOTSUPP; 3480 return 0;
2959out_err: 3481out_err:
3482 rbd_dev->parent_overlap = 0;
3483 rbd_spec_put(rbd_dev->parent_spec);
3484 rbd_dev->parent_spec = NULL;
2960 kfree(rbd_dev->header_name); 3485 kfree(rbd_dev->header_name);
2961 rbd_dev->header_name = NULL; 3486 rbd_dev->header_name = NULL;
2962 kfree(rbd_dev->header.object_prefix); 3487 kfree(rbd_dev->header.object_prefix);
@@ -2965,91 +3490,22 @@ out_err:
2965 return ret; 3490 return ret;
2966} 3491}
2967 3492
2968/* 3493static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
2969 * Probe for the existence of the header object for the given rbd
2970 * device. For format 2 images this includes determining the image
2971 * id.
2972 */
2973static int rbd_dev_probe(struct rbd_device *rbd_dev)
2974{ 3494{
2975 int ret; 3495 int ret;
2976 3496
2977 /* 3497 /* no need to lock here, as rbd_dev is not registered yet */
2978 * Get the id from the image id object. If it's not a 3498 ret = rbd_dev_snaps_update(rbd_dev);
2979 * format 2 image, we'll get ENOENT back, and we'll assume
2980 * it's a format 1 image.
2981 */
2982 ret = rbd_dev_image_id(rbd_dev);
2983 if (ret)
2984 ret = rbd_dev_v1_probe(rbd_dev);
2985 else
2986 ret = rbd_dev_v2_probe(rbd_dev);
2987 if (ret) 3499 if (ret)
2988 dout("probe failed, returning %d\n", ret); 3500 return ret;
2989
2990 return ret;
2991}
2992
2993static ssize_t rbd_add(struct bus_type *bus,
2994 const char *buf,
2995 size_t count)
2996{
2997 char *options;
2998 struct rbd_device *rbd_dev = NULL;
2999 const char *mon_addrs = NULL;
3000 size_t mon_addrs_size = 0;
3001 struct ceph_osd_client *osdc;
3002 int rc = -ENOMEM;
3003 char *snap_name;
3004
3005 if (!try_module_get(THIS_MODULE))
3006 return -ENODEV;
3007
3008 options = kmalloc(count, GFP_KERNEL);
3009 if (!options)
3010 goto err_out_mem;
3011 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3012 if (!rbd_dev)
3013 goto err_out_mem;
3014
3015 /* static rbd_device initialization */
3016 spin_lock_init(&rbd_dev->lock);
3017 INIT_LIST_HEAD(&rbd_dev->node);
3018 INIT_LIST_HEAD(&rbd_dev->snaps);
3019 init_rwsem(&rbd_dev->header_rwsem);
3020
3021 /* parse add command */
3022 snap_name = rbd_add_parse_args(rbd_dev, buf,
3023 &mon_addrs, &mon_addrs_size, options, count);
3024 if (IS_ERR(snap_name)) {
3025 rc = PTR_ERR(snap_name);
3026 goto err_out_mem;
3027 }
3028
3029 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3030 if (rc < 0)
3031 goto err_out_args;
3032
3033 /* pick the pool */
3034 osdc = &rbd_dev->rbd_client->client->osdc;
3035 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3036 if (rc < 0)
3037 goto err_out_client;
3038 rbd_dev->pool_id = rc;
3039
3040 rc = rbd_dev_probe(rbd_dev);
3041 if (rc < 0)
3042 goto err_out_client;
3043 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3044 3501
3045 /* no need to lock here, as rbd_dev is not registered yet */ 3502 ret = rbd_dev_probe_update_spec(rbd_dev);
3046 rc = rbd_dev_snaps_update(rbd_dev); 3503 if (ret)
3047 if (rc) 3504 goto err_out_snaps;
3048 goto err_out_header;
3049 3505
3050 rc = rbd_dev_set_mapping(rbd_dev, snap_name); 3506 ret = rbd_dev_set_mapping(rbd_dev);
3051 if (rc) 3507 if (ret)
3052 goto err_out_header; 3508 goto err_out_snaps;
3053 3509
3054 /* generate unique id: find highest unique id, add one */ 3510 /* generate unique id: find highest unique id, add one */
3055 rbd_dev_id_get(rbd_dev); 3511 rbd_dev_id_get(rbd_dev);
@@ -3061,34 +3517,33 @@ static ssize_t rbd_add(struct bus_type *bus,
3061 3517
3062 /* Get our block major device number. */ 3518 /* Get our block major device number. */
3063 3519
3064 rc = register_blkdev(0, rbd_dev->name); 3520 ret = register_blkdev(0, rbd_dev->name);
3065 if (rc < 0) 3521 if (ret < 0)
3066 goto err_out_id; 3522 goto err_out_id;
3067 rbd_dev->major = rc; 3523 rbd_dev->major = ret;
3068 3524
3069 /* Set up the blkdev mapping. */ 3525 /* Set up the blkdev mapping. */
3070 3526
3071 rc = rbd_init_disk(rbd_dev); 3527 ret = rbd_init_disk(rbd_dev);
3072 if (rc) 3528 if (ret)
3073 goto err_out_blkdev; 3529 goto err_out_blkdev;
3074 3530
3075 rc = rbd_bus_add_dev(rbd_dev); 3531 ret = rbd_bus_add_dev(rbd_dev);
3076 if (rc) 3532 if (ret)
3077 goto err_out_disk; 3533 goto err_out_disk;
3078 3534
3079 /* 3535 /*
3080 * At this point cleanup in the event of an error is the job 3536 * At this point cleanup in the event of an error is the job
3081 * of the sysfs code (initiated by rbd_bus_del_dev()). 3537 * of the sysfs code (initiated by rbd_bus_del_dev()).
3082 */ 3538 */
3083
3084 down_write(&rbd_dev->header_rwsem); 3539 down_write(&rbd_dev->header_rwsem);
3085 rc = rbd_dev_snaps_register(rbd_dev); 3540 ret = rbd_dev_snaps_register(rbd_dev);
3086 up_write(&rbd_dev->header_rwsem); 3541 up_write(&rbd_dev->header_rwsem);
3087 if (rc) 3542 if (ret)
3088 goto err_out_bus; 3543 goto err_out_bus;
3089 3544
3090 rc = rbd_init_watch_dev(rbd_dev); 3545 ret = rbd_init_watch_dev(rbd_dev);
3091 if (rc) 3546 if (ret)
3092 goto err_out_bus; 3547 goto err_out_bus;
3093 3548
3094 /* Everything's ready. Announce the disk to the world. */ 3549 /* Everything's ready. Announce the disk to the world. */
@@ -3098,37 +3553,119 @@ static ssize_t rbd_add(struct bus_type *bus,
3098 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 3553 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3099 (unsigned long long) rbd_dev->mapping.size); 3554 (unsigned long long) rbd_dev->mapping.size);
3100 3555
3101 return count; 3556 return ret;
3102
3103err_out_bus: 3557err_out_bus:
3104 /* this will also clean up rest of rbd_dev stuff */ 3558 /* this will also clean up rest of rbd_dev stuff */
3105 3559
3106 rbd_bus_del_dev(rbd_dev); 3560 rbd_bus_del_dev(rbd_dev);
3107 kfree(options);
3108 return rc;
3109 3561
3562 return ret;
3110err_out_disk: 3563err_out_disk:
3111 rbd_free_disk(rbd_dev); 3564 rbd_free_disk(rbd_dev);
3112err_out_blkdev: 3565err_out_blkdev:
3113 unregister_blkdev(rbd_dev->major, rbd_dev->name); 3566 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3114err_out_id: 3567err_out_id:
3115 rbd_dev_id_put(rbd_dev); 3568 rbd_dev_id_put(rbd_dev);
3116err_out_header: 3569err_out_snaps:
3117 rbd_header_free(&rbd_dev->header); 3570 rbd_remove_all_snaps(rbd_dev);
3571
3572 return ret;
3573}
3574
3575/*
3576 * Probe for the existence of the header object for the given rbd
3577 * device. For format 2 images this includes determining the image
3578 * id.
3579 */
3580static int rbd_dev_probe(struct rbd_device *rbd_dev)
3581{
3582 int ret;
3583
3584 /*
3585 * Get the id from the image id object. If it's not a
3586 * format 2 image, we'll get ENOENT back, and we'll assume
3587 * it's a format 1 image.
3588 */
3589 ret = rbd_dev_image_id(rbd_dev);
3590 if (ret)
3591 ret = rbd_dev_v1_probe(rbd_dev);
3592 else
3593 ret = rbd_dev_v2_probe(rbd_dev);
3594 if (ret) {
3595 dout("probe failed, returning %d\n", ret);
3596
3597 return ret;
3598 }
3599
3600 ret = rbd_dev_probe_finish(rbd_dev);
3601 if (ret)
3602 rbd_header_free(&rbd_dev->header);
3603
3604 return ret;
3605}
3606
3607static ssize_t rbd_add(struct bus_type *bus,
3608 const char *buf,
3609 size_t count)
3610{
3611 struct rbd_device *rbd_dev = NULL;
3612 struct ceph_options *ceph_opts = NULL;
3613 struct rbd_options *rbd_opts = NULL;
3614 struct rbd_spec *spec = NULL;
3615 struct rbd_client *rbdc;
3616 struct ceph_osd_client *osdc;
3617 int rc = -ENOMEM;
3618
3619 if (!try_module_get(THIS_MODULE))
3620 return -ENODEV;
3621
3622 /* parse add command */
3623 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3624 if (rc < 0)
3625 goto err_out_module;
3626
3627 rbdc = rbd_get_client(ceph_opts);
3628 if (IS_ERR(rbdc)) {
3629 rc = PTR_ERR(rbdc);
3630 goto err_out_args;
3631 }
3632 ceph_opts = NULL; /* rbd_dev client now owns this */
3633
3634 /* pick the pool */
3635 osdc = &rbdc->client->osdc;
3636 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3637 if (rc < 0)
3638 goto err_out_client;
3639 spec->pool_id = (u64) rc;
3640
3641 rbd_dev = rbd_dev_create(rbdc, spec);
3642 if (!rbd_dev)
3643 goto err_out_client;
3644 rbdc = NULL; /* rbd_dev now owns this */
3645 spec = NULL; /* rbd_dev now owns this */
3646
3647 rbd_dev->mapping.read_only = rbd_opts->read_only;
3648 kfree(rbd_opts);
3649 rbd_opts = NULL; /* done with this */
3650
3651 rc = rbd_dev_probe(rbd_dev);
3652 if (rc < 0)
3653 goto err_out_rbd_dev;
3654
3655 return count;
3656err_out_rbd_dev:
3657 rbd_dev_destroy(rbd_dev);
3118err_out_client: 3658err_out_client:
3119 kfree(rbd_dev->header_name); 3659 rbd_put_client(rbdc);
3120 rbd_put_client(rbd_dev);
3121 kfree(rbd_dev->image_id);
3122err_out_args: 3660err_out_args:
3123 kfree(rbd_dev->mapping.snap_name); 3661 if (ceph_opts)
3124 kfree(rbd_dev->image_name); 3662 ceph_destroy_options(ceph_opts);
3125 kfree(rbd_dev->pool_name); 3663 kfree(rbd_opts);
3126err_out_mem: 3664 rbd_spec_put(spec);
3127 kfree(rbd_dev); 3665err_out_module:
3128 kfree(options); 3666 module_put(THIS_MODULE);
3129 3667
3130 dout("Error adding device %s\n", buf); 3668 dout("Error adding device %s\n", buf);
3131 module_put(THIS_MODULE);
3132 3669
3133 return (ssize_t) rc; 3670 return (ssize_t) rc;
3134} 3671}
@@ -3163,7 +3700,6 @@ static void rbd_dev_release(struct device *dev)
3163 if (rbd_dev->watch_event) 3700 if (rbd_dev->watch_event)
3164 rbd_req_sync_unwatch(rbd_dev); 3701 rbd_req_sync_unwatch(rbd_dev);
3165 3702
3166 rbd_put_client(rbd_dev);
3167 3703
3168 /* clean up and free blkdev */ 3704 /* clean up and free blkdev */
3169 rbd_free_disk(rbd_dev); 3705 rbd_free_disk(rbd_dev);
@@ -3173,13 +3709,9 @@ static void rbd_dev_release(struct device *dev)
3173 rbd_header_free(&rbd_dev->header); 3709 rbd_header_free(&rbd_dev->header);
3174 3710
3175 /* done with the id, and with the rbd_dev */ 3711 /* done with the id, and with the rbd_dev */
3176 kfree(rbd_dev->mapping.snap_name);
3177 kfree(rbd_dev->image_id);
3178 kfree(rbd_dev->header_name);
3179 kfree(rbd_dev->pool_name);
3180 kfree(rbd_dev->image_name);
3181 rbd_dev_id_put(rbd_dev); 3712 rbd_dev_id_put(rbd_dev);
3182 kfree(rbd_dev); 3713 rbd_assert(rbd_dev->rbd_client != NULL);
3714 rbd_dev_destroy(rbd_dev);
3183 3715
3184 /* release module ref */ 3716 /* release module ref */
3185 module_put(THIS_MODULE); 3717 module_put(THIS_MODULE);
@@ -3211,7 +3743,12 @@ static ssize_t rbd_remove(struct bus_type *bus,
3211 goto done; 3743 goto done;
3212 } 3744 }
3213 3745
3214 __rbd_remove_all_snaps(rbd_dev); 3746 if (rbd_dev->open_count) {
3747 ret = -EBUSY;
3748 goto done;
3749 }
3750
3751 rbd_remove_all_snaps(rbd_dev);
3215 rbd_bus_del_dev(rbd_dev); 3752 rbd_bus_del_dev(rbd_dev);
3216 3753
3217done: 3754done:
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index cbe77fa105ba..49d77cbcf8bd 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -46,8 +46,6 @@
46#define RBD_MIN_OBJ_ORDER 16 46#define RBD_MIN_OBJ_ORDER 16
47#define RBD_MAX_OBJ_ORDER 30 47#define RBD_MAX_OBJ_ORDER 30
48 48
49#define RBD_MAX_SEG_NAME_LEN 128
50
51#define RBD_COMP_NONE 0 49#define RBD_COMP_NONE 0
52#define RBD_CRYPT_NONE 0 50#define RBD_CRYPT_NONE 0
53 51
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6690269f5dde..064d1a68d2c1 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -267,6 +267,14 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
267 kfree(req->r_pages); 267 kfree(req->r_pages);
268} 268}
269 269
270static void ceph_unlock_page_vector(struct page **pages, int num_pages)
271{
272 int i;
273
274 for (i = 0; i < num_pages; i++)
275 unlock_page(pages[i]);
276}
277
270/* 278/*
271 * start an async read(ahead) operation. return nr_pages we submitted 279 * start an async read(ahead) operation. return nr_pages we submitted
272 * a read for on success, or negative error code. 280 * a read for on success, or negative error code.
@@ -347,6 +355,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
347 return nr_pages; 355 return nr_pages;
348 356
349out_pages: 357out_pages:
358 ceph_unlock_page_vector(pages, nr_pages);
350 ceph_release_page_vector(pages, nr_pages); 359 ceph_release_page_vector(pages, nr_pages);
351out: 360out:
352 ceph_osdc_put_request(req); 361 ceph_osdc_put_request(req);
@@ -1078,23 +1087,51 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
1078 struct page **pagep, void **fsdata) 1087 struct page **pagep, void **fsdata)
1079{ 1088{
1080 struct inode *inode = file->f_dentry->d_inode; 1089 struct inode *inode = file->f_dentry->d_inode;
1090 struct ceph_inode_info *ci = ceph_inode(inode);
1091 struct ceph_file_info *fi = file->private_data;
1081 struct page *page; 1092 struct page *page;
1082 pgoff_t index = pos >> PAGE_CACHE_SHIFT; 1093 pgoff_t index = pos >> PAGE_CACHE_SHIFT;
1083 int r; 1094 int r, want, got = 0;
1095
1096 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1097 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1098 else
1099 want = CEPH_CAP_FILE_BUFFER;
1100
1101 dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
1102 inode, ceph_vinop(inode), pos, len, inode->i_size);
1103 r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
1104 if (r < 0)
1105 return r;
1106 dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n",
1107 inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
1108 if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
1109 ceph_put_cap_refs(ci, got);
1110 return -EAGAIN;
1111 }
1084 1112
1085 do { 1113 do {
1086 /* get a page */ 1114 /* get a page */
1087 page = grab_cache_page_write_begin(mapping, index, 0); 1115 page = grab_cache_page_write_begin(mapping, index, 0);
1088 if (!page) 1116 if (!page) {
1089 return -ENOMEM; 1117 r = -ENOMEM;
1090 *pagep = page; 1118 break;
1119 }
1091 1120
1092 dout("write_begin file %p inode %p page %p %d~%d\n", file, 1121 dout("write_begin file %p inode %p page %p %d~%d\n", file,
1093 inode, page, (int)pos, (int)len); 1122 inode, page, (int)pos, (int)len);
1094 1123
1095 r = ceph_update_writeable_page(file, pos, len, page); 1124 r = ceph_update_writeable_page(file, pos, len, page);
1125 if (r)
1126 page_cache_release(page);
1096 } while (r == -EAGAIN); 1127 } while (r == -EAGAIN);
1097 1128
1129 if (r) {
1130 ceph_put_cap_refs(ci, got);
1131 } else {
1132 *pagep = page;
1133 *(int *)fsdata = got;
1134 }
1098 return r; 1135 return r;
1099} 1136}
1100 1137
@@ -1108,10 +1145,12 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1108 struct page *page, void *fsdata) 1145 struct page *page, void *fsdata)
1109{ 1146{
1110 struct inode *inode = file->f_dentry->d_inode; 1147 struct inode *inode = file->f_dentry->d_inode;
1148 struct ceph_inode_info *ci = ceph_inode(inode);
1111 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1149 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1112 struct ceph_mds_client *mdsc = fsc->mdsc; 1150 struct ceph_mds_client *mdsc = fsc->mdsc;
1113 unsigned from = pos & (PAGE_CACHE_SIZE - 1); 1151 unsigned from = pos & (PAGE_CACHE_SIZE - 1);
1114 int check_cap = 0; 1152 int check_cap = 0;
1153 int got = (unsigned long)fsdata;
1115 1154
1116 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, 1155 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1117 inode, page, (int)pos, (int)copied, (int)len); 1156 inode, page, (int)pos, (int)copied, (int)len);
@@ -1134,6 +1173,19 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1134 up_read(&mdsc->snap_rwsem); 1173 up_read(&mdsc->snap_rwsem);
1135 page_cache_release(page); 1174 page_cache_release(page);
1136 1175
1176 if (copied > 0) {
1177 int dirty;
1178 spin_lock(&ci->i_ceph_lock);
1179 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1180 spin_unlock(&ci->i_ceph_lock);
1181 if (dirty)
1182 __mark_inode_dirty(inode, dirty);
1183 }
1184
1185 dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n",
1186 inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
1187 ceph_put_cap_refs(ci, got);
1188
1137 if (check_cap) 1189 if (check_cap)
1138 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); 1190 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
1139 1191
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3251e9cc6401..a1d9bb30c1bf 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -236,8 +236,10 @@ static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
236 if (!ctx) { 236 if (!ctx) {
237 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 237 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
238 if (cap) { 238 if (cap) {
239 spin_lock(&mdsc->caps_list_lock);
239 mdsc->caps_use_count++; 240 mdsc->caps_use_count++;
240 mdsc->caps_total_count++; 241 mdsc->caps_total_count++;
242 spin_unlock(&mdsc->caps_list_lock);
241 } 243 }
242 return cap; 244 return cap;
243 } 245 }
@@ -1349,11 +1351,15 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1349 if (!ci->i_head_snapc) 1351 if (!ci->i_head_snapc)
1350 ci->i_head_snapc = ceph_get_snap_context( 1352 ci->i_head_snapc = ceph_get_snap_context(
1351 ci->i_snap_realm->cached_context); 1353 ci->i_snap_realm->cached_context);
1352 dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode, 1354 dout(" inode %p now dirty snapc %p auth cap %p\n",
1353 ci->i_head_snapc); 1355 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
1354 BUG_ON(!list_empty(&ci->i_dirty_item)); 1356 BUG_ON(!list_empty(&ci->i_dirty_item));
1355 spin_lock(&mdsc->cap_dirty_lock); 1357 spin_lock(&mdsc->cap_dirty_lock);
1356 list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1358 if (ci->i_auth_cap)
1359 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1360 else
1361 list_add(&ci->i_dirty_item,
1362 &mdsc->cap_dirty_migrating);
1357 spin_unlock(&mdsc->cap_dirty_lock); 1363 spin_unlock(&mdsc->cap_dirty_lock);
1358 if (ci->i_flushing_caps == 0) { 1364 if (ci->i_flushing_caps == 0) {
1359 ihold(inode); 1365 ihold(inode);
@@ -2388,7 +2394,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2388 &atime); 2394 &atime);
2389 2395
2390 /* max size increase? */ 2396 /* max size increase? */
2391 if (max_size != ci->i_max_size) { 2397 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
2392 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); 2398 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
2393 ci->i_max_size = max_size; 2399 ci->i_max_size = max_size;
2394 if (max_size >= ci->i_wanted_max_size) { 2400 if (max_size >= ci->i_wanted_max_size) {
@@ -2745,6 +2751,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2745 2751
2746 /* make sure we re-request max_size, if necessary */ 2752 /* make sure we re-request max_size, if necessary */
2747 spin_lock(&ci->i_ceph_lock); 2753 spin_lock(&ci->i_ceph_lock);
2754 ci->i_wanted_max_size = 0; /* reset */
2748 ci->i_requested_max_size = 0; 2755 ci->i_requested_max_size = 0;
2749 spin_unlock(&ci->i_ceph_lock); 2756 spin_unlock(&ci->i_ceph_lock);
2750} 2757}
@@ -2840,8 +2847,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2840 case CEPH_CAP_OP_IMPORT: 2847 case CEPH_CAP_OP_IMPORT:
2841 handle_cap_import(mdsc, inode, h, session, 2848 handle_cap_import(mdsc, inode, h, session,
2842 snaptrace, snaptrace_len); 2849 snaptrace, snaptrace_len);
2843 ceph_check_caps(ceph_inode(inode), 0, session);
2844 goto done_unlocked;
2845 } 2850 }
2846 2851
2847 /* the rest require a cap */ 2852 /* the rest require a cap */
@@ -2858,6 +2863,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2858 switch (op) { 2863 switch (op) {
2859 case CEPH_CAP_OP_REVOKE: 2864 case CEPH_CAP_OP_REVOKE:
2860 case CEPH_CAP_OP_GRANT: 2865 case CEPH_CAP_OP_GRANT:
2866 case CEPH_CAP_OP_IMPORT:
2861 handle_cap_grant(inode, h, session, cap, msg->middle); 2867 handle_cap_grant(inode, h, session, cap, msg->middle);
2862 goto done_unlocked; 2868 goto done_unlocked;
2863 2869
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d4dfdcf76d7f..e51558fca3a3 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -712,63 +712,53 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
712 struct ceph_osd_client *osdc = 712 struct ceph_osd_client *osdc =
713 &ceph_sb_to_client(inode->i_sb)->client->osdc; 713 &ceph_sb_to_client(inode->i_sb)->client->osdc;
714 loff_t endoff = pos + iov->iov_len; 714 loff_t endoff = pos + iov->iov_len;
715 int want, got = 0; 715 int got = 0;
716 int ret, err; 716 int ret, err, written;
717 717
718 if (ceph_snap(inode) != CEPH_NOSNAP) 718 if (ceph_snap(inode) != CEPH_NOSNAP)
719 return -EROFS; 719 return -EROFS;
720 720
721retry_snap: 721retry_snap:
722 written = 0;
722 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 723 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
723 return -ENOSPC; 724 return -ENOSPC;
724 __ceph_do_pending_vmtruncate(inode); 725 __ceph_do_pending_vmtruncate(inode);
725 dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
726 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
727 inode->i_size);
728 if (fi->fmode & CEPH_FILE_MODE_LAZY)
729 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
730 else
731 want = CEPH_CAP_FILE_BUFFER;
732 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
733 if (ret < 0)
734 goto out_put;
735
736 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
737 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
738 ceph_cap_string(got));
739
740 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
741 (iocb->ki_filp->f_flags & O_DIRECT) ||
742 (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
743 (fi->flags & CEPH_F_SYNC)) {
744 ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
745 &iocb->ki_pos);
746 } else {
747 /*
748 * buffered write; drop Fw early to avoid slow
749 * revocation if we get stuck on balance_dirty_pages
750 */
751 int dirty;
752
753 spin_lock(&ci->i_ceph_lock);
754 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
755 spin_unlock(&ci->i_ceph_lock);
756 ceph_put_cap_refs(ci, got);
757 726
727 /*
728 * try to do a buffered write. if we don't have sufficient
729 * caps, we'll get -EAGAIN from generic_file_aio_write, or a
730 * short write if we only get caps for some pages.
731 */
732 if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
733 !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
734 !(fi->flags & CEPH_F_SYNC)) {
758 ret = generic_file_aio_write(iocb, iov, nr_segs, pos); 735 ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
736 if (ret >= 0)
737 written = ret;
738
759 if ((ret >= 0 || ret == -EIOCBQUEUED) && 739 if ((ret >= 0 || ret == -EIOCBQUEUED) &&
760 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) 740 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
761 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { 741 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
762 err = vfs_fsync_range(file, pos, pos + ret - 1, 1); 742 err = vfs_fsync_range(file, pos, pos + written - 1, 1);
763 if (err < 0) 743 if (err < 0)
764 ret = err; 744 ret = err;
765 } 745 }
746 if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
747 goto out;
748 }
766 749
767 if (dirty) 750 dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
768 __mark_inode_dirty(inode, dirty); 751 inode, ceph_vinop(inode), pos + written,
752 (unsigned)iov->iov_len - written, inode->i_size);
753 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
754 if (ret < 0)
769 goto out; 755 goto out;
770 }
771 756
757 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
758 inode, ceph_vinop(inode), pos + written,
759 (unsigned)iov->iov_len - written, ceph_cap_string(got));
760 ret = ceph_sync_write(file, iov->iov_base + written,
761 iov->iov_len - written, &iocb->ki_pos);
772 if (ret >= 0) { 762 if (ret >= 0) {
773 int dirty; 763 int dirty;
774 spin_lock(&ci->i_ceph_lock); 764 spin_lock(&ci->i_ceph_lock);
@@ -777,13 +767,10 @@ retry_snap:
777 if (dirty) 767 if (dirty)
778 __mark_inode_dirty(inode, dirty); 768 __mark_inode_dirty(inode, dirty);
779 } 769 }
780
781out_put:
782 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", 770 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
783 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 771 inode, ceph_vinop(inode), pos + written,
784 ceph_cap_string(got)); 772 (unsigned)iov->iov_len - written, ceph_cap_string(got));
785 ceph_put_cap_refs(ci, got); 773 ceph_put_cap_refs(ci, got);
786
787out: 774out:
788 if (ret == -EOLDSNAPC) { 775 if (ret == -EOLDSNAPC) {
789 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", 776 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ba95eea201bf..2971eaa65cdc 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1466,7 +1466,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
1466{ 1466{
1467 struct ceph_inode_info *ci = ceph_inode(inode); 1467 struct ceph_inode_info *ci = ceph_inode(inode);
1468 u64 to; 1468 u64 to;
1469 int wrbuffer_refs, wake = 0; 1469 int wrbuffer_refs, finish = 0;
1470 1470
1471retry: 1471retry:
1472 spin_lock(&ci->i_ceph_lock); 1472 spin_lock(&ci->i_ceph_lock);
@@ -1498,15 +1498,18 @@ retry:
1498 truncate_inode_pages(inode->i_mapping, to); 1498 truncate_inode_pages(inode->i_mapping, to);
1499 1499
1500 spin_lock(&ci->i_ceph_lock); 1500 spin_lock(&ci->i_ceph_lock);
1501 ci->i_truncate_pending--; 1501 if (to == ci->i_truncate_size) {
1502 if (ci->i_truncate_pending == 0) 1502 ci->i_truncate_pending = 0;
1503 wake = 1; 1503 finish = 1;
1504 }
1504 spin_unlock(&ci->i_ceph_lock); 1505 spin_unlock(&ci->i_ceph_lock);
1506 if (!finish)
1507 goto retry;
1505 1508
1506 if (wrbuffer_refs == 0) 1509 if (wrbuffer_refs == 0)
1507 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 1510 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1508 if (wake) 1511
1509 wake_up_all(&ci->i_cap_wq); 1512 wake_up_all(&ci->i_cap_wq);
1510} 1513}
1511 1514
1512 1515
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1bcf712655d9..9165eb8309eb 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1590,7 +1590,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1590 } else if (rpath || rino) { 1590 } else if (rpath || rino) {
1591 *ino = rino; 1591 *ino = rino;
1592 *ppath = rpath; 1592 *ppath = rpath;
1593 *pathlen = strlen(rpath); 1593 *pathlen = rpath ? strlen(rpath) : 0;
1594 dout(" path %.*s\n", *pathlen, rpath); 1594 dout(" path %.*s\n", *pathlen, rpath);
1595 } 1595 }
1596 1596
@@ -1876,9 +1876,14 @@ finish:
1876static void __wake_requests(struct ceph_mds_client *mdsc, 1876static void __wake_requests(struct ceph_mds_client *mdsc,
1877 struct list_head *head) 1877 struct list_head *head)
1878{ 1878{
1879 struct ceph_mds_request *req, *nreq; 1879 struct ceph_mds_request *req;
1880 LIST_HEAD(tmp_list);
1881
1882 list_splice_init(head, &tmp_list);
1880 1883
1881 list_for_each_entry_safe(req, nreq, head, r_wait) { 1884 while (!list_empty(&tmp_list)) {
1885 req = list_entry(tmp_list.next,
1886 struct ceph_mds_request, r_wait);
1882 list_del_init(&req->r_wait); 1887 list_del_init(&req->r_wait);
1883 __do_request(mdsc, req); 1888 __do_request(mdsc, req);
1884 } 1889 }
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 2eb43f211325..e86aa9948124 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -403,8 +403,6 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
403 seq_printf(m, ",mount_timeout=%d", opt->mount_timeout); 403 seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
404 if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) 404 if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
405 seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl); 405 seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
406 if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
407 seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
408 if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) 406 if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
409 seq_printf(m, ",osdkeepalivetimeout=%d", 407 seq_printf(m, ",osdkeepalivetimeout=%d",
410 opt->osd_keepalive_timeout); 408 opt->osd_keepalive_timeout);
@@ -849,7 +847,7 @@ static int ceph_register_bdi(struct super_block *sb,
849 fsc->backing_dev_info.ra_pages = 847 fsc->backing_dev_info.ra_pages =
850 default_backing_dev_info.ra_pages; 848 default_backing_dev_info.ra_pages;
851 849
852 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d", 850 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
853 atomic_long_inc_return(&bdi_seq)); 851 atomic_long_inc_return(&bdi_seq));
854 if (!err) 852 if (!err)
855 sb->s_bdi = &fsc->backing_dev_info; 853 sb->s_bdi = &fsc->backing_dev_info;
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index 2a9a9abc9126..12731a19ef06 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -114,6 +114,7 @@ struct backing_dev_info {
114int bdi_init(struct backing_dev_info *bdi); 114int bdi_init(struct backing_dev_info *bdi);
115void bdi_destroy(struct backing_dev_info *bdi); 115void bdi_destroy(struct backing_dev_info *bdi);
116 116
117__printf(3, 4)
117int bdi_register(struct backing_dev_info *bdi, struct device *parent, 118int bdi_register(struct backing_dev_info *bdi, struct device *parent,
118 const char *fmt, ...); 119 const char *fmt, ...);
119int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev); 120int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 6470792b13d3..084d3c622b12 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -43,7 +43,6 @@ struct ceph_options {
43 struct ceph_entity_addr my_addr; 43 struct ceph_entity_addr my_addr;
44 int mount_timeout; 44 int mount_timeout;
45 int osd_idle_ttl; 45 int osd_idle_ttl;
46 int osd_timeout;
47 int osd_keepalive_timeout; 46 int osd_keepalive_timeout;
48 47
49 /* 48 /*
@@ -63,7 +62,6 @@ struct ceph_options {
63 * defaults 62 * defaults
64 */ 63 */
65#define CEPH_MOUNT_TIMEOUT_DEFAULT 60 64#define CEPH_MOUNT_TIMEOUT_DEFAULT 60
66#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
67#define CEPH_OSD_KEEPALIVE_DEFAULT 5 65#define CEPH_OSD_KEEPALIVE_DEFAULT 5
68#define CEPH_OSD_IDLE_TTL_DEFAULT 60 66#define CEPH_OSD_IDLE_TTL_DEFAULT 60
69 67
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index e37acbe989a9..10a417f9f76f 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -123,6 +123,7 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
123extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, 123extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
124 struct ceph_pg pgid); 124 struct ceph_pg pgid);
125 125
126extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
126extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name); 127extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
127 128
128#endif 129#endif
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index de91fbdf127e..2c04afeead1c 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -87,6 +87,8 @@ struct ceph_pg {
87 * 87 *
88 * lpgp_num -- as above. 88 * lpgp_num -- as above.
89 */ 89 */
90#define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */
91
90#define CEPH_PG_TYPE_REP 1 92#define CEPH_PG_TYPE_REP 1
91#define CEPH_PG_TYPE_RAID4 2 93#define CEPH_PG_TYPE_RAID4 2
92#define CEPH_PG_POOL_VERSION 2 94#define CEPH_PG_POOL_VERSION 2
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index a8020293f342..ee71ea26777a 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -305,7 +305,6 @@ ceph_parse_options(char *options, const char *dev_name,
305 305
306 /* start with defaults */ 306 /* start with defaults */
307 opt->flags = CEPH_OPT_DEFAULT; 307 opt->flags = CEPH_OPT_DEFAULT;
308 opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
309 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; 308 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
310 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ 309 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
311 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ 310 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
@@ -391,7 +390,7 @@ ceph_parse_options(char *options, const char *dev_name,
391 390
392 /* misc */ 391 /* misc */
393 case Opt_osdtimeout: 392 case Opt_osdtimeout:
394 opt->osd_timeout = intval; 393 pr_warning("ignoring deprecated osdtimeout option\n");
395 break; 394 break;
396 case Opt_osdkeepalivetimeout: 395 case Opt_osdkeepalivetimeout:
397 opt->osd_keepalive_timeout = intval; 396 opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3ef1759403b4..4d111fd2b492 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2244,22 +2244,62 @@ bad_tag:
2244 2244
2245 2245
2246/* 2246/*
2247 * Atomically queue work on a connection. Bump @con reference to 2247 * Atomically queue work on a connection after the specified delay.
2248 * avoid races with connection teardown. 2248 * Bump @con reference to avoid races with connection teardown.
2249 * Returns 0 if work was queued, or an error code otherwise.
2249 */ 2250 */
2250static void queue_con(struct ceph_connection *con) 2251static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
2251{ 2252{
2252 if (!con->ops->get(con)) { 2253 if (!con->ops->get(con)) {
2253 dout("queue_con %p ref count 0\n", con); 2254 dout("%s %p ref count 0\n", __func__, con);
2254 return; 2255
2256 return -ENOENT;
2255 } 2257 }
2256 2258
2257 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 2259 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
2258 dout("queue_con %p - already queued\n", con); 2260 dout("%s %p - already queued\n", __func__, con);
2259 con->ops->put(con); 2261 con->ops->put(con);
2260 } else { 2262
2261 dout("queue_con %p\n", con); 2263 return -EBUSY;
2262 } 2264 }
2265
2266 dout("%s %p %lu\n", __func__, con, delay);
2267
2268 return 0;
2269}
2270
2271static void queue_con(struct ceph_connection *con)
2272{
2273 (void) queue_con_delay(con, 0);
2274}
2275
2276static bool con_sock_closed(struct ceph_connection *con)
2277{
2278 if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
2279 return false;
2280
2281#define CASE(x) \
2282 case CON_STATE_ ## x: \
2283 con->error_msg = "socket closed (con state " #x ")"; \
2284 break;
2285
2286 switch (con->state) {
2287 CASE(CLOSED);
2288 CASE(PREOPEN);
2289 CASE(CONNECTING);
2290 CASE(NEGOTIATING);
2291 CASE(OPEN);
2292 CASE(STANDBY);
2293 default:
2294 pr_warning("%s con %p unrecognized state %lu\n",
2295 __func__, con, con->state);
2296 con->error_msg = "unrecognized con state";
2297 BUG();
2298 break;
2299 }
2300#undef CASE
2301
2302 return true;
2263} 2303}
2264 2304
2265/* 2305/*
@@ -2273,35 +2313,16 @@ static void con_work(struct work_struct *work)
2273 2313
2274 mutex_lock(&con->mutex); 2314 mutex_lock(&con->mutex);
2275restart: 2315restart:
2276 if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { 2316 if (con_sock_closed(con))
2277 switch (con->state) {
2278 case CON_STATE_CONNECTING:
2279 con->error_msg = "connection failed";
2280 break;
2281 case CON_STATE_NEGOTIATING:
2282 con->error_msg = "negotiation failed";
2283 break;
2284 case CON_STATE_OPEN:
2285 con->error_msg = "socket closed";
2286 break;
2287 default:
2288 dout("unrecognized con state %d\n", (int)con->state);
2289 con->error_msg = "unrecognized con state";
2290 BUG();
2291 }
2292 goto fault; 2317 goto fault;
2293 }
2294 2318
2295 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { 2319 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
2296 dout("con_work %p backing off\n", con); 2320 dout("con_work %p backing off\n", con);
2297 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2321 ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2298 round_jiffies_relative(con->delay))) { 2322 if (ret) {
2299 dout("con_work %p backoff %lu\n", con, con->delay);
2300 mutex_unlock(&con->mutex);
2301 return;
2302 } else {
2303 dout("con_work %p FAILED to back off %lu\n", con, 2323 dout("con_work %p FAILED to back off %lu\n", con,
2304 con->delay); 2324 con->delay);
2325 BUG_ON(ret == -ENOENT);
2305 set_bit(CON_FLAG_BACKOFF, &con->flags); 2326 set_bit(CON_FLAG_BACKOFF, &con->flags);
2306 } 2327 }
2307 goto done; 2328 goto done;
@@ -2356,7 +2377,7 @@ fault:
2356static void ceph_fault(struct ceph_connection *con) 2377static void ceph_fault(struct ceph_connection *con)
2357 __releases(con->mutex) 2378 __releases(con->mutex)
2358{ 2379{
2359 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2380 pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2360 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2381 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2361 dout("fault %p state %lu to peer %s\n", 2382 dout("fault %p state %lu to peer %s\n",
2362 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2383 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@ static void ceph_fault(struct ceph_connection *con)
2398 con->delay = BASE_DELAY_INTERVAL; 2419 con->delay = BASE_DELAY_INTERVAL;
2399 else if (con->delay < MAX_DELAY_INTERVAL) 2420 else if (con->delay < MAX_DELAY_INTERVAL)
2400 con->delay *= 2; 2421 con->delay *= 2;
2401 con->ops->get(con); 2422 set_bit(CON_FLAG_BACKOFF, &con->flags);
2402 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2423 queue_con(con);
2403 round_jiffies_relative(con->delay))) {
2404 dout("fault queued %p delay %lu\n", con, con->delay);
2405 } else {
2406 con->ops->put(con);
2407 dout("fault failed to queue %p delay %lu, backoff\n",
2408 con, con->delay);
2409 /*
2410 * In many cases we see a socket state change
2411 * while con_work is running and end up
2412 * queuing (non-delayed) work, such that we
2413 * can't backoff with a delay. Set a flag so
2414 * that when con_work restarts we schedule the
2415 * delay then.
2416 */
2417 set_bit(CON_FLAG_BACKOFF, &con->flags);
2418 }
2419 } 2424 }
2420 2425
2421out_unlock: 2426out_unlock:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756cc7448..780caf6b0491 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
221 kref_init(&req->r_kref); 221 kref_init(&req->r_kref);
222 init_completion(&req->r_completion); 222 init_completion(&req->r_completion);
223 init_completion(&req->r_safe_completion); 223 init_completion(&req->r_safe_completion);
224 RB_CLEAR_NODE(&req->r_node);
224 INIT_LIST_HEAD(&req->r_unsafe_item); 225 INIT_LIST_HEAD(&req->r_unsafe_item);
225 INIT_LIST_HEAD(&req->r_linger_item); 226 INIT_LIST_HEAD(&req->r_linger_item);
226 INIT_LIST_HEAD(&req->r_linger_osd); 227 INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
580 581
581 dout("__kick_osd_requests osd%d\n", osd->o_osd); 582 dout("__kick_osd_requests osd%d\n", osd->o_osd);
582 err = __reset_osd(osdc, osd); 583 err = __reset_osd(osdc, osd);
583 if (err == -EAGAIN) 584 if (err)
584 return; 585 return;
585 586
586 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 587 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
607 } 608 }
608} 609}
609 610
610static void kick_osd_requests(struct ceph_osd_client *osdc,
611 struct ceph_osd *kickosd)
612{
613 mutex_lock(&osdc->request_mutex);
614 __kick_osd_requests(osdc, kickosd);
615 mutex_unlock(&osdc->request_mutex);
616}
617
618/* 611/*
619 * If the osd connection drops, we need to resubmit all requests. 612 * If the osd connection drops, we need to resubmit all requests.
620 */ 613 */
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
628 dout("osd_reset osd%d\n", osd->o_osd); 621 dout("osd_reset osd%d\n", osd->o_osd);
629 osdc = osd->o_osdc; 622 osdc = osd->o_osdc;
630 down_read(&osdc->map_sem); 623 down_read(&osdc->map_sem);
631 kick_osd_requests(osdc, osd); 624 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd);
626 mutex_unlock(&osdc->request_mutex);
632 send_queued(osdc); 627 send_queued(osdc);
633 up_read(&osdc->map_sem); 628 up_read(&osdc->map_sem);
634} 629}
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
647 atomic_set(&osd->o_ref, 1); 642 atomic_set(&osd->o_ref, 1);
648 osd->o_osdc = osdc; 643 osd->o_osdc = osdc;
649 osd->o_osd = onum; 644 osd->o_osd = onum;
645 RB_CLEAR_NODE(&osd->o_node);
650 INIT_LIST_HEAD(&osd->o_requests); 646 INIT_LIST_HEAD(&osd->o_requests);
651 INIT_LIST_HEAD(&osd->o_linger_requests); 647 INIT_LIST_HEAD(&osd->o_linger_requests);
652 INIT_LIST_HEAD(&osd->o_osd_lru); 648 INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
750 if (list_empty(&osd->o_requests) && 746 if (list_empty(&osd->o_requests) &&
751 list_empty(&osd->o_linger_requests)) { 747 list_empty(&osd->o_linger_requests)) {
752 __remove_osd(osdc, osd); 748 __remove_osd(osdc, osd);
749 ret = -ENODEV;
753 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
754 &osd->o_con.peer_addr, 751 &osd->o_con.peer_addr,
755 sizeof(osd->o_con.peer_addr)) == 0 && 752 sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
876 req->r_osd = NULL; 873 req->r_osd = NULL;
877 } 874 }
878 875
876 list_del_init(&req->r_req_lru_item);
879 ceph_osdc_put_request(req); 877 ceph_osdc_put_request(req);
880 878
881 list_del_init(&req->r_req_lru_item);
882 if (osdc->num_requests == 0) { 879 if (osdc->num_requests == 0) {
883 dout(" no requests, canceling timeout\n"); 880 dout(" no requests, canceling timeout\n");
884 __cancel_osd_timeout(osdc); 881 __cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
910 struct ceph_osd_request *req) 907 struct ceph_osd_request *req)
911{ 908{
912 dout("__unregister_linger_request %p\n", req); 909 dout("__unregister_linger_request %p\n", req);
910 list_del_init(&req->r_linger_item);
913 if (req->r_osd) { 911 if (req->r_osd) {
914 list_del_init(&req->r_linger_item);
915 list_del_init(&req->r_linger_osd); 912 list_del_init(&req->r_linger_osd);
916 913
917 if (list_empty(&req->r_osd->o_requests) && 914 if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
1090{ 1087{
1091 struct ceph_osd_client *osdc = 1088 struct ceph_osd_client *osdc =
1092 container_of(work, struct ceph_osd_client, timeout_work.work); 1089 container_of(work, struct ceph_osd_client, timeout_work.work);
1093 struct ceph_osd_request *req, *last_req = NULL; 1090 struct ceph_osd_request *req;
1094 struct ceph_osd *osd; 1091 struct ceph_osd *osd;
1095 unsigned long timeout = osdc->client->options->osd_timeout * HZ;
1096 unsigned long keepalive = 1092 unsigned long keepalive =
1097 osdc->client->options->osd_keepalive_timeout * HZ; 1093 osdc->client->options->osd_keepalive_timeout * HZ;
1098 unsigned long last_stamp = 0;
1099 struct list_head slow_osds; 1094 struct list_head slow_osds;
1100 dout("timeout\n"); 1095 dout("timeout\n");
1101 down_read(&osdc->map_sem); 1096 down_read(&osdc->map_sem);
@@ -1105,37 +1100,6 @@ static void handle_timeout(struct work_struct *work)
1105 mutex_lock(&osdc->request_mutex); 1100 mutex_lock(&osdc->request_mutex);
1106 1101
1107 /* 1102 /*
1108 * reset osds that appear to be _really_ unresponsive. this
1109 * is a failsafe measure.. we really shouldn't be getting to
1110 * this point if the system is working properly. the monitors
1111 * should mark the osd as failed and we should find out about
1112 * it from an updated osd map.
1113 */
1114 while (timeout && !list_empty(&osdc->req_lru)) {
1115 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1116 r_req_lru_item);
1117
1118 /* hasn't been long enough since we sent it? */
1119 if (time_before(jiffies, req->r_stamp + timeout))
1120 break;
1121
1122 /* hasn't been long enough since it was acked? */
1123 if (req->r_request->ack_stamp == 0 ||
1124 time_before(jiffies, req->r_request->ack_stamp + timeout))
1125 break;
1126
1127 BUG_ON(req == last_req && req->r_stamp == last_stamp);
1128 last_req = req;
1129 last_stamp = req->r_stamp;
1130
1131 osd = req->r_osd;
1132 BUG_ON(!osd);
1133 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
1134 req->r_tid, osd->o_osd);
1135 __kick_osd_requests(osdc, osd);
1136 }
1137
1138 /*
1139 * ping osds that are a bit slow. this ensures that if there 1103 * ping osds that are a bit slow. this ensures that if there
1140 * is a break in the TCP connection we will notice, and reopen 1104 * is a break in the TCP connection we will notice, and reopen
1141 * a connection with that osd (from the fault callback). 1105 * a connection with that osd (from the fault callback).
@@ -1364,8 +1328,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1364 1328
1365 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1329 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1366 req->r_osd ? req->r_osd->o_osd : -1); 1330 req->r_osd ? req->r_osd->o_osd : -1);
1367 __unregister_linger_request(osdc, req);
1368 __register_request(osdc, req); 1331 __register_request(osdc, req);
1332 __unregister_linger_request(osdc, req);
1369 } 1333 }
1370 mutex_unlock(&osdc->request_mutex); 1334 mutex_unlock(&osdc->request_mutex);
1371 1335
@@ -1599,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1599 event->data = data; 1563 event->data = data;
1600 event->osdc = osdc; 1564 event->osdc = osdc;
1601 INIT_LIST_HEAD(&event->osd_node); 1565 INIT_LIST_HEAD(&event->osd_node);
1566 RB_CLEAR_NODE(&event->node);
1602 kref_init(&event->kref); /* one ref for us */ 1567 kref_init(&event->kref); /* one ref for us */
1603 kref_get(&event->kref); /* one ref for the caller */ 1568 kref_get(&event->kref); /* one ref for the caller */
1604 init_completion(&event->completion); 1569 init_completion(&event->completion);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5433fb0eb3c6..de73214b5d26 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -469,6 +469,22 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
469 return NULL; 469 return NULL;
470} 470}
471 471
472const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
473{
474 struct ceph_pg_pool_info *pi;
475
476 if (id == CEPH_NOPOOL)
477 return NULL;
478
479 if (WARN_ON_ONCE(id > (u64) INT_MAX))
480 return NULL;
481
482 pi = __lookup_pg_pool(&map->pg_pools, (int) id);
483
484 return pi ? pi->name : NULL;
485}
486EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
487
472int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) 488int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
473{ 489{
474 struct rb_node *rbp; 490 struct rb_node *rbp;
@@ -645,10 +661,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
645 ceph_decode_32_safe(p, end, max, bad); 661 ceph_decode_32_safe(p, end, max, bad);
646 while (max--) { 662 while (max--) {
647 ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); 663 ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
664 err = -ENOMEM;
648 pi = kzalloc(sizeof(*pi), GFP_NOFS); 665 pi = kzalloc(sizeof(*pi), GFP_NOFS);
649 if (!pi) 666 if (!pi)
650 goto bad; 667 goto bad;
651 pi->id = ceph_decode_32(p); 668 pi->id = ceph_decode_32(p);
669 err = -EINVAL;
652 ev = ceph_decode_8(p); /* encoding version */ 670 ev = ceph_decode_8(p); /* encoding version */
653 if (ev > CEPH_PG_POOL_VERSION) { 671 if (ev > CEPH_PG_POOL_VERSION) {
654 pr_warning("got unknown v %d > %d of ceph_pg_pool\n", 672 pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
664 __insert_pg_pool(&map->pg_pools, pi); 682 __insert_pg_pool(&map->pg_pools, pi);
665 } 683 }
666 684
667 if (version >= 5 && __decode_pool_names(p, end, map) < 0) 685 if (version >= 5) {
668 goto bad; 686 err = __decode_pool_names(p, end, map);
687 if (err < 0) {
688 dout("fail to decode pool names");
689 goto bad;
690 }
691 }
669 692
670 ceph_decode_32_safe(p, end, map->pool_max, bad); 693 ceph_decode_32_safe(p, end, map->pool_max, bad);
671 694
@@ -745,7 +768,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
745 return map; 768 return map;
746 769
747bad: 770bad:
748 dout("osdmap_decode fail\n"); 771 dout("osdmap_decode fail err %d\n", err);
749 ceph_osdmap_destroy(map); 772 ceph_osdmap_destroy(map);
750 return ERR_PTR(err); 773 return ERR_PTR(err);
751} 774}
@@ -839,6 +862,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
839 if (ev > CEPH_PG_POOL_VERSION) { 862 if (ev > CEPH_PG_POOL_VERSION) {
840 pr_warning("got unknown v %d > %d of ceph_pg_pool\n", 863 pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
841 ev, CEPH_PG_POOL_VERSION); 864 ev, CEPH_PG_POOL_VERSION);
865 err = -EINVAL;
842 goto bad; 866 goto bad;
843 } 867 }
844 pi = __lookup_pg_pool(&map->pg_pools, pool); 868 pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
855 if (err < 0) 879 if (err < 0)
856 goto bad; 880 goto bad;
857 } 881 }
858 if (version >= 5 && __decode_pool_names(p, end, map) < 0) 882 if (version >= 5) {
859 goto bad; 883 err = __decode_pool_names(p, end, map);
884 if (err < 0)
885 goto bad;
886 }
860 887
861 /* old_pool */ 888 /* old_pool */
862 ceph_decode_32_safe(p, end, len, bad); 889 ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
932 (void) __remove_pg_mapping(&map->pg_temp, pgid); 959 (void) __remove_pg_mapping(&map->pg_temp, pgid);
933 960
934 /* insert */ 961 /* insert */
935 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) { 962 err = -EINVAL;
936 err = -EINVAL; 963 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
937 goto bad; 964 goto bad;
938 } 965 err = -ENOMEM;
939 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 966 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
940 if (!pg) { 967 if (!pg)
941 err = -ENOMEM;
942 goto bad; 968 goto bad;
943 }
944 pg->pgid = pgid; 969 pg->pgid = pgid;
945 pg->len = pglen; 970 pg->len = pglen;
946 for (j = 0; j < pglen; j++) 971 for (j = 0; j < pglen; j++)