author	Ilya Dryomov <idryomov@gmail.com>	2019-06-05 13:25:11 -0400
committer	Ilya Dryomov <idryomov@gmail.com>	2019-07-08 08:01:45 -0400
commit	22e8bd51bb0469d1a524130a057f894ff632376a (patch)
tree	1cf2eaf3fac621e7154f58a7915e94db5851744e /drivers/block
parent	da5ef6be3467eb2d293790dea69b5b562490715a (diff)
rbd: support for object-map and fast-diff
Speed up reads, discards and zeroouts through RBD_OBJ_FLAG_MAY_EXIST
and RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT based on the object map.

Invalid object maps are not trusted, but still updated.  Note that we
never iterate, resize or invalidate object maps.  If the object-map
feature is enabled but the object map fails to load, we just fail the
requester (either "rbd map" or I/O, by way of the post-acquire action).

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
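The object map is a plain bit vector with two bits of state per object,
most significant pair first within each byte.  A minimal userspace
sketch of the lookup (mirroring __rbd_object_map_index() and
__rbd_object_map_get() in the diff below; this standalone
object_map_get() helper is illustrative, not kernel code):

#include <stdint.h>
#include <stdio.h>

#define BITS_PER_BYTE	8
#define BITS_PER_OBJ	2
#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)

/* two bits per object, most significant pair first within each byte */
static uint8_t object_map_get(const uint8_t *map, uint64_t objno)
{
	uint64_t index = objno / OBJS_PER_BYTE;
	uint8_t shift = (OBJS_PER_BYTE - objno % OBJS_PER_BYTE - 1) *
			BITS_PER_OBJ;

	return (map[index] >> shift) & OBJ_MASK;
}

int main(void)
{
	/* 0x4b = 0b01001011 packs objects 0..3 with states 1, 0, 2, 3 */
	const uint8_t map[] = { 0x4b };
	uint64_t objno;

	for (objno = 0; objno < 4; objno++)
		printf("objno %llu -> state %u\n",
		       (unsigned long long)objno, object_map_get(map, objno));
	return 0;
}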
Diffstat (limited to 'drivers/block')
-rw-r--r--	drivers/block/rbd.c	720
-rw-r--r--	drivers/block/rbd_types.h	10
2 files changed, 727 insertions(+), 3 deletions(-)
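The new "object_map_update" cls method takes a half-open object range,
a new state and an optional expected current state, encoded
little-endian into a single page by rbd_cls_object_map_update() in the
diff below.  A hedged sketch of that payload layout (this userspace
encode_object_map_update() helper is hypothetical, inferred from the
ceph_encode_* calls in the patch):

#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Returns the encoded length: 19 bytes without, 20 bytes with an
 * expected current state.  Assumes a little-endian host for brevity;
 * the kernel uses ceph_encode_64()/ceph_encode_8() instead. */
static size_t encode_object_map_update(uint8_t *buf, uint64_t objno,
				       uint8_t new_state,
				       const uint8_t *current_state)
{
	uint8_t *p = buf;
	uint64_t range[2] = { objno, objno + 1 };	/* [start, end) */

	memcpy(p, range, sizeof(range));
	p += sizeof(range);
	*p++ = new_state;
	if (current_state) {
		*p++ = 1;		/* has_current_state */
		*p++ = *current_state;
	} else {
		*p++ = 0;		/* unconditional update */
	}
	return p - buf;
}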
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 3d861d3013f8..0df91665c4eb 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURE_LAYERING		(1ULL<<0)
 #define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
+#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
+#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
 #define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
 #define RBD_FEATURE_DATA_POOL		(1ULL<<7)
 #define RBD_FEATURE_OPERATIONS		(1ULL<<8)
@@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
 				 RBD_FEATURE_STRIPINGV2 |	\
 				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
+				 RBD_FEATURE_OBJECT_MAP |	\
+				 RBD_FEATURE_FAST_DIFF |	\
 				 RBD_FEATURE_DEEP_FLATTEN |	\
 				 RBD_FEATURE_DATA_POOL |	\
 				 RBD_FEATURE_OPERATIONS)
@@ -227,6 +231,8 @@ enum obj_operation_type {
 #define RBD_OBJ_FLAG_DELETION			(1U << 0)
 #define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
 #define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
+#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
+#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)
 
 enum rbd_obj_read_state {
 	RBD_OBJ_READ_START = 1,
@@ -261,14 +267,18 @@ enum rbd_obj_read_state {
  */
 enum rbd_obj_write_state {
 	RBD_OBJ_WRITE_START = 1,
+	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
 	RBD_OBJ_WRITE_OBJECT,
 	__RBD_OBJ_WRITE_COPYUP,
 	RBD_OBJ_WRITE_COPYUP,
+	RBD_OBJ_WRITE_POST_OBJECT_MAP,
 };
 
 enum rbd_obj_copyup_state {
 	RBD_OBJ_COPYUP_START = 1,
 	RBD_OBJ_COPYUP_READ_PARENT,
+	__RBD_OBJ_COPYUP_OBJECT_MAPS,
+	RBD_OBJ_COPYUP_OBJECT_MAPS,
 	__RBD_OBJ_COPYUP_WRITE_OBJECT,
 	RBD_OBJ_COPYUP_WRITE_OBJECT,
 };
@@ -419,6 +429,11 @@ struct rbd_device {
 	int			acquire_err;
 	struct completion	releasing_wait;
 
+	spinlock_t		object_map_lock;
+	u8			*object_map;
+	u64			object_map_size;	/* in objects */
+	u64			object_map_flags;
+
 	struct workqueue_struct	*task_wq;
 
 	struct rbd_spec		*parent_spec;
@@ -620,6 +635,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 				u8 *order, u64 *snap_size);
 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 				u64 *snap_features);
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
 
 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
@@ -1768,6 +1784,466 @@ static void rbd_img_request_destroy(struct kref *kref)
 	kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
+#define BITS_PER_OBJ	2
+#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
+#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)
+
+static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
+				   u64 *index, u8 *shift)
+{
+	u32 off;
+
+	rbd_assert(objno < rbd_dev->object_map_size);
+	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
+	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
+}
+
+static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+	u64 index;
+	u8 shift;
+
+	lockdep_assert_held(&rbd_dev->object_map_lock);
+	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
+	return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
+}
+
+static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
+{
+	u64 index;
+	u8 shift;
+	u8 *p;
+
+	lockdep_assert_held(&rbd_dev->object_map_lock);
+	rbd_assert(!(val & ~OBJ_MASK));
+
+	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
+	p = &rbd_dev->object_map[index];
+	*p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
+}
+
+static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+	u8 state;
+
+	spin_lock(&rbd_dev->object_map_lock);
+	state = __rbd_object_map_get(rbd_dev, objno);
+	spin_unlock(&rbd_dev->object_map_lock);
+	return state;
+}
+
+static bool use_object_map(struct rbd_device *rbd_dev)
+{
+	return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
+		!(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
+}
+
+static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
+{
+	u8 state;
+
+	/* fall back to default logic if object map is disabled or invalid */
+	if (!use_object_map(rbd_dev))
+		return true;
+
+	state = rbd_object_map_get(rbd_dev, objno);
+	return state != OBJECT_NONEXISTENT;
+}
+
+static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
+				struct ceph_object_id *oid)
+{
+	if (snap_id == CEPH_NOSNAP)
+		ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
+				rbd_dev->spec->image_id);
+	else
+		ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
+				rbd_dev->spec->image_id, snap_id);
+}
+
+static int rbd_object_map_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	CEPH_DEFINE_OID_ONSTACK(oid);
+	u8 lock_type;
+	char *lock_tag;
+	struct ceph_locker *lockers;
+	u32 num_lockers;
+	bool broke_lock = false;
+	int ret;
+
+	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+again:
+	ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+			    CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
+	if (ret != -EBUSY || broke_lock) {
+		if (ret == -EEXIST)
+			ret = 0; /* already locked by myself */
+		if (ret)
+			rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
+		return ret;
+	}
+
+	ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
+				 RBD_LOCK_NAME, &lock_type, &lock_tag,
+				 &lockers, &num_lockers);
+	if (ret) {
+		if (ret == -ENOENT)
+			goto again;
+
+		rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
+		return ret;
+	}
+
+	kfree(lock_tag);
+	if (num_lockers == 0)
+		goto again;
+
+	rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
+		 ENTITY_NAME(lockers[0].id.name));
+
+	ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
+				  RBD_LOCK_NAME, lockers[0].id.cookie,
+				  &lockers[0].id.name);
+	ceph_free_lockers(lockers, num_lockers);
+	if (ret) {
+		if (ret == -ENOENT)
+			goto again;
+
+		rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
+		return ret;
+	}
+
+	broke_lock = true;
+	goto again;
+}
+
+static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	CEPH_DEFINE_OID_ONSTACK(oid);
+	int ret;
+
+	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+	ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+			      "");
+	if (ret && ret != -ENOENT)
+		rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
+}
+
+static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
+{
+	u8 struct_v;
+	u32 struct_len;
+	u32 header_len;
+	void *header_end;
+	int ret;
+
+	ceph_decode_32_safe(p, end, header_len, e_inval);
+	header_end = *p + header_len;
+
+	ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
+				  &struct_len);
+	if (ret)
+		return ret;
+
+	ceph_decode_64_safe(p, end, *object_map_size, e_inval);
+
+	*p = header_end;
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int __rbd_object_map_load(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	CEPH_DEFINE_OID_ONSTACK(oid);
+	struct page **pages;
+	void *p, *end;
+	size_t reply_len;
+	u64 num_objects;
+	u64 object_map_bytes;
+	u64 object_map_size;
+	int num_pages;
+	int ret;
+
+	rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
+
+	num_objects = ceph_get_num_objects(&rbd_dev->layout,
+					   rbd_dev->mapping.size);
+	object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
+					    BITS_PER_BYTE);
+	num_pages = calc_pages_for(0, object_map_bytes) + 1;
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	reply_len = num_pages * PAGE_SIZE;
+	rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
+	ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
+			     "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
+			     NULL, 0, pages, &reply_len);
+	if (ret)
+		goto out;
+
+	p = page_address(pages[0]);
+	end = p + min(reply_len, (size_t)PAGE_SIZE);
+	ret = decode_object_map_header(&p, end, &object_map_size);
+	if (ret)
+		goto out;
+
+	if (object_map_size != num_objects) {
+		rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
+			 object_map_size, num_objects);
+		ret = -EINVAL;
+		goto out;
+	}
+
+	if (offset_in_page(p) + object_map_bytes > reply_len) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
+	if (!rbd_dev->object_map) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	rbd_dev->object_map_size = object_map_size;
+	ceph_copy_from_page_vector(pages, rbd_dev->object_map,
+				   offset_in_page(p), object_map_bytes);
+
+out:
+	ceph_release_page_vector(pages, num_pages);
+	return ret;
+}
+
+static void rbd_object_map_free(struct rbd_device *rbd_dev)
+{
+	kvfree(rbd_dev->object_map);
+	rbd_dev->object_map = NULL;
+	rbd_dev->object_map_size = 0;
+}
+
+static int rbd_object_map_load(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	ret = __rbd_object_map_load(rbd_dev);
+	if (ret)
+		return ret;
+
+	ret = rbd_dev_v2_get_flags(rbd_dev);
+	if (ret) {
+		rbd_object_map_free(rbd_dev);
+		return ret;
+	}
+
+	if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
+		rbd_warn(rbd_dev, "object map is invalid");
+
+	return 0;
+}
+
+static int rbd_object_map_open(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	ret = rbd_object_map_lock(rbd_dev);
+	if (ret)
+		return ret;
+
+	ret = rbd_object_map_load(rbd_dev);
+	if (ret) {
+		rbd_object_map_unlock(rbd_dev);
+		return ret;
+	}
+
+	return 0;
+}
+
+static void rbd_object_map_close(struct rbd_device *rbd_dev)
+{
+	rbd_object_map_free(rbd_dev);
+	rbd_object_map_unlock(rbd_dev);
+}
+
+/*
+ * This function needs snap_id (or more precisely just something to
+ * distinguish between HEAD and snapshot object maps), new_state and
+ * current_state that were passed to rbd_object_map_update().
+ *
+ * To avoid allocating and stashing a context we piggyback on the OSD
+ * request.  A HEAD update has two ops (assert_locked).  For new_state
+ * and current_state we decode our own object_map_update op, encoded in
+ * rbd_cls_object_map_update().
+ */
+static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
+					struct ceph_osd_request *osd_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_osd_data *osd_data;
+	u64 objno;
+	u8 state, new_state, current_state;
+	bool has_current_state;
+	void *p;
+
+	if (osd_req->r_result)
+		return osd_req->r_result;
+
+	/*
+	 * Nothing to do for a snapshot object map.
+	 */
+	if (osd_req->r_num_ops == 1)
+		return 0;
+
+	/*
+	 * Update in-memory HEAD object map.
+	 */
+	rbd_assert(osd_req->r_num_ops == 2);
+	osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
+	rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
+
+	p = page_address(osd_data->pages[0]);
+	objno = ceph_decode_64(&p);
+	rbd_assert(objno == obj_req->ex.oe_objno);
+	rbd_assert(ceph_decode_64(&p) == objno + 1);
+	new_state = ceph_decode_8(&p);
+	has_current_state = ceph_decode_8(&p);
+	if (has_current_state)
+		current_state = ceph_decode_8(&p);
+
+	spin_lock(&rbd_dev->object_map_lock);
+	state = __rbd_object_map_get(rbd_dev, objno);
+	if (!has_current_state || current_state == state ||
+	    (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
+		__rbd_object_map_set(rbd_dev, objno, new_state);
+	spin_unlock(&rbd_dev->object_map_lock);
+
+	return 0;
+}
+
+static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
+{
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+	int result;
+
+	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+	     osd_req->r_result, obj_req);
+
+	result = rbd_object_map_update_finish(obj_req, osd_req);
+	rbd_obj_handle_request(obj_req, result);
+}
+
+static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
+{
+	u8 state = rbd_object_map_get(rbd_dev, objno);
+
+	if (state == new_state ||
+	    (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
+	    (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
+		return false;
+
+	return true;
+}
+
+static int rbd_cls_object_map_update(struct ceph_osd_request *req,
+				     int which, u64 objno, u8 new_state,
+				     const u8 *current_state)
+{
+	struct page **pages;
+	void *p, *start;
+	int ret;
+
+	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
+	if (ret)
+		return ret;
+
+	pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	p = start = page_address(pages[0]);
+	ceph_encode_64(&p, objno);
+	ceph_encode_64(&p, objno + 1);
+	ceph_encode_8(&p, new_state);
+	if (current_state) {
+		ceph_encode_8(&p, 1);
+		ceph_encode_8(&p, *current_state);
+	} else {
+		ceph_encode_8(&p, 0);
+	}
+
+	osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
+					  false, true);
+	return 0;
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
+				 u8 new_state, const u8 *current_state)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_osd_request *req;
+	int num_ops = 1;
+	int which = 0;
+	int ret;
+
+	if (snap_id == CEPH_NOSNAP) {
+		if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
+			return 1;
+
+		num_ops++; /* assert_locked */
+	}
+
+	req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
+	req->r_callback = rbd_object_map_callback;
+	req->r_priv = obj_req;
+
+	rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
+	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+	ktime_get_real_ts64(&req->r_mtime);
+
+	if (snap_id == CEPH_NOSNAP) {
+		/*
+		 * Protect against possible race conditions during lock
+		 * ownership transitions.
+		 */
+		ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
+					     CEPH_CLS_LOCK_EXCLUSIVE, "", "");
+		if (ret)
+			return ret;
+	}
+
+	ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
+					new_state, current_state);
+	if (ret)
+		return ret;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		return ret;
+
+	ceph_osdc_start_request(osdc, req, false);
+	return 0;
+}
+
 static void prune_extents(struct ceph_file_extent *img_extents,
 			  u32 *num_img_extents, u64 overlap)
 {
@@ -1975,6 +2451,7 @@ static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
 	if (ret)
 		return ret;
 
+	obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
 	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
 		obj_req->flags |= RBD_OBJ_FLAG_DELETION;
 
@@ -2022,6 +2499,7 @@ static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
 	if (rbd_obj_copyup_enabled(obj_req))
 		obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
 	if (!obj_req->num_img_extents) {
+		obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
 		if (rbd_obj_is_entire(obj_req))
 			obj_req->flags |= RBD_OBJ_FLAG_DELETION;
 	}
@@ -2407,6 +2885,20 @@ static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
 		queue_work(rbd_wq, &img_req->work);
 }
 
+static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
+		obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+		return true;
+	}
+
+	dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
+	     obj_req->ex.oe_objno);
+	return false;
+}
+
 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
 {
 	struct ceph_osd_request *osd_req;
@@ -2482,10 +2974,17 @@ static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	int ret;
 
+again:
 	switch (obj_req->read_state) {
 	case RBD_OBJ_READ_START:
 		rbd_assert(!*result);
 
+		if (!rbd_obj_may_exist(obj_req)) {
+			*result = -ENOENT;
+			obj_req->read_state = RBD_OBJ_READ_OBJECT;
+			goto again;
+		}
+
 		ret = rbd_obj_read_object(obj_req);
 		if (ret) {
 			*result = ret;
@@ -2536,6 +3035,44 @@ static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
 	}
 }
 
+static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
+		obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+
+	if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
+	    (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
+		dout("%s %p noop for nonexistent\n", __func__, obj_req);
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	u8 new_state;
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return 1;
+
+	if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+		new_state = OBJECT_PENDING;
+	else
+		new_state = OBJECT_EXISTS;
+
+	return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
+}
+
 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
 {
 	struct ceph_osd_request *osd_req;
@@ -2706,6 +3243,41 @@ static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
 	return rbd_obj_read_from_parent(obj_req);
 }
 
+static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_snap_context *snapc = obj_req->img_request->snapc;
+	u8 new_state;
+	u32 i;
+	int ret;
+
+	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return;
+
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+		return;
+
+	for (i = 0; i < snapc->num_snaps; i++) {
+		if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
+		    i + 1 < snapc->num_snaps)
+			new_state = OBJECT_EXISTS_CLEAN;
+		else
+			new_state = OBJECT_EXISTS;
+
+		ret = rbd_object_map_update(obj_req, snapc->snaps[i],
+					    new_state, NULL);
+		if (ret < 0) {
+			obj_req->pending.result = ret;
+			return;
+		}
+
+		rbd_assert(!ret);
+		obj_req->pending.num_pending++;
+	}
+}
+
 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
 {
 	u32 bytes = rbd_obj_img_extents_bytes(obj_req);
@@ -2749,6 +3321,7 @@ static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
 
 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
 {
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	int ret;
 
 again:
@@ -2776,6 +3349,25 @@ again:
 			obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
 		}
 
+		rbd_obj_copyup_object_maps(obj_req);
+		if (!obj_req->pending.num_pending) {
+			*result = obj_req->pending.result;
+			obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
+			goto again;
+		}
+		obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
+		return false;
+	case __RBD_OBJ_COPYUP_OBJECT_MAPS:
+		if (!pending_result_dec(&obj_req->pending, result))
+			return false;
+		/* fall through */
+	case RBD_OBJ_COPYUP_OBJECT_MAPS:
+		if (*result) {
+			rbd_warn(rbd_dev, "snap object map update failed: %d",
+				 *result);
+			return true;
+		}
+
 		rbd_obj_copyup_write_object(obj_req);
 		if (!obj_req->pending.num_pending) {
 			*result = obj_req->pending.result;
2797 3389
3390/*
3391 * Return:
3392 * 0 - object map update sent
3393 * 1 - object map update isn't needed
3394 * <0 - error
3395 */
3396static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3397{
3398 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3399 u8 current_state = OBJECT_PENDING;
3400
3401 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3402 return 1;
3403
3404 if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3405 return 1;
3406
3407 return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3408 &current_state);
3409}
3410
2798static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result) 3411static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
2799{ 3412{
2800 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 3413 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
@@ -2805,6 +3418,24 @@ again:
 	case RBD_OBJ_WRITE_START:
 		rbd_assert(!*result);
 
+		if (rbd_obj_write_is_noop(obj_req))
+			return true;
+
+		ret = rbd_obj_write_pre_object_map(obj_req);
+		if (ret < 0) {
+			*result = ret;
+			return true;
+		}
+		obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
+		if (ret > 0)
+			goto again;
+		return false;
+	case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
+		if (*result) {
+			rbd_warn(rbd_dev, "pre object map update failed: %d",
+				 *result);
+			return true;
+		}
 		ret = rbd_obj_write_object(obj_req);
 		if (ret) {
 			*result = ret;
@@ -2837,8 +3468,23 @@ again:
 		return false;
 		/* fall through */
 	case RBD_OBJ_WRITE_COPYUP:
-		if (*result)
+		if (*result) {
 			rbd_warn(rbd_dev, "copyup failed: %d", *result);
+			return true;
+		}
+		ret = rbd_obj_write_post_object_map(obj_req);
+		if (ret < 0) {
+			*result = ret;
+			return true;
+		}
+		obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
+		if (ret > 0)
+			goto again;
+		return false;
+	case RBD_OBJ_WRITE_POST_OBJECT_MAP:
+		if (*result)
+			rbd_warn(rbd_dev, "post object map update failed: %d",
+				 *result);
 		return true;
 	default:
 		BUG();
@@ -2892,7 +3538,8 @@ static bool need_exclusive_lock(struct rbd_img_request *img_req)
 		return false;
 
 	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
-	if (rbd_dev->opts->lock_on_read)
+	if (rbd_dev->opts->lock_on_read ||
+	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
 		return true;
 
 	return rbd_img_is_write(img_req);
@@ -3431,7 +4078,7 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
 	if (ret)
 		goto out; /* request lock or error */
 
-	rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+	rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
 		 ENTITY_NAME(lockers[0].id.name));
 
 	ret = ceph_monc_blacklist_add(&client->monc,
@@ -3458,6 +4105,19 @@ out:
 	return ret;
 }
 
+static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
+		ret = rbd_object_map_open(rbd_dev);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
 /*
  * Return:
  *   0 - lock acquired
@@ -3501,6 +4161,17 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
 	rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
 	rbd_assert(list_empty(&rbd_dev->running_list));
 
+	ret = rbd_post_acquire_action(rbd_dev);
+	if (ret) {
+		rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
+		/*
+		 * Can't stay in RBD_LOCK_STATE_LOCKED because
+		 * rbd_lock_add_request() would let the request through,
+		 * assuming that e.g. object map is locked and loaded.
+		 */
+		rbd_unlock(rbd_dev);
+	}
+
 out:
 	wake_lock_waiters(rbd_dev, ret);
 	up_write(&rbd_dev->lock_rwsem);
@@ -3574,10 +4245,17 @@ static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
 	return true;
 }
 
+static void rbd_pre_release_action(struct rbd_device *rbd_dev)
+{
+	if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
+		rbd_object_map_close(rbd_dev);
+}
+
 static void __rbd_release_lock(struct rbd_device *rbd_dev)
 {
 	rbd_assert(list_empty(&rbd_dev->running_list));
 
+	rbd_pre_release_action(rbd_dev);
 	rbd_unlock(rbd_dev);
 }
 
@@ -4864,6 +5542,8 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
 	init_completion(&rbd_dev->acquire_wait);
 	init_completion(&rbd_dev->releasing_wait);
 
+	spin_lock_init(&rbd_dev->object_map_lock);
+
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
@@ -5045,6 +5725,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
 						&rbd_dev->header.features);
 }
 
+/*
+ * These are generic image flags, but since they are used only for
+ * object map, store them in rbd_dev->object_map_flags.
+ *
+ * For the same reason, this function is called only on object map
+ * (re)load and not on header refresh.
+ */
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
+{
+	__le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
+	__le64 flags;
+	int ret;
+
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_flags",
+				  &snapid, sizeof(snapid),
+				  &flags, sizeof(flags));
+	if (ret < 0)
+		return ret;
+	if (ret < sizeof(flags))
+		return -EBADMSG;
+
+	rbd_dev->object_map_flags = le64_to_cpu(flags);
+	return 0;
+}
+
 struct parent_image_info {
 	u64		pool_id;
 	const char	*pool_ns;
@@ -6018,6 +6724,7 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 	struct rbd_image_header	*header;
 
 	rbd_dev_parent_put(rbd_dev);
+	rbd_object_map_free(rbd_dev);
 	rbd_dev_mapping_clear(rbd_dev);
 
 	/* Free dynamic fields from the header, then zero it out */
@@ -6267,6 +6974,13 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
 	if (ret)
 		goto err_out_probe;
 
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP &&
+	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
+		ret = rbd_object_map_load(rbd_dev);
+		if (ret)
+			goto err_out_probe;
+	}
+
 	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
 		ret = rbd_dev_v2_parent_info(rbd_dev);
 		if (ret)
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 62ff50d3e7a6..ac98ab6ccd3b 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -18,6 +18,7 @@
 /* For format version 2, rbd image 'foo' consists of objects
  *   rbd_id.foo		- id of image
  *   rbd_header.<id>	- image metadata
+ *   rbd_object_map.<id> - optional image object map
  *   rbd_data.<id>.0000000000000000
  *   rbd_data.<id>.0000000000000001
  *   ...		- data
@@ -25,6 +26,7 @@
  */
 
 #define RBD_HEADER_PREFIX      "rbd_header."
+#define RBD_OBJECT_MAP_PREFIX  "rbd_object_map."
 #define RBD_ID_PREFIX          "rbd_id."
 #define RBD_V2_DATA_FORMAT     "%s.%016llx"
 
@@ -39,6 +41,14 @@ enum rbd_notify_op {
 	RBD_NOTIFY_OP_HEADER_UPDATE	= 3,
 };
 
+#define OBJECT_NONEXISTENT	0
+#define OBJECT_EXISTS		1
+#define OBJECT_PENDING		2
+#define OBJECT_EXISTS_CLEAN	3
+
+#define RBD_FLAG_OBJECT_MAP_INVALID	(1ULL << 0)
+#define RBD_FLAG_FAST_DIFF_INVALID	(1ULL << 1)
+
 /*
  * For format version 1, rbd image 'foo' consists of objects
  *   foo.rbd		- image metadata
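
With the states above, rbd_object_map_update_finish() in the rbd.c hunk
applies an OSD-acked update to the cached HEAD map only when no
expected state was sent, the expectation matches, or a cached
OBJECT_EXISTS_CLEAN satisfies an expected OBJECT_EXISTS.  A sketch of
that acceptance rule (apply_acked_update() is a hypothetical standalone
restatement, not kernel code):

#include <stdint.h>

enum { NONEXISTENT, EXISTS, PENDING, EXISTS_CLEAN };	/* as in rbd_types.h */

static int apply_acked_update(uint8_t *slot, uint8_t new_state,
			      const uint8_t *current_state)
{
	if (!current_state || *current_state == *slot ||
	    (*current_state == EXISTS && *slot == EXISTS_CLEAN)) {
		*slot = new_state;
		return 1;	/* in-memory map updated */
	}
	return 0;		/* expectation failed; leave slot as is */
}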