commit     109c3c0292d5b256bf9e5ca2b591aa9ac5804bc2
tree       43445158523686b55783e40935513b47f35c9840
parent     b973425cbb51e08301b34fecdfd476a44507d8cf
parent     638f5abed3f7d8a7fc24087bd760fa3d99f68a39
author     Linus Torvalds <torvalds@linux-foundation.org>  2013-05-15 16:36:19 -0400
committer  Linus Torvalds <torvalds@linux-foundation.org>  2013-05-15 16:36:19 -0400
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph fixes from Sage Weil:
 "Yes, this is a much larger pull than I would like after -rc1.  There
  are a few things included:

   - a few fixes for leaks and incorrect assertions
   - a few patches fixing behavior when mapped images are resized
   - handling for cloned/layered images that are flattened out from
     underneath the client

  The last bit was non-trivial, and there is some code movement and
  associated cleanup mixed in.  This was ready and was meant to go in
  last week but I missed the boat on Friday.  My only excuse is that I
  was waiting for an all clear from the testing and there were many
  other shiny things to distract me.

  Strictly speaking, handling the flatten case isn't a regression and
  could wait, so if you like we can try to pull the series apart, but
  Alex and I would much prefer to have it all in as it is a case real
  users will hit with 3.10."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (33 commits)
  rbd: re-submit flattened write request (part 2)
  rbd: re-submit write request for flattened clone
  rbd: re-submit read request for flattened clone
  rbd: detect when clone image is flattened
  rbd: reference count parent requests
  rbd: define parent image request routines
  rbd: define rbd_dev_unparent()
  rbd: don't release write request until necessary
  rbd: get parent info on refresh
  rbd: ignore zero-overlap parent
  rbd: support reading parent page data for writes
  rbd: fix parent request size assumption
  libceph: init sent and completed when starting
  rbd: kill rbd_img_request_get()
  rbd: only set up watch for mapped images
  rbd: set mapping read-only flag in rbd_add()
  rbd: support reading parent page data
  rbd: fix an incorrect assertion condition
  rbd: define rbd_dev_v2_header_info()
  rbd: get rid of trivial v1 header wrappers
  ...
-rw-r--r--  drivers/block/rbd.c    | 935
-rw-r--r--  net/ceph/osd_client.c  |   5
2 files changed, 553 insertions(+), 387 deletions(-)
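Central to the flatten handling is a saturating reference count on the parent image ("rbd: reference count parent requests"): an increment must never resurrect a counter that has already reached zero, and over/underflow must be caught rather than allowed to wrap. Below is a rough user-space model of the two helpers the diff adds at the top of drivers/block/rbd.c, written with C11 atomics instead of the kernel's atomic_t API; the names and exact return conventions here are illustrative, not the kernel's (the kernel versions use __atomic_add_unless()/atomic_dec_return() and return -EINVAL on failure):

#include <limits.h>
#include <stdatomic.h>
#include <stdio.h>

/*
 * Hypothetical user-space equivalents of the atomic_inc_return_safe()/
 * atomic_dec_return_safe() helpers the series adds to rbd.c.
 */

/* Increment v unless it is 0 (a refcount at 0 is being torn down and
 * must not be resurrected) or already saturated at INT_MAX.  Returns
 * the updated value, 0 if the counter was 0, or -1 on saturation. */
static int inc_return_safe(atomic_int *v)
{
	int old = atomic_load(v);

	do {
		if (old == 0)
			return 0;
		if (old == INT_MAX)
			return -1;
	} while (!atomic_compare_exchange_weak(v, &old, old + 1));

	return old + 1;
}

/* Decrement v and return the new value; restore and fail on underflow. */
static int dec_return_safe(atomic_int *v)
{
	int counter = atomic_fetch_sub(v, 1) - 1;

	if (counter >= 0)
		return counter;

	atomic_fetch_add(v, 1);		/* underflow: put it back */
	return -1;
}

int main(void)
{
	atomic_int ref;

	atomic_init(&ref, 1);	/* e.g. the reference taken at probe time */
	printf("get -> %d\n", inc_return_safe(&ref));	/* 2 */
	printf("put -> %d\n", dec_return_safe(&ref));	/* 1 */
	printf("put -> %d\n", dec_return_safe(&ref));	/* 0: tear down now */
	printf("get -> %d\n", inc_return_safe(&ref));	/* 0: refused */
	return 0;
}

In the patch, rbd_dev_parent_get() treats a positive return as a successfully taken reference, zero as "parent already going away", and a negative value as a bug worth a warning; the same three cases show up in this sketch's output.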
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index ca63104136e0..d6d314027b5d 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -55,6 +55,39 @@
 #define SECTOR_SHIFT	9
 #define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
 
+/*
+ * Increment the given counter and return its updated value.
+ * If the counter is already 0 it will not be incremented.
+ * If the counter is already at its maximum value returns
+ * -EINVAL without updating it.
+ */
+static int atomic_inc_return_safe(atomic_t *v)
+{
+	unsigned int counter;
+
+	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
+	if (counter <= (unsigned int)INT_MAX)
+		return (int)counter;
+
+	atomic_dec(v);
+
+	return -EINVAL;
+}
+
+/* Decrement the counter.  Return the resulting value, or -EINVAL */
+static int atomic_dec_return_safe(atomic_t *v)
+{
+	int counter;
+
+	counter = atomic_dec_return(v);
+	if (counter >= 0)
+		return counter;
+
+	atomic_inc(v);
+
+	return -EINVAL;
+}
+
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
@@ -100,21 +133,20 @@
  * block device image metadata (in-memory version)
  */
 struct rbd_image_header {
-	/* These four fields never change for a given rbd image */
+	/* These six fields never change for a given rbd image */
 	char *object_prefix;
-	u64 features;
 	__u8 obj_order;
 	__u8 crypt_type;
 	__u8 comp_type;
+	u64 stripe_unit;
+	u64 stripe_count;
+	u64 features;		/* Might be changeable someday? */
 
 	/* The remaining fields need to be updated occasionally */
 	u64 image_size;
 	struct ceph_snap_context *snapc;
-	char *snap_names;
-	u64 *snap_sizes;
-
-	u64 stripe_unit;
-	u64 stripe_count;
+	char *snap_names;	/* format 1 only */
+	u64 *snap_sizes;	/* format 1 only */
 };
 
 /*
@@ -225,6 +257,7 @@ struct rbd_obj_request {
 		};
 	};
 	struct page **copyup_pages;
+	u32 copyup_page_count;
 
 	struct ceph_osd_request *osd_req;
 
@@ -257,6 +290,7 @@ struct rbd_img_request {
 		struct rbd_obj_request *obj_request; /* obj req initiator */
 	};
 	struct page **copyup_pages;
+	u32 copyup_page_count;
 	spinlock_t completion_lock;/* protects next_completion */
 	u32 next_completion;
 	rbd_img_callback_t callback;
@@ -311,6 +345,7 @@ struct rbd_device {
 
 	struct rbd_spec *parent_spec;
 	u64 parent_overlap;
+	atomic_t parent_ref;
 	struct rbd_device *parent;
 
 	/* protects updating the header */
@@ -359,7 +394,8 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 			  size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
+static void rbd_spec_put(struct rbd_spec *spec);
 
 static struct bus_attribute rbd_bus_attrs[] = {
 	__ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -426,7 +462,8 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -726,88 +763,123 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 }
 
 /*
- * Create a new header structure, translate header format from the on-disk
- * header.
+ * Fill an rbd image header with information from the given format 1
+ * on-disk header.
  */
-static int rbd_header_from_disk(struct rbd_image_header *header,
+static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 				struct rbd_image_header_ondisk *ondisk)
 {
+	struct rbd_image_header *header = &rbd_dev->header;
+	bool first_time = header->object_prefix == NULL;
+	struct ceph_snap_context *snapc;
+	char *object_prefix = NULL;
+	char *snap_names = NULL;
+	u64 *snap_sizes = NULL;
 	u32 snap_count;
-	size_t len;
 	size_t size;
+	int ret = -ENOMEM;
 	u32 i;
 
-	memset(header, 0, sizeof (*header));
+	/* Allocate this now to avoid having to handle failure below */
 
-	snap_count = le32_to_cpu(ondisk->snap_count);
+	if (first_time) {
+		size_t len;
 
-	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
-	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
-	if (!header->object_prefix)
-		return -ENOMEM;
-	memcpy(header->object_prefix, ondisk->object_prefix, len);
-	header->object_prefix[len] = '\0';
+		len = strnlen(ondisk->object_prefix,
+				sizeof (ondisk->object_prefix));
+		object_prefix = kmalloc(len + 1, GFP_KERNEL);
+		if (!object_prefix)
+			return -ENOMEM;
+		memcpy(object_prefix, ondisk->object_prefix, len);
+		object_prefix[len] = '\0';
+	}
 
+	/* Allocate the snapshot context and fill it in */
+
+	snap_count = le32_to_cpu(ondisk->snap_count);
+	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
+	if (!snapc)
+		goto out_err;
+	snapc->seq = le64_to_cpu(ondisk->snap_seq);
 	if (snap_count) {
+		struct rbd_image_snap_ondisk *snaps;
 		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 
-		/* Save a copy of the snapshot names */
+		/* We'll keep a copy of the snapshot names... */
 
-		if (snap_names_len > (u64) SIZE_MAX)
-			return -EIO;
-		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
-		if (!header->snap_names)
+		if (snap_names_len > (u64)SIZE_MAX)
+			goto out_2big;
+		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
+		if (!snap_names)
 			goto out_err;
+
+		/* ...as well as the array of their sizes. */
+
+		size = snap_count * sizeof (*header->snap_sizes);
+		snap_sizes = kmalloc(size, GFP_KERNEL);
+		if (!snap_sizes)
+			goto out_err;
+
 		/*
-		 * Note that rbd_dev_v1_header_read() guarantees
-		 * the ondisk buffer we're working with has
+		 * Copy the names, and fill in each snapshot's id
+		 * and size.
+		 *
+		 * Note that rbd_dev_v1_header_info() guarantees the
+		 * ondisk buffer we're working with has
 		 * snap_names_len bytes beyond the end of the
 		 * snapshot id array, this memcpy() is safe.
 		 */
-		memcpy(header->snap_names, &ondisk->snaps[snap_count],
-			snap_names_len);
+		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
+		snaps = ondisk->snaps;
+		for (i = 0; i < snap_count; i++) {
+			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
+			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
+		}
+	}
 
-		/* Record each snapshot's size */
+	/* We won't fail any more, fill in the header */
 
-		size = snap_count * sizeof (*header->snap_sizes);
-		header->snap_sizes = kmalloc(size, GFP_KERNEL);
-		if (!header->snap_sizes)
-			goto out_err;
-		for (i = 0; i < snap_count; i++)
-			header->snap_sizes[i] =
-				le64_to_cpu(ondisk->snaps[i].image_size);
+	down_write(&rbd_dev->header_rwsem);
+	if (first_time) {
+		header->object_prefix = object_prefix;
+		header->obj_order = ondisk->options.order;
+		header->crypt_type = ondisk->options.crypt_type;
+		header->comp_type = ondisk->options.comp_type;
+		/* The rest aren't used for format 1 images */
+		header->stripe_unit = 0;
+		header->stripe_count = 0;
+		header->features = 0;
 	} else {
-		header->snap_names = NULL;
-		header->snap_sizes = NULL;
+		ceph_put_snap_context(header->snapc);
+		kfree(header->snap_names);
+		kfree(header->snap_sizes);
 	}
 
-	header->features = 0;	/* No features support in v1 images */
-	header->obj_order = ondisk->options.order;
-	header->crypt_type = ondisk->options.crypt_type;
-	header->comp_type = ondisk->options.comp_type;
-
-	/* Allocate and fill in the snapshot context */
+	/* The remaining fields always get updated (when we refresh) */
 
 	header->image_size = le64_to_cpu(ondisk->image_size);
+	header->snapc = snapc;
+	header->snap_names = snap_names;
+	header->snap_sizes = snap_sizes;
 
-	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
-	if (!header->snapc)
-		goto out_err;
-	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
-	for (i = 0; i < snap_count; i++)
-		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
+	/* Make sure mapping size is consistent with header info */
 
-	return 0;
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
+		if (rbd_dev->mapping.size != header->image_size)
+			rbd_dev->mapping.size = header->image_size;
+
+	up_write(&rbd_dev->header_rwsem);
 
+	return 0;
+out_2big:
+	ret = -EIO;
 out_err:
-	kfree(header->snap_sizes);
-	header->snap_sizes = NULL;
-	kfree(header->snap_names);
-	header->snap_names = NULL;
-	kfree(header->object_prefix);
-	header->object_prefix = NULL;
+	kfree(snap_sizes);
+	kfree(snap_names);
+	ceph_put_snap_context(snapc);
+	kfree(object_prefix);
 
-	return -ENOMEM;
+	return ret;
 }
 
 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
@@ -934,20 +1006,11 @@ static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 
 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
-	const char *snap_name = rbd_dev->spec->snap_name;
-	u64 snap_id;
+	u64 snap_id = rbd_dev->spec->snap_id;
 	u64 size = 0;
 	u64 features = 0;
 	int ret;
 
-	if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
-		snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
-		if (snap_id == CEPH_NOSNAP)
-			return -ENOENT;
-	} else {
-		snap_id = CEPH_NOSNAP;
-	}
-
 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
 	if (ret)
 		return ret;
@@ -958,11 +1021,6 @@ static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 	rbd_dev->mapping.size = size;
 	rbd_dev->mapping.features = features;
 
-	/* If we are mapping a snapshot it must be marked read-only */
-
-	if (snap_id != CEPH_NOSNAP)
-		rbd_dev->mapping.read_only = true;
-
 	return 0;
 }
 
@@ -970,14 +1028,6 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 {
 	rbd_dev->mapping.size = 0;
 	rbd_dev->mapping.features = 0;
-	rbd_dev->mapping.read_only = true;
-}
-
-static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
-{
-	rbd_dev->mapping.size = 0;
-	rbd_dev->mapping.features = 0;
-	rbd_dev->mapping.read_only = true;
 }
 
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
@@ -1342,20 +1392,18 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-	dout("%s: img %p (was %d)\n", __func__, img_request,
-		atomic_read(&img_request->kref.refcount));
-	kref_get(&img_request->kref);
-}
-
+static bool img_request_child_test(struct rbd_img_request *img_request);
+static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
 	rbd_assert(img_request != NULL);
 	dout("%s: img %p (was %d)\n", __func__, img_request,
 		atomic_read(&img_request->kref.refcount));
-	kref_put(&img_request->kref, rbd_img_request_destroy);
+	if (img_request_child_test(img_request))
+		kref_put(&img_request->kref, rbd_parent_request_destroy);
+	else
+		kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
@@ -1472,6 +1520,12 @@ static void img_request_child_set(struct rbd_img_request *img_request)
 	smp_mb();
 }
 
+static void img_request_child_clear(struct rbd_img_request *img_request)
+{
+	clear_bit(IMG_REQ_CHILD, &img_request->flags);
+	smp_mb();
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request)
 {
 	smp_mb();
@@ -1484,6 +1538,12 @@ static void img_request_layered_set(struct rbd_img_request *img_request)
 	smp_mb();
 }
 
+static void img_request_layered_clear(struct rbd_img_request *img_request)
+{
+	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
+	smp_mb();
+}
+
 static bool img_request_layered_test(struct rbd_img_request *img_request)
 {
 	smp_mb();
@@ -1827,6 +1887,74 @@ static void rbd_obj_request_destroy(struct kref *kref)
 	kmem_cache_free(rbd_obj_request_cache, obj_request);
 }
 
+/* It's OK to call this for a device with no parent */
+
+static void rbd_spec_put(struct rbd_spec *spec);
+static void rbd_dev_unparent(struct rbd_device *rbd_dev)
+{
+	rbd_dev_remove_parent(rbd_dev);
+	rbd_spec_put(rbd_dev->parent_spec);
+	rbd_dev->parent_spec = NULL;
+	rbd_dev->parent_overlap = 0;
+}
+
+/*
+ * Parent image reference counting is used to determine when an
+ * image's parent fields can be safely torn down--after there are no
+ * more in-flight requests to the parent image.  When the last
+ * reference is dropped, cleaning them up is safe.
+ */
+static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
+{
+	int counter;
+
+	if (!rbd_dev->parent_spec)
+		return;
+
+	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
+	if (counter > 0)
+		return;
+
+	/* Last reference; clean up parent data structures */
+
+	if (!counter)
+		rbd_dev_unparent(rbd_dev);
+	else
+		rbd_warn(rbd_dev, "parent reference underflow\n");
+}
+
+/*
+ * If an image has a non-zero parent overlap, get a reference to its
+ * parent.
+ *
+ * We must get the reference before checking for the overlap to
+ * coordinate properly with zeroing the parent overlap in
+ * rbd_dev_v2_parent_info() when an image gets flattened.  We
+ * drop it again if there is no overlap.
+ *
+ * Returns true if the rbd device has a parent with a non-zero
+ * overlap and a reference for it was successfully taken, or
+ * false otherwise.
+ */
+static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
+{
+	int counter;
+
+	if (!rbd_dev->parent_spec)
+		return false;
+
+	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
+	if (counter > 0 && rbd_dev->parent_overlap)
+		return true;
+
+	/* Image was flattened, but parent is not yet torn down */
+
+	if (counter < 0)
+		rbd_warn(rbd_dev, "parent reference overflow\n");
+
+	return false;
+}
+
 /*
  * Caller is responsible for filling in the list of object requests
  * that comprises the image request, and the Linux request pointer
@@ -1835,8 +1963,7 @@ static void rbd_obj_request_destroy(struct kref *kref)
 static struct rbd_img_request *rbd_img_request_create(
 					struct rbd_device *rbd_dev,
 					u64 offset, u64 length,
-					bool write_request,
-					bool child_request)
+					bool write_request)
 {
 	struct rbd_img_request *img_request;
 
@@ -1861,9 +1988,7 @@ static struct rbd_img_request *rbd_img_request_create(
 	} else {
 		img_request->snap_id = rbd_dev->spec->snap_id;
 	}
-	if (child_request)
-		img_request_child_set(img_request);
-	if (rbd_dev->parent_spec)
+	if (rbd_dev_parent_get(rbd_dev))
 		img_request_layered_set(img_request);
 	spin_lock_init(&img_request->completion_lock);
 	img_request->next_completion = 0;
@@ -1873,9 +1998,6 @@ static struct rbd_img_request *rbd_img_request_create(
 	INIT_LIST_HEAD(&img_request->obj_requests);
 	kref_init(&img_request->kref);
 
-	rbd_img_request_get(img_request);	/* Avoid a warning */
-	rbd_img_request_put(img_request);	/* TEMPORARY */
-
 	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
 		write_request ? "write" : "read", offset, length,
 		img_request);
@@ -1897,15 +2019,54 @@ static void rbd_img_request_destroy(struct kref *kref)
 		rbd_img_obj_request_del(img_request, obj_request);
 	rbd_assert(img_request->obj_request_count == 0);
 
+	if (img_request_layered_test(img_request)) {
+		img_request_layered_clear(img_request);
+		rbd_dev_parent_put(img_request->rbd_dev);
+	}
+
 	if (img_request_write_test(img_request))
 		ceph_put_snap_context(img_request->snapc);
 
-	if (img_request_child_test(img_request))
-		rbd_obj_request_put(img_request->obj_request);
-
 	kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
+static struct rbd_img_request *rbd_parent_request_create(
+					struct rbd_obj_request *obj_request,
+					u64 img_offset, u64 length)
+{
+	struct rbd_img_request *parent_request;
+	struct rbd_device *rbd_dev;
+
+	rbd_assert(obj_request->img_request);
+	rbd_dev = obj_request->img_request->rbd_dev;
+
+	parent_request = rbd_img_request_create(rbd_dev->parent,
+						img_offset, length, false);
+	if (!parent_request)
+		return NULL;
+
+	img_request_child_set(parent_request);
+	rbd_obj_request_get(obj_request);
+	parent_request->obj_request = obj_request;
+
+	return parent_request;
+}
+
+static void rbd_parent_request_destroy(struct kref *kref)
+{
+	struct rbd_img_request *parent_request;
+	struct rbd_obj_request *orig_request;
+
+	parent_request = container_of(kref, struct rbd_img_request, kref);
+	orig_request = parent_request->obj_request;
+
+	parent_request->obj_request = NULL;
+	rbd_obj_request_put(orig_request);
+	img_request_child_clear(parent_request);
+
+	rbd_img_request_destroy(kref);
+}
+
 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request;
@@ -2114,7 +2275,7 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request;
 	struct rbd_device *rbd_dev;
-	u64 length;
+	struct page **pages;
 	u32 page_count;
 
 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
@@ -2124,12 +2285,14 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 
 	rbd_dev = img_request->rbd_dev;
 	rbd_assert(rbd_dev);
-	length = (u64)1 << rbd_dev->header.obj_order;
-	page_count = (u32)calc_pages_for(0, length);
 
-	rbd_assert(obj_request->copyup_pages);
-	ceph_release_page_vector(obj_request->copyup_pages, page_count);
+	pages = obj_request->copyup_pages;
+	rbd_assert(pages != NULL);
 	obj_request->copyup_pages = NULL;
+	page_count = obj_request->copyup_page_count;
+	rbd_assert(page_count);
+	obj_request->copyup_page_count = 0;
+	ceph_release_page_vector(pages, page_count);
 
 	/*
 	 * We want the transfer count to reflect the size of the
@@ -2153,9 +2316,11 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 	struct ceph_osd_client *osdc;
 	struct rbd_device *rbd_dev;
 	struct page **pages;
-	int result;
-	u64 obj_size;
-	u64 xferred;
+	u32 page_count;
+	int img_result;
+	u64 parent_length;
+	u64 offset;
+	u64 length;
 
 	rbd_assert(img_request_child_test(img_request));
 
@@ -2164,46 +2329,74 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 	pages = img_request->copyup_pages;
 	rbd_assert(pages != NULL);
 	img_request->copyup_pages = NULL;
+	page_count = img_request->copyup_page_count;
+	rbd_assert(page_count);
+	img_request->copyup_page_count = 0;
 
 	orig_request = img_request->obj_request;
 	rbd_assert(orig_request != NULL);
-	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
-	result = img_request->result;
-	obj_size = img_request->length;
-	xferred = img_request->xferred;
+	rbd_assert(obj_request_type_valid(orig_request->type));
+	img_result = img_request->result;
+	parent_length = img_request->length;
+	rbd_assert(parent_length == img_request->xferred);
+	rbd_img_request_put(img_request);
 
-	rbd_dev = img_request->rbd_dev;
+	rbd_assert(orig_request->img_request);
+	rbd_dev = orig_request->img_request->rbd_dev;
 	rbd_assert(rbd_dev);
-	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
 
-	rbd_img_request_put(img_request);
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to free the pages
+	 * and re-submit the original write request.
+	 */
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
 
-	if (result)
-		goto out_err;
+		ceph_release_page_vector(pages, page_count);
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		img_result = rbd_obj_request_submit(osdc, orig_request);
+		if (!img_result)
+			return;
+	}
 
-	/* Allocate the new copyup osd request for the original request */
+	if (img_result)
+		goto out_err;
 
-	result = -ENOMEM;
-	rbd_assert(!orig_request->osd_req);
+	/*
+	 * The original osd request is of no use to use any more.
+	 * We need a new one that can hold the two ops in a copyup
+	 * request.  Allocate the new copyup osd request for the
+	 * original request, and release the old one.
+	 */
+	img_result = -ENOMEM;
 	osd_req = rbd_osd_req_create_copyup(orig_request);
 	if (!osd_req)
 		goto out_err;
+	rbd_osd_req_destroy(orig_request->osd_req);
 	orig_request->osd_req = osd_req;
 	orig_request->copyup_pages = pages;
+	orig_request->copyup_page_count = page_count;
 
 	/* Initialize the copyup op */
 
 	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
-	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
 						false, false);
 
 	/* Then the original write request op */
 
+	offset = orig_request->offset;
+	length = orig_request->length;
 	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
-					orig_request->offset,
-					orig_request->length, 0, 0);
-	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
-					orig_request->length);
+					offset, length, 0, 0);
+	if (orig_request->type == OBJ_REQUEST_BIO)
+		osd_req_op_extent_osd_data_bio(osd_req, 1,
+					orig_request->bio_list, length);
+	else
+		osd_req_op_extent_osd_data_pages(osd_req, 1,
+					orig_request->pages, length,
+					offset & ~PAGE_MASK, false, false);
 
 	rbd_osd_req_format_write(orig_request);
 
@@ -2211,13 +2404,13 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 
 	orig_request->callback = rbd_img_obj_copyup_callback;
 	osdc = &rbd_dev->rbd_client->client->osdc;
-	result = rbd_obj_request_submit(osdc, orig_request);
-	if (!result)
+	img_result = rbd_obj_request_submit(osdc, orig_request);
+	if (!img_result)
 		return;
 out_err:
 	/* Record the error code and complete the request */
 
-	orig_request->result = result;
+	orig_request->result = img_result;
 	orig_request->xferred = 0;
 	obj_request_done_set(orig_request);
 	rbd_obj_request_complete(orig_request);
@@ -2249,7 +2442,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 	int result;
 
 	rbd_assert(obj_request_img_data_test(obj_request));
-	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+	rbd_assert(obj_request_type_valid(obj_request->type));
 
 	img_request = obj_request->img_request;
 	rbd_assert(img_request != NULL);
@@ -2257,15 +2450,6 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 	rbd_assert(rbd_dev->parent != NULL);
 
 	/*
-	 * First things first.  The original osd request is of no
-	 * use to use any more, we'll need a new one that can hold
-	 * the two ops in a copyup request.  We'll get that later,
-	 * but for now we can release the old one.
-	 */
-	rbd_osd_req_destroy(obj_request->osd_req);
-	obj_request->osd_req = NULL;
-
-	/*
 	 * Determine the byte range covered by the object in the
 	 * child image to which the original request was to be sent.
 	 */
@@ -2295,18 +2479,16 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 	}
 
 	result = -ENOMEM;
-	parent_request = rbd_img_request_create(rbd_dev->parent,
-						img_offset, length,
-						false, true);
+	parent_request = rbd_parent_request_create(obj_request,
+						img_offset, length);
 	if (!parent_request)
 		goto out_err;
-	rbd_obj_request_get(obj_request);
-	parent_request->obj_request = obj_request;
 
 	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
 	if (result)
 		goto out_err;
 	parent_request->copyup_pages = pages;
+	parent_request->copyup_page_count = page_count;
 
 	parent_request->callback = rbd_img_obj_parent_read_full_callback;
 	result = rbd_img_request_submit(parent_request);
@@ -2314,6 +2496,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 		return 0;
 
 	parent_request->copyup_pages = NULL;
+	parent_request->copyup_page_count = 0;
 	parent_request->obj_request = NULL;
 	rbd_obj_request_put(obj_request);
 out_err:
@@ -2331,6 +2514,7 @@ out_err:
 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_obj_request *orig_request;
+	struct rbd_device *rbd_dev;
 	int result;
 
 	rbd_assert(!obj_request_img_data_test(obj_request));
@@ -2353,8 +2537,21 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 		obj_request->xferred, obj_request->length);
 	rbd_obj_request_put(obj_request);
 
-	rbd_assert(orig_request);
-	rbd_assert(orig_request->img_request);
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to free the pages
+	 * and re-submit the original write request.
+	 */
+	rbd_dev = orig_request->img_request->rbd_dev;
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
+
+		rbd_obj_request_put(orig_request);
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		result = rbd_obj_request_submit(osdc, orig_request);
+		if (!result)
+			return;
+	}
 
 	/*
 	 * Our only purpose here is to determine whether the object
@@ -2512,14 +2709,36 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 	struct rbd_obj_request *obj_request;
 	struct rbd_device *rbd_dev;
 	u64 obj_end;
+	u64 img_xferred;
+	int img_result;
 
 	rbd_assert(img_request_child_test(img_request));
 
+	/* First get what we need from the image request and release it */
+
 	obj_request = img_request->obj_request;
+	img_xferred = img_request->xferred;
+	img_result = img_request->result;
+	rbd_img_request_put(img_request);
+
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to re-submit the
+	 * original request.
+	 */
 	rbd_assert(obj_request);
 	rbd_assert(obj_request->img_request);
+	rbd_dev = obj_request->img_request->rbd_dev;
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
+
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		img_result = rbd_obj_request_submit(osdc, obj_request);
+		if (!img_result)
+			return;
+	}
 
-	obj_request->result = img_request->result;
+	obj_request->result = img_result;
 	if (obj_request->result)
 		goto out;
 
@@ -2532,7 +2751,6 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 	 */
 	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
 	obj_end = obj_request->img_offset + obj_request->length;
-	rbd_dev = obj_request->img_request->rbd_dev;
 	if (obj_end > rbd_dev->parent_overlap) {
 		u64 xferred = 0;
 
@@ -2540,43 +2758,39 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 			xferred = rbd_dev->parent_overlap -
 					obj_request->img_offset;
 
-		obj_request->xferred = min(img_request->xferred, xferred);
+		obj_request->xferred = min(img_xferred, xferred);
 	} else {
-		obj_request->xferred = img_request->xferred;
+		obj_request->xferred = img_xferred;
 	}
 out:
-	rbd_img_request_put(img_request);
 	rbd_img_obj_request_read_callback(obj_request);
 	rbd_obj_request_complete(obj_request);
 }
 
 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 {
-	struct rbd_device *rbd_dev;
 	struct rbd_img_request *img_request;
 	int result;
 
 	rbd_assert(obj_request_img_data_test(obj_request));
 	rbd_assert(obj_request->img_request != NULL);
 	rbd_assert(obj_request->result == (s32) -ENOENT);
-	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+	rbd_assert(obj_request_type_valid(obj_request->type));
 
-	rbd_dev = obj_request->img_request->rbd_dev;
-	rbd_assert(rbd_dev->parent != NULL);
 	/* rbd_read_finish(obj_request, obj_request->length); */
-	img_request = rbd_img_request_create(rbd_dev->parent,
+	img_request = rbd_parent_request_create(obj_request,
 						obj_request->img_offset,
-						obj_request->length,
-						false, true);
+						obj_request->length);
 	result = -ENOMEM;
 	if (!img_request)
 		goto out_err;
 
-	rbd_obj_request_get(obj_request);
-	img_request->obj_request = obj_request;
-
-	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						obj_request->bio_list);
+	if (obj_request->type == OBJ_REQUEST_BIO)
+		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+						obj_request->bio_list);
+	else
+		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
+						obj_request->pages);
 	if (result)
 		goto out_err;
 
@@ -2626,6 +2840,7 @@ out:
 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	int ret;
 
 	if (!rbd_dev)
 		return;
@@ -2633,7 +2848,9 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
 		(unsigned int)opcode);
-	(void)rbd_dev_refresh(rbd_dev);
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
 
 	rbd_obj_notify_ack(rbd_dev, notify_id);
 }
@@ -2642,7 +2859,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
  * Request sync osd watch/unwatch.  The value of "start" determines
  * whether a watch request is being initiated or torn down.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
@@ -2676,7 +2893,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
 					rbd_dev->watch_request->osd_req);
 
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			rbd_dev->watch_event->cookie, 0, start);
+			rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
 	rbd_osd_req_format_write(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -2869,9 +3086,16 @@ static void rbd_request_fn(struct request_queue *q)
 		goto end_request;	/* Shouldn't happen */
 	}
 
+	result = -EIO;
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
+			offset, length, rbd_dev->mapping.size);
+		goto end_request;
+	}
+
 	result = -ENOMEM;
 	img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request, false);
+							write_request);
 	if (!img_request)
 		goto end_request;
 
@@ -3022,17 +3246,11 @@ out:
 }
 
 /*
- * Read the complete header for the given rbd device.
- *
- * Returns a pointer to a dynamically-allocated buffer containing
- * the complete and validated header.  Caller can pass the address
- * of a variable that will be filled in with the version of the
- * header object at the time it was read.
- *
- * Returns a pointer-coded errno if a failure occurs.
+ * Read the complete header for the given rbd device.  On successful
+ * return, the rbd_dev->header field will contain up-to-date
+ * information about the image.
  */
-static struct rbd_image_header_ondisk *
-rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
+static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
 {
 	struct rbd_image_header_ondisk *ondisk = NULL;
 	u32 snap_count = 0;
@@ -3057,22 +3275,22 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
 		size += names_size;
 		ondisk = kmalloc(size, GFP_KERNEL);
 		if (!ondisk)
-			return ERR_PTR(-ENOMEM);
+			return -ENOMEM;
 
 		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 				       0, size, ondisk);
 		if (ret < 0)
-			goto out_err;
+			goto out;
 		if ((size_t)ret < size) {
 			ret = -ENXIO;
 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
 				size, ret);
-			goto out_err;
+			goto out;
 		}
 		if (!rbd_dev_ondisk_valid(ondisk)) {
 			ret = -ENXIO;
 			rbd_warn(rbd_dev, "invalid header");
-			goto out_err;
+			goto out;
 		}
 
 		names_size = le64_to_cpu(ondisk->snap_names_len);
@@ -3080,85 +3298,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
 		snap_count = le32_to_cpu(ondisk->snap_count);
 	} while (snap_count != want_count);
 
-	return ondisk;
-
-out_err:
-	kfree(ondisk);
-
-	return ERR_PTR(ret);
-}
-
-/*
- * reload the ondisk the header
- */
-static int rbd_read_header(struct rbd_device *rbd_dev,
-			   struct rbd_image_header *header)
-{
-	struct rbd_image_header_ondisk *ondisk;
-	int ret;
-
-	ondisk = rbd_dev_v1_header_read(rbd_dev);
-	if (IS_ERR(ondisk))
-		return PTR_ERR(ondisk);
-	ret = rbd_header_from_disk(header, ondisk);
+	ret = rbd_header_from_disk(rbd_dev, ondisk);
+out:
 	kfree(ondisk);
 
 	return ret;
 }
 
-static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
-{
-	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
-		return;
-
-	if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
-		sector_t size;
-
-		rbd_dev->mapping.size = rbd_dev->header.image_size;
-		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
-		dout("setting size to %llu sectors", (unsigned long long)size);
-		set_capacity(rbd_dev->disk, size);
-	}
-}
-
-/*
- * only read the first part of the ondisk header, without the snaps info
- */
-static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
-{
-	int ret;
-	struct rbd_image_header h;
-
-	ret = rbd_read_header(rbd_dev, &h);
-	if (ret < 0)
-		return ret;
-
-	down_write(&rbd_dev->header_rwsem);
-
-	/* Update image size, and check for resize of mapped image */
-	rbd_dev->header.image_size = h.image_size;
-	rbd_update_mapping_size(rbd_dev);
-
-	/* rbd_dev->header.object_prefix shouldn't change */
-	kfree(rbd_dev->header.snap_sizes);
-	kfree(rbd_dev->header.snap_names);
-	/* osd requests may still refer to snapc */
-	ceph_put_snap_context(rbd_dev->header.snapc);
-
-	rbd_dev->header.image_size = h.image_size;
-	rbd_dev->header.snapc = h.snapc;
-	rbd_dev->header.snap_names = h.snap_names;
-	rbd_dev->header.snap_sizes = h.snap_sizes;
-	/* Free the extra copy of the object prefix */
-	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
-		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
-	kfree(h.object_prefix);
-
-	up_write(&rbd_dev->header_rwsem);
-
-	return ret;
-}
-
 /*
  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
  * has disappeared from the (just updated) snapshot context.
@@ -3180,26 +3326,29 @@ static void rbd_exists_validate(struct rbd_device *rbd_dev)
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 {
-	u64 image_size;
+	u64 mapping_size;
 	int ret;
 
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-	image_size = rbd_dev->header.image_size;
+	mapping_size = rbd_dev->mapping.size;
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_refresh(rbd_dev);
+		ret = rbd_dev_v1_header_info(rbd_dev);
 	else
-		ret = rbd_dev_v2_refresh(rbd_dev);
+		ret = rbd_dev_v2_header_info(rbd_dev);
 
 	/* If it's a mapped snapshot, validate its EXISTS flag */
 
 	rbd_exists_validate(rbd_dev);
 	mutex_unlock(&ctl_mutex);
-	if (ret)
-		rbd_warn(rbd_dev, "got notification but failed to "
-			 " update snaps: %d\n", ret);
-	if (image_size != rbd_dev->header.image_size)
+	if (mapping_size != rbd_dev->mapping.size) {
+		sector_t size;
+
+		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+		dout("setting size to %llu sectors", (unsigned long long)size);
+		set_capacity(rbd_dev->disk, size);
 		revalidate_disk(rbd_dev->disk);
+	}
 
 	return ret;
 }
@@ -3403,6 +3552,8 @@ static ssize_t rbd_image_refresh(struct device *dev,
 	int ret;
 
 	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
 
 	return ret < 0 ? ret : size;
 }
@@ -3501,6 +3652,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 	spin_lock_init(&rbd_dev->lock);
 	rbd_dev->flags = 0;
+	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
 	init_rwsem(&rbd_dev->header_rwsem);
 
@@ -3650,6 +3802,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	__le64 snapid;
 	void *p;
 	void *end;
+	u64 pool_id;
 	char *image_id;
 	u64 overlap;
 	int ret;
@@ -3680,18 +3833,37 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	p = reply_buf;
 	end = reply_buf + ret;
 	ret = -ERANGE;
-	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
-	if (parent_spec->pool_id == CEPH_NOPOOL)
+	ceph_decode_64_safe(&p, end, pool_id, out_err);
+	if (pool_id == CEPH_NOPOOL) {
+		/*
+		 * Either the parent never existed, or we have
+		 * record of it but the image got flattened so it no
+		 * longer has a parent.  When the parent of a
+		 * layered image disappears we immediately set the
+		 * overlap to 0.  The effect of this is that all new
+		 * requests will be treated as if the image had no
+		 * parent.
+		 */
+		if (rbd_dev->parent_overlap) {
+			rbd_dev->parent_overlap = 0;
+			smp_mb();
+			rbd_dev_parent_put(rbd_dev);
+			pr_info("%s: clone image has been flattened\n",
+				rbd_dev->disk->disk_name);
+		}
+
 		goto out;	/* No parent? No problem. */
+	}
 
 	/* The ceph file layout needs to fit pool id in 32 bits */
 
 	ret = -EIO;
-	if (parent_spec->pool_id > (u64)U32_MAX) {
+	if (pool_id > (u64)U32_MAX) {
 		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
-			(unsigned long long)parent_spec->pool_id, U32_MAX);
+			(unsigned long long)pool_id, U32_MAX);
 		goto out_err;
 	}
+	parent_spec->pool_id = pool_id;
 
 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
 	if (IS_ERR(image_id)) {
@@ -3702,9 +3874,14 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
 	ceph_decode_64_safe(&p, end, overlap, out_err);
 
-	rbd_dev->parent_overlap = overlap;
-	rbd_dev->parent_spec = parent_spec;
-	parent_spec = NULL;	/* rbd_dev now owns this */
+	if (overlap) {
+		rbd_spec_put(rbd_dev->parent_spec);
+		rbd_dev->parent_spec = parent_spec;
+		parent_spec = NULL;	/* rbd_dev now owns this */
+		rbd_dev->parent_overlap = overlap;
+	} else {
+		rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+	}
 out:
 	ret = 0;
 out_err:
@@ -4002,6 +4179,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
 	for (i = 0; i < snap_count; i++)
 		snapc->snaps[i] = ceph_decode_64(&p);
 
+	ceph_put_snap_context(rbd_dev->header.snapc);
 	rbd_dev->header.snapc = snapc;
 
 	dout("  snap context seq = %llu, snap_count = %u\n",
@@ -4053,21 +4231,56 @@ out:
4053 return snap_name; 4231 return snap_name;
4054} 4232}
4055 4233
4056static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev) 4234static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4057{ 4235{
4236 bool first_time = rbd_dev->header.object_prefix == NULL;
4058 int ret; 4237 int ret;
4059 4238
4060 down_write(&rbd_dev->header_rwsem); 4239 down_write(&rbd_dev->header_rwsem);
4061 4240
4241 if (first_time) {
4242 ret = rbd_dev_v2_header_onetime(rbd_dev);
4243 if (ret)
4244 goto out;
4245 }
4246
4247 /*
4248 * If the image supports layering, get the parent info. We
4249 * need to probe the first time regardless. Thereafter we
4250 * only need to if there's a parent, to see if it has
4251 * disappeared due to the mapped image getting flattened.
4252 */
4253 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4254 (first_time || rbd_dev->parent_spec)) {
4255 bool warn;
4256
4257 ret = rbd_dev_v2_parent_info(rbd_dev);
4258 if (ret)
4259 goto out;
4260
4261 /*
4262 * Print a warning if this is the initial probe and
4263 * the image has a parent. Don't print it if the
4264 * image now being probed is itself a parent. We
4265 * can tell at this point because we won't know its
4266 * pool name yet (just its pool id).
4267 */
4268 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4269 if (first_time && warn)
4270 rbd_warn(rbd_dev, "WARNING: kernel layering "
4271 "is EXPERIMENTAL!");
4272 }
4273
4062 ret = rbd_dev_v2_image_size(rbd_dev); 4274 ret = rbd_dev_v2_image_size(rbd_dev);
4063 if (ret) 4275 if (ret)
4064 goto out; 4276 goto out;
4065 rbd_update_mapping_size(rbd_dev); 4277
4278 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4279 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4280 rbd_dev->mapping.size = rbd_dev->header.image_size;
4066 4281
4067 ret = rbd_dev_v2_snap_context(rbd_dev); 4282 ret = rbd_dev_v2_snap_context(rbd_dev);
4068 dout("rbd_dev_v2_snap_context returned %d\n", ret); 4283 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4069 if (ret)
4070 goto out;
4071out: 4284out:
4072 up_write(&rbd_dev->header_rwsem); 4285 up_write(&rbd_dev->header_rwsem);
4073 4286
@@ -4490,10 +4703,10 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4490{ 4703{
4491 struct rbd_image_header *header; 4704 struct rbd_image_header *header;
4492 4705
4493 rbd_dev_remove_parent(rbd_dev); 4706 /* Drop parent reference unless it's already been done (or none) */
4494 rbd_spec_put(rbd_dev->parent_spec); 4707
4495 rbd_dev->parent_spec = NULL; 4708 if (rbd_dev->parent_overlap)
4496 rbd_dev->parent_overlap = 0; 4709 rbd_dev_parent_put(rbd_dev);
4497 4710
4498 /* Free dynamic fields from the header, then zero it out */ 4711 /* Free dynamic fields from the header, then zero it out */
4499 4712
@@ -4505,72 +4718,22 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4505 memset(header, 0, sizeof (*header)); 4718 memset(header, 0, sizeof (*header));
4506} 4719}
4507 4720
4508static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) 4721static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4509{ 4722{
4510 int ret; 4723 int ret;
4511 4724
4512 /* Populate rbd image metadata */
4513
4514 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4515 if (ret < 0)
4516 goto out_err;
4517
4518 /* Version 1 images have no parent (no layering) */
4519
4520 rbd_dev->parent_spec = NULL;
4521 rbd_dev->parent_overlap = 0;
4522
4523 dout("discovered version 1 image, header name is %s\n",
4524 rbd_dev->header_name);
4525
4526 return 0;
4527
4528out_err:
4529 kfree(rbd_dev->header_name);
4530 rbd_dev->header_name = NULL;
4531 kfree(rbd_dev->spec->image_id);
4532 rbd_dev->spec->image_id = NULL;
4533
4534 return ret;
4535}
4536
4537static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4538{
4539 int ret;
4540
4541 ret = rbd_dev_v2_image_size(rbd_dev);
4542 if (ret)
4543 goto out_err;
4544
4545 /* Get the object prefix (a.k.a. block_name) for the image */
4546
4547 ret = rbd_dev_v2_object_prefix(rbd_dev); 4725 ret = rbd_dev_v2_object_prefix(rbd_dev);
4548 if (ret) 4726 if (ret)
4549 goto out_err; 4727 goto out_err;
4550 4728
4551 /* Get the and check features for the image */ 4729 /*
4552 4730 * Get and check the features for the image. Currently the
4731 * features are assumed to never change.
4732 */
4553 ret = rbd_dev_v2_features(rbd_dev); 4733 ret = rbd_dev_v2_features(rbd_dev);
4554 if (ret) 4734 if (ret)
4555 goto out_err; 4735 goto out_err;
4556 4736
4557 /* If the image supports layering, get the parent info */
4558
4559 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4560 ret = rbd_dev_v2_parent_info(rbd_dev);
4561 if (ret)
4562 goto out_err;
4563
4564 /*
4565 * Don't print a warning for parent images. We can
4566 * tell this point because we won't know its pool
4567 * name yet (just its pool id).
4568 */
4569 if (rbd_dev->spec->pool_name)
4570 rbd_warn(rbd_dev, "WARNING: kernel layering "
4571 "is EXPERIMENTAL!");
4572 }
4573
4574 /* If the image supports fancy striping, get its parameters */ 4737 /* If the image supports fancy striping, get its parameters */
4575 4738
4576 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 4739 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
@@ -4578,28 +4741,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4578 if (ret < 0) 4741 if (ret < 0)
4579 goto out_err; 4742 goto out_err;
4580 } 4743 }
4581 4744 /* Crypto and compression types aren't supported for format 2 images */
4582 /* crypto and compression type aren't (yet) supported for v2 images */
4583
4584 rbd_dev->header.crypt_type = 0;
4585 rbd_dev->header.comp_type = 0;
4586
4587 /* Get the snapshot context, plus the header version */
4588
4589 ret = rbd_dev_v2_snap_context(rbd_dev);
4590 if (ret)
4591 goto out_err;
4592
4593 dout("discovered version 2 image, header name is %s\n",
4594 rbd_dev->header_name);
4595 4745
4596 return 0; 4746 return 0;
4597out_err: 4747out_err:
4598 rbd_dev->parent_overlap = 0; 4748 rbd_dev->header.features = 0;
4599 rbd_spec_put(rbd_dev->parent_spec);
4600 rbd_dev->parent_spec = NULL;
4601 kfree(rbd_dev->header_name);
4602 rbd_dev->header_name = NULL;
4603 kfree(rbd_dev->header.object_prefix); 4749 kfree(rbd_dev->header.object_prefix);
4604 rbd_dev->header.object_prefix = NULL; 4750 rbd_dev->header.object_prefix = NULL;
4605 4751
@@ -4628,15 +4774,16 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4628 if (!parent) 4774 if (!parent)
4629 goto out_err; 4775 goto out_err;
4630 4776
4631 ret = rbd_dev_image_probe(parent); 4777 ret = rbd_dev_image_probe(parent, false);
4632 if (ret < 0) 4778 if (ret < 0)
4633 goto out_err; 4779 goto out_err;
4634 rbd_dev->parent = parent; 4780 rbd_dev->parent = parent;
4781 atomic_set(&rbd_dev->parent_ref, 1);
4635 4782
4636 return 0; 4783 return 0;
4637out_err: 4784out_err:
4638 if (parent) { 4785 if (parent) {
4639 rbd_spec_put(rbd_dev->parent_spec); 4786 rbd_dev_unparent(rbd_dev);
4640 kfree(rbd_dev->header_name); 4787 kfree(rbd_dev->header_name);
4641 rbd_dev_destroy(parent); 4788 rbd_dev_destroy(parent);
4642 } else { 4789 } else {
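
The new parent_ref counter lets in-flight requests pin the parent image while it may be flattened away underneath them: the probe holds the initial reference, each request that needs the parent takes another, and the drop to zero tears it down. A sketch of the same idea with C11 atomics, where a get refuses to resurrect a parent whose count already hit zero (names are illustrative):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    static atomic_int parent_ref;

    static bool parent_get(void)
    {
        int old = atomic_load(&parent_ref);

        /* Refuse to resurrect a parent that is already gone. */
        while (old > 0) {
            if (atomic_compare_exchange_weak(&parent_ref, &old, old + 1))
                return true;
        }
        return false;
    }

    static void parent_put(void)
    {
        if (atomic_fetch_sub(&parent_ref, 1) == 1)
            printf("last reference dropped: tear down parent\n");
    }

    int main(void)
    {
        atomic_store(&parent_ref, 1);   /* reference held by the probe */
        if (parent_get())               /* an in-flight request */
            parent_put();
        parent_put();                   /* image flattened or unmapped */
        if (!parent_get())
            printf("parent gone, request proceeds without it\n");
        return 0;
    }
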
@@ -4651,10 +4798,6 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4651{ 4798{
4652 int ret; 4799 int ret;
4653 4800
4654 ret = rbd_dev_mapping_set(rbd_dev);
4655 if (ret)
4656 return ret;
4657
4658 /* generate unique id: find highest unique id, add one */ 4801 /* generate unique id: find highest unique id, add one */
4659 rbd_dev_id_get(rbd_dev); 4802 rbd_dev_id_get(rbd_dev);
4660 4803
@@ -4676,13 +4819,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4676 if (ret) 4819 if (ret)
4677 goto err_out_blkdev; 4820 goto err_out_blkdev;
4678 4821
4679 ret = rbd_bus_add_dev(rbd_dev); 4822 ret = rbd_dev_mapping_set(rbd_dev);
4680 if (ret) 4823 if (ret)
4681 goto err_out_disk; 4824 goto err_out_disk;
4825 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4826
4827 ret = rbd_bus_add_dev(rbd_dev);
4828 if (ret)
4829 goto err_out_mapping;
4682 4830
4683 /* Everything's ready. Announce the disk to the world. */ 4831 /* Everything's ready. Announce the disk to the world. */
4684 4832
4685 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4686 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4833 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4687 add_disk(rbd_dev->disk); 4834 add_disk(rbd_dev->disk);
4688 4835
@@ -4691,6 +4838,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4691 4838
4692 return ret; 4839 return ret;
4693 4840
4841err_out_mapping:
4842 rbd_dev_mapping_clear(rbd_dev);
4694err_out_disk: 4843err_out_disk:
4695 rbd_free_disk(rbd_dev); 4844 rbd_free_disk(rbd_dev);
4696err_out_blkdev: 4845err_out_blkdev:
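
Moving rbd_dev_mapping_set() between disk creation and bus registration also extends the unwind ladder with err_out_mapping. The pattern is the standard kernel one: each acquisition gets a label, and a failure jumps to the label that undoes everything acquired so far, in reverse order. A stub skeleton of the pattern (the steps are placeholders, not the rbd functions):

    #include <stdio.h>

    static int step(const char *name, int fail) { printf("%s\n", name); return fail; }
    static void undo(const char *name) { printf("undo %s\n", name); }

    static int device_setup(void)
    {
        int ret;

        ret = step("register blkdev", 0);
        if (ret)
            return ret;
        ret = step("create disk", 0);
        if (ret)
            goto err_out_blkdev;
        ret = step("set mapping size", 0);
        if (ret)
            goto err_out_disk;
        ret = step("add to bus", 1);    /* simulate a failure */
        if (ret)
            goto err_out_mapping;
        return 0;

    err_out_mapping:
        undo("mapping");
    err_out_disk:
        undo("disk");
    err_out_blkdev:
        undo("blkdev");
        return ret;
    }

    int main(void)
    {
        return device_setup() ? 1 : 0;
    }
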
@@ -4731,12 +4880,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4731 4880
4732static void rbd_dev_image_release(struct rbd_device *rbd_dev) 4881static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4733{ 4882{
4734 int ret;
4735
4736 rbd_dev_unprobe(rbd_dev); 4883 rbd_dev_unprobe(rbd_dev);
4737 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4738 if (ret)
4739 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4740 kfree(rbd_dev->header_name); 4884 kfree(rbd_dev->header_name);
4741 rbd_dev->header_name = NULL; 4885 rbd_dev->header_name = NULL;
4742 rbd_dev->image_format = 0; 4886 rbd_dev->image_format = 0;
@@ -4748,10 +4892,11 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4748 4892
4749/* 4893/*
4750 * Probe for the existence of the header object for the given rbd 4894 * Probe for the existence of the header object for the given rbd
4751 * device. For format 2 images this includes determining the image 4895 * device. If this image is the one being mapped (i.e., not a
4752 * id. 4896 * parent), initiate a watch on its header object before using that
4897 * object to get detailed information about the rbd image.
4753 */ 4898 */
4754static int rbd_dev_image_probe(struct rbd_device *rbd_dev) 4899static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4755{ 4900{
4756 int ret; 4901 int ret;
4757 int tmp; 4902 int tmp;
@@ -4771,14 +4916,16 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4771 if (ret) 4916 if (ret)
4772 goto err_out_format; 4917 goto err_out_format;
4773 4918
4774 ret = rbd_dev_header_watch_sync(rbd_dev, 1); 4919 if (mapping) {
4775 if (ret) 4920 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4776 goto out_header_name; 4921 if (ret)
4922 goto out_header_name;
4923 }
4777 4924
4778 if (rbd_dev->image_format == 1) 4925 if (rbd_dev->image_format == 1)
4779 ret = rbd_dev_v1_probe(rbd_dev); 4926 ret = rbd_dev_v1_header_info(rbd_dev);
4780 else 4927 else
4781 ret = rbd_dev_v2_probe(rbd_dev); 4928 ret = rbd_dev_v2_header_info(rbd_dev);
4782 if (ret) 4929 if (ret)
4783 goto err_out_watch; 4930 goto err_out_watch;
4784 4931
@@ -4787,15 +4934,22 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4787 goto err_out_probe; 4934 goto err_out_probe;
4788 4935
4789 ret = rbd_dev_probe_parent(rbd_dev); 4936 ret = rbd_dev_probe_parent(rbd_dev);
4790 if (!ret) 4937 if (ret)
4791 return 0; 4938 goto err_out_probe;
4939
4940 dout("discovered format %u image, header name is %s\n",
4941 rbd_dev->image_format, rbd_dev->header_name);
4792 4942
4943 return 0;
4793err_out_probe: 4944err_out_probe:
4794 rbd_dev_unprobe(rbd_dev); 4945 rbd_dev_unprobe(rbd_dev);
4795err_out_watch: 4946err_out_watch:
4796 tmp = rbd_dev_header_watch_sync(rbd_dev, 0); 4947 if (mapping) {
4797 if (tmp) 4948 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4798 rbd_warn(rbd_dev, "unable to tear down watch request\n"); 4949 if (tmp)
4950 rbd_warn(rbd_dev, "unable to tear down "
4951 "watch request (%d)\n", tmp);
4952 }
4799out_header_name: 4953out_header_name:
4800 kfree(rbd_dev->header_name); 4954 kfree(rbd_dev->header_name);
4801 rbd_dev->header_name = NULL; 4955 rbd_dev->header_name = NULL;
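
With the new mapping flag, the watch is registered only for the image actually being mapped, so the error path must test the same flag before tearing the watch down; otherwise probing a parent would cancel a watch it never set up. A small sketch of that conditional acquire/release symmetry (stubs, not the libceph calls):

    #include <stdbool.h>
    #include <stdio.h>

    static int watch_sync(bool start)
    {
        printf(start ? "watch on\n" : "watch off\n");
        return 0;
    }

    static int probe_header(void) { return -1; }  /* simulate failure */

    static int image_probe(bool mapping)
    {
        int ret;

        if (mapping) {
            ret = watch_sync(true);
            if (ret)
                return ret;
        }
        ret = probe_header();
        if (ret)
            goto err_out_watch;
        return 0;

    err_out_watch:
        /* Release only what this call actually acquired. */
        if (mapping && watch_sync(false))
            fprintf(stderr, "unable to tear down watch request\n");
        return ret;
    }

    int main(void)
    {
        return image_probe(true) ? 1 : 0;
    }
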
@@ -4819,6 +4973,7 @@ static ssize_t rbd_add(struct bus_type *bus,
4819 struct rbd_spec *spec = NULL; 4973 struct rbd_spec *spec = NULL;
4820 struct rbd_client *rbdc; 4974 struct rbd_client *rbdc;
4821 struct ceph_osd_client *osdc; 4975 struct ceph_osd_client *osdc;
4976 bool read_only;
4822 int rc = -ENOMEM; 4977 int rc = -ENOMEM;
4823 4978
4824 if (!try_module_get(THIS_MODULE)) 4979 if (!try_module_get(THIS_MODULE))
@@ -4828,6 +4983,9 @@ static ssize_t rbd_add(struct bus_type *bus,
4828 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 4983 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4829 if (rc < 0) 4984 if (rc < 0)
4830 goto err_out_module; 4985 goto err_out_module;
4986 read_only = rbd_opts->read_only;
4987 kfree(rbd_opts);
4988 rbd_opts = NULL; /* done with this */
4831 4989
4832 rbdc = rbd_get_client(ceph_opts); 4990 rbdc = rbd_get_client(ceph_opts);
4833 if (IS_ERR(rbdc)) { 4991 if (IS_ERR(rbdc)) {
@@ -4858,14 +5016,16 @@ static ssize_t rbd_add(struct bus_type *bus,
4858 rbdc = NULL; /* rbd_dev now owns this */ 5016 rbdc = NULL; /* rbd_dev now owns this */
4859 spec = NULL; /* rbd_dev now owns this */ 5017 spec = NULL; /* rbd_dev now owns this */
4860 5018
4861 rbd_dev->mapping.read_only = rbd_opts->read_only; 5019 rc = rbd_dev_image_probe(rbd_dev, true);
4862 kfree(rbd_opts);
4863 rbd_opts = NULL; /* done with this */
4864
4865 rc = rbd_dev_image_probe(rbd_dev);
4866 if (rc < 0) 5020 if (rc < 0)
4867 goto err_out_rbd_dev; 5021 goto err_out_rbd_dev;
4868 5022
5023 /* If we are mapping a snapshot it must be marked read-only */
5024
5025 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5026 read_only = true;
5027 rbd_dev->mapping.read_only = read_only;
5028
4869 rc = rbd_dev_device_setup(rbd_dev); 5029 rc = rbd_dev_device_setup(rbd_dev);
4870 if (!rc) 5030 if (!rc)
4871 return count; 5031 return count;
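
Deferring the read-only decision until after the probe lets rbd_add() force snapshot mappings read-only regardless of what the user requested, since snapshots are immutable. The rule reduces to one expression; a tiny sketch (CEPH_NOSNAP value as in ceph_fs.h):

    #include <stdbool.h>
    #include <stdio.h>

    #define CEPH_NOSNAP ((unsigned long long)(-2))  /* "head" revision */

    /* A snapshot mapping is read-only no matter what was asked for. */
    static bool effective_read_only(bool requested, unsigned long long snap_id)
    {
        return snap_id != CEPH_NOSNAP ? true : requested;
    }

    int main(void)
    {
        printf("%d\n", effective_read_only(false, 42));           /* 1 */
        printf("%d\n", effective_read_only(false, CEPH_NOSNAP));  /* 0 */
        return 0;
    }
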
@@ -4911,7 +5071,7 @@ static void rbd_dev_device_release(struct device *dev)
4911 5071
4912 rbd_free_disk(rbd_dev); 5072 rbd_free_disk(rbd_dev);
4913 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 5073 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4914 rbd_dev_clear_mapping(rbd_dev); 5074 rbd_dev_mapping_clear(rbd_dev);
4915 unregister_blkdev(rbd_dev->major, rbd_dev->name); 5075 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4916 rbd_dev->major = 0; 5076 rbd_dev->major = 0;
4917 rbd_dev_id_put(rbd_dev); 5077 rbd_dev_id_put(rbd_dev);
@@ -4978,10 +5138,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
4978 spin_unlock_irq(&rbd_dev->lock); 5138 spin_unlock_irq(&rbd_dev->lock);
4979 if (ret < 0) 5139 if (ret < 0)
4980 goto done; 5140 goto done;
4981 ret = count;
4982 rbd_bus_del_dev(rbd_dev); 5141 rbd_bus_del_dev(rbd_dev);
5142 ret = rbd_dev_header_watch_sync(rbd_dev, false);
5143 if (ret)
5144 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4983 rbd_dev_image_release(rbd_dev); 5145 rbd_dev_image_release(rbd_dev);
4984 module_put(THIS_MODULE); 5146 module_put(THIS_MODULE);
5147 ret = count;
4985done: 5148done:
4986 mutex_unlock(&ctl_mutex); 5149 mutex_unlock(&ctl_mutex);
4987 5150
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a3395fdfbd4f..d5953b87918c 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1204,6 +1204,7 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
1204 mutex_lock(&osdc->request_mutex); 1204 mutex_lock(&osdc->request_mutex);
1205 if (req->r_linger) { 1205 if (req->r_linger) {
1206 __unregister_linger_request(osdc, req); 1206 __unregister_linger_request(osdc, req);
1207 req->r_linger = 0;
1207 ceph_osdc_put_request(req); 1208 ceph_osdc_put_request(req);
1208 } 1209 }
1209 mutex_unlock(&osdc->request_mutex); 1210 mutex_unlock(&osdc->request_mutex);
@@ -2120,7 +2121,9 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
2120 down_read(&osdc->map_sem); 2121 down_read(&osdc->map_sem);
2121 mutex_lock(&osdc->request_mutex); 2122 mutex_lock(&osdc->request_mutex);
2122 __register_request(osdc, req); 2123 __register_request(osdc, req);
2123 WARN_ON(req->r_sent); 2124 req->r_sent = 0;
2125 req->r_got_reply = 0;
2126 req->r_completed = 0;
2124 rc = __map_request(osdc, req, 0); 2127 rc = __map_request(osdc, req, 0);
2125 if (rc < 0) { 2128 if (rc < 0) {
2126 if (nofail) { 2129 if (nofail) {
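
Both osd_client changes serve linger requests: clearing r_linger in ceph_osdc_unregister_linger_request() makes a second unregister a no-op instead of a double put, and resetting r_sent/r_got_reply/r_completed in ceph_osdc_start_request() replaces the old WARN_ON(req->r_sent), since a lingering request is legitimately submitted more than once. A toy model of both behaviors (field names mirror the kernel's, the rest is illustrative):

    #include <stdbool.h>
    #include <stdio.h>

    struct request {
        bool linger;    /* resend across OSD map changes */
        int sent;
        bool got_reply;
        bool completed;
    };

    /*
     * A lingering request can be started more than once, so per-attempt
     * state is cleared on every start instead of asserting the request
     * was never sent.
     */
    static void start_request(struct request *req)
    {
        req->sent = 0;
        req->got_reply = false;
        req->completed = false;
        printf("submitted (linger=%d)\n", req->linger);
    }

    static void unregister_linger(struct request *req)
    {
        if (req->linger) {
            req->linger = false;    /* drop the linger reference once */
            printf("linger reference dropped\n");
        }
    }

    int main(void)
    {
        struct request req = { .linger = true };

        start_request(&req);
        req.sent = 1;               /* pretend it went out */
        start_request(&req);        /* resubmitted after a map change */
        unregister_linger(&req);
        unregister_linger(&req);    /* second call is now a no-op */
        return 0;
    }
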