about summary refs log tree commit diff stats
path: root/drivers/block/rbd.c
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-04-19 16:34:50 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:19:13 -0400
commit3d7efd18d9df628e30ff36e9e488a8f0e782b678 (patch)
treee4d6f87aedc0f2803920c32994ebe10e5994032d /drivers/block/rbd.c
parentd98df63ea7e87d5df4dce0cece0210e2a777ac00 (diff)
rbd: implement full object parent reads
As a step toward implementing layered writes, implement reading the data for a target object from the parent image for a write request whose target object is known to not exist. Add a copyup_pages field to an image request to track the page array used (only) for such a request. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--drivers/block/rbd.c152
1 file changed, 143 insertions, 9 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b2819deced6b..639dd91e7dab 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -250,6 +250,7 @@ struct rbd_img_request {
250 struct request *rq; /* block request */ 250 struct request *rq; /* block request */
251 struct rbd_obj_request *obj_request; /* obj req initiator */ 251 struct rbd_obj_request *obj_request; /* obj req initiator */
252 }; 252 };
253 struct page **copyup_pages;
253 spinlock_t completion_lock;/* protects next_completion */ 254 spinlock_t completion_lock;/* protects next_completion */
254 u32 next_completion; 255 u32 next_completion;
255 rbd_img_callback_t callback; 256 rbd_img_callback_t callback;
@@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
350static LIST_HEAD(rbd_client_list); /* clients */ 351static LIST_HEAD(rbd_client_list); /* clients */
351static DEFINE_SPINLOCK(rbd_client_list_lock); 352static DEFINE_SPINLOCK(rbd_client_list_lock);
352 353
354static int rbd_img_request_submit(struct rbd_img_request *img_request);
355
353static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); 356static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
354static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); 357static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
355 358
@@ -1956,6 +1959,133 @@ out_unwind:
1956 return -ENOMEM; 1959 return -ENOMEM;
1957} 1960}
1958 1961
1962static void
1963rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
1964{
1965 struct rbd_obj_request *orig_request;
1966 struct page **pages;
1967 u32 page_count;
1968 int result;
1969 u64 obj_size;
1970 u64 xferred;
1971
1972 rbd_assert(img_request_child_test(img_request));
1973
1974 /* First get what we need from the image request */
1975
1976 pages = img_request->copyup_pages;
1977 rbd_assert(pages != NULL);
1978 img_request->copyup_pages = NULL;
1979
1980 orig_request = img_request->obj_request;
1981 rbd_assert(orig_request != NULL);
1982
1983 result = img_request->result;
1984 obj_size = img_request->length;
1985 xferred = img_request->xferred;
1986
1987 rbd_img_request_put(img_request);
1988
1989 obj_request_existence_set(orig_request, true);
1990
1991 page_count = (u32)calc_pages_for(0, obj_size);
1992 ceph_release_page_vector(pages, page_count);
1993
1994 /* Resubmit the original request (for now). */
1995
1996 orig_request->result = rbd_img_obj_request_submit(orig_request);
1997 if (orig_request->result) {
1998 obj_request_done_set(orig_request);
1999 rbd_obj_request_complete(orig_request);
2000 }
2001}
2002
2003/*
2004 * Read from the parent image the range of data that covers the
2005 * entire target of the given object request. This is used for
2006 * satisfying a layered image write request when the target of an
2007 * object request from the image request does not exist.
2008 *
2009 * A page array big enough to hold the returned data is allocated
2010 * and supplied to rbd_img_request_fill() as the "data descriptor."
2011 * When the read completes, this page array will be transferred to
2012 * the original object request for the copyup operation.
2013 *
2014 * If an error occurs, record it as the result of the original
2015 * object request and mark it done so it gets completed.
2016 */
2017static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2018{
2019 struct rbd_img_request *img_request = NULL;
2020 struct rbd_img_request *parent_request = NULL;
2021 struct rbd_device *rbd_dev;
2022 u64 img_offset;
2023 u64 length;
2024 struct page **pages = NULL;
2025 u32 page_count;
2026 int result;
2027
2028 rbd_assert(obj_request_img_data_test(obj_request));
2029 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2030
2031 img_request = obj_request->img_request;
2032 rbd_assert(img_request != NULL);
2033 rbd_dev = img_request->rbd_dev;
2034 rbd_assert(rbd_dev->parent != NULL);
2035
2036 /*
2037 * Determine the byte range covered by the object in the
2038 * child image to which the original request was to be sent.
2039 */
2040 img_offset = obj_request->img_offset - obj_request->offset;
2041 length = (u64)1 << rbd_dev->header.obj_order;
2042
2043 /*
2044 * Allocate a page array big enough to receive the data read
2045 * from the parent.
2046 */
2047 page_count = (u32)calc_pages_for(0, length);
2048 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2049 if (IS_ERR(pages)) {
2050 result = PTR_ERR(pages);
2051 pages = NULL;
2052 goto out_err;
2053 }
2054
2055 result = -ENOMEM;
2056 parent_request = rbd_img_request_create(rbd_dev->parent,
2057 img_offset, length,
2058 false, true);
2059 if (!parent_request)
2060 goto out_err;
2061 rbd_obj_request_get(obj_request);
2062 parent_request->obj_request = obj_request;
2063
2064 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2065 if (result)
2066 goto out_err;
2067 parent_request->copyup_pages = pages;
2068
2069 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2070 result = rbd_img_request_submit(parent_request);
2071 if (!result)
2072 return 0;
2073
2074 parent_request->copyup_pages = NULL;
2075 parent_request->obj_request = NULL;
2076 rbd_obj_request_put(obj_request);
2077out_err:
2078 if (pages)
2079 ceph_release_page_vector(pages, page_count);
2080 if (parent_request)
2081 rbd_img_request_put(parent_request);
2082 obj_request->result = result;
2083 obj_request->xferred = 0;
2084 obj_request_done_set(obj_request);
2085
2086 return result;
2087}
2088
1959static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) 2089static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
1960{ 2090{
1961 struct rbd_obj_request *orig_request; 2091 struct rbd_obj_request *orig_request;
@@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
1996 obj_request_existence_set(orig_request, false); 2126 obj_request_existence_set(orig_request, false);
1997 } else if (result) { 2127 } else if (result) {
1998 orig_request->result = result; 2128 orig_request->result = result;
1999 goto out_err; 2129 goto out;
2000 } 2130 }
2001 2131
2002 /* 2132 /*
@@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2004 * whether the target object exists. 2134 * whether the target object exists.
2005 */ 2135 */
2006 orig_request->result = rbd_img_obj_request_submit(orig_request); 2136 orig_request->result = rbd_img_obj_request_submit(orig_request);
2007out_err: 2137out:
2008 if (orig_request->result) 2138 if (orig_request->result)
2009 rbd_obj_request_complete(orig_request); 2139 rbd_obj_request_complete(orig_request);
2010 rbd_obj_request_put(orig_request); 2140 rbd_obj_request_put(orig_request);
@@ -2070,15 +2200,13 @@ out:
2070static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) 2200static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2071{ 2201{
2072 struct rbd_img_request *img_request; 2202 struct rbd_img_request *img_request;
2203 bool known;
2073 2204
2074 rbd_assert(obj_request_img_data_test(obj_request)); 2205 rbd_assert(obj_request_img_data_test(obj_request));
2075 2206
2076 img_request = obj_request->img_request; 2207 img_request = obj_request->img_request;
2077 rbd_assert(img_request); 2208 rbd_assert(img_request);
2078 2209
2079 /* (At the moment we don't care whether it exists or not...) */
2080 (void) obj_request_exists_test;
2081
2082 /* 2210 /*
2083 * Only layered writes need special handling. If it's not a 2211 * Only layered writes need special handling. If it's not a
2084 * layered write, or it is a layered write but we know the 2212 * layered write, or it is a layered write but we know the
@@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2087 */ 2215 */
2088 if (!img_request_write_test(img_request) || 2216 if (!img_request_write_test(img_request) ||
2089 !img_request_layered_test(img_request) || 2217 !img_request_layered_test(img_request) ||
2090 obj_request_known_test(obj_request)) { 2218 ((known = obj_request_known_test(obj_request)) &&
2219 obj_request_exists_test(obj_request))) {
2091 2220
2092 struct rbd_device *rbd_dev; 2221 struct rbd_device *rbd_dev;
2093 struct ceph_osd_client *osdc; 2222 struct ceph_osd_client *osdc;
@@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2099 } 2228 }
2100 2229
2101 /* 2230 /*
2102 * It's a layered write and we don't know whether the target 2231 * It's a layered write. The target object might exist but
2103 * exists. Issue existence check; once that completes the 2232 * we may not know that yet. If we know it doesn't exist,
2104 * original request will be submitted again. 2233 * start by reading the data for the full target object from
2234 * the parent so we can use it for a copyup to the target.
2105 */ 2235 */
2236 if (known)
2237 return rbd_img_obj_parent_read_full(obj_request);
2238
2239 /* We don't know whether the target exists. Go find out. */
2106 2240
2107 return rbd_img_obj_exists_submit(obj_request); 2241 return rbd_img_obj_exists_submit(obj_request);
2108} 2242}