path: root/drivers/block/rbd.c
author		Linus Torvalds <torvalds@linux-foundation.org>	2018-04-10 15:25:30 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2018-04-10 15:25:30 -0400
commit		b284d4d5a6785f8cd07eda2646a95782373cd01e (patch)
tree		62d835dcb6a6eb30fe9b0ebad7aeba4b4234b1d6 /drivers/block/rbd.c
parent		a7726f6b61e8917e73101bb8071facdff7ec5d72 (diff)
parent		9122eed5281e89bdb02162a8ecb3cc13ffc8985e (diff)
Merge tag 'ceph-for-4.17-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The big ticket items are:

   - support for rbd "fancy" striping (myself).

     The striping feature bit is now fully implemented, allowing mapping
     v2 images with non-default striping patterns. This completes support
     for --image-format 2.

   - CephFS quota support (Luis Henriques and Zheng Yan).

     This set is based on the new SnapRealm code in the upcoming v13.y.z
     ("Mimic") release. Quota handling will be rejected on older
     filesystems.

   - memory usage improvements in CephFS (Chengguang Xu).

     Directory specific bits have been split out of ceph_file_info and
     some effort went into improving cap reservation code to avoid OOM
     crashes.

  Also included a bunch of assorted fixes all over the place from
  Chengguang and others"

* tag 'ceph-for-4.17-rc1' of git://github.com/ceph/ceph-client: (67 commits)
  ceph: quota: report root dir quota usage in statfs
  ceph: quota: add counter for snaprealms with quota
  ceph: quota: cache inode pointer in ceph_snap_realm
  ceph: fix root quota realm check
  ceph: don't check quota for snap inode
  ceph: quota: update MDS when max_bytes is approaching
  ceph: quota: support for ceph.quota.max_bytes
  ceph: quota: don't allow cross-quota renames
  ceph: quota: support for ceph.quota.max_files
  ceph: quota: add initial infrastructure to support cephfs quotas
  rbd: remove VLA usage
  rbd: fix spelling mistake: "reregisteration" -> "reregistration"
  ceph: rename function drop_leases() to a more descriptive name
  ceph: fix invalid point dereference for error case in mdsc destroy
  ceph: return proper bool type to caller instead of pointer
  ceph: optimize memory usage
  ceph: optimize mds session register
  libceph, ceph: add __init attribution to init funcitons
  ceph: filter out used flags when printing unused open flags
  ceph: don't wait on writeback when there is no more dirty pages
  ...
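For context on the "fancy" striping item above: with a non-default striping pattern an image byte offset no longer maps 1:1 onto a single object. A rough sketch of the mapping that the new <linux/ceph/striper.h> helpers perform (illustrative C only, not the kernel API; the helper name is made up):

static void example_map_offset(u64 off, u32 stripe_unit, u32 stripe_count,
                               u32 object_size, u64 *objno, u64 *objoff)
{
        u32 stripes_per_object = object_size / stripe_unit;
        u64 blockno = off / stripe_unit;        /* stripe-unit index */
        u32 blockoff = off % stripe_unit;       /* offset inside that unit */
        u32 stripepos = blockno % stripe_count; /* column within the object set */
        u64 stripeno = blockno / stripe_count;  /* row across the object set */
        u64 objsetno = stripeno / stripes_per_object;

        *objno = objsetno * stripe_count + stripepos;
        *objoff = (stripeno % stripes_per_object) * stripe_unit + blockoff;
}

With the default layout (stripe_unit == object_size, stripe_count == 1) this collapses to objno = off / object_size and objoff = off % object_size, which is the pre-4.17 behaviour.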
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--	drivers/block/rbd.c	2452
1 file changed, 980 insertions(+), 1472 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 1e03b04819c8..07dc5419bd63 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -32,6 +32,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/cls_lock_client.h>
+#include <linux/ceph/striper.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
@@ -200,95 +201,81 @@ struct rbd_client {
 };
 
 struct rbd_img_request;
-typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
-
-#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
-
-struct rbd_obj_request;
-typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
-	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+	OBJ_REQUEST_NODATA = 1,
+	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
+	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
+	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
 };
 
 enum obj_operation_type {
+	OBJ_OP_READ = 1,
 	OBJ_OP_WRITE,
-	OBJ_OP_READ,
 	OBJ_OP_DISCARD,
 };
 
-enum obj_req_flags {
-	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
-	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
-	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
-	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
+/*
+ * Writes go through the following state machine to deal with
+ * layering:
+ *
+ *                       need copyup
+ * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
+ *            |     ^                              |
+ *            v     \------------------------------/
+ *          done
+ *            ^
+ *            |
+ * RBD_OBJ_WRITE_FLAT
+ *
+ * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
+ * there is a parent or not.
+ */
+enum rbd_obj_write_state {
+	RBD_OBJ_WRITE_FLAT = 1,
+	RBD_OBJ_WRITE_GUARD,
+	RBD_OBJ_WRITE_COPYUP,
 };
 
 struct rbd_obj_request {
-	u64			object_no;
-	u64			offset;		/* object start byte */
-	u64			length;		/* bytes from offset */
-	unsigned long		flags;
-
-	/*
-	 * An object request associated with an image will have its
-	 * img_data flag set; a standalone object request will not.
-	 *
-	 * A standalone object request will have which == BAD_WHICH
-	 * and a null obj_request pointer.
-	 *
-	 * An object request initiated in support of a layered image
-	 * object (to check for its existence before a write) will
-	 * have which == BAD_WHICH and a non-null obj_request pointer.
-	 *
-	 * Finally, an object request for rbd image data will have
-	 * which != BAD_WHICH, and will have a non-null img_request
-	 * pointer.  The value of which will be in the range
-	 * 0..(img_request->obj_request_count-1).
-	 */
+	struct ceph_object_extent ex;
 	union {
-		struct rbd_obj_request	*obj_request;	/* STAT op */
-		struct {
-			struct rbd_img_request	*img_request;
-			u64			img_offset;
-			/* links for img_request->obj_requests list */
-			struct list_head	links;
-		};
+		bool			tried_parent;	/* for reads */
+		enum rbd_obj_write_state write_state;	/* for writes */
 	};
-	u32			which;		/* posn image request list */
 
-	enum obj_request_type	type;
+	struct rbd_img_request	*img_request;
+	struct ceph_file_extent	*img_extents;
+	u32			num_img_extents;
+
 	union {
-		struct bio	*bio_list;
+		struct ceph_bio_iter	bio_pos;
 		struct {
-			struct page	**pages;
-			u32		page_count;
+			struct ceph_bvec_iter	bvec_pos;
+			u32			bvec_count;
+			u32			bvec_idx;
 		};
 	};
-	struct page		**copyup_pages;
-	u32			copyup_page_count;
+	struct bio_vec		*copyup_bvecs;
+	u32			copyup_bvec_count;
 
 	struct ceph_osd_request	*osd_req;
 
 	u64			xferred;	/* bytes transferred */
 	int			result;
 
-	rbd_obj_callback_t	callback;
-
 	struct kref		kref;
 };
 
 enum img_req_flags {
-	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
-	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
 };
 
 struct rbd_img_request {
 	struct rbd_device	*rbd_dev;
-	u64			offset;	/* starting image byte offset */
-	u64			length;	/* byte count from offset */
+	enum obj_operation_type	op_type;
+	enum obj_request_type	data_type;
 	unsigned long		flags;
 	union {
 		u64			snap_id;	/* for reads */
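A simplified illustration of how a layered write is expected to walk the RBD_OBJ_WRITE_* states declared above (sketch only; the helper name is made up, the real transitions live in the new rbd_obj_handle_request() machinery later in this patch):

static bool example_advance_write(struct rbd_obj_request *obj_req)
{
        switch (obj_req->write_state) {
        case RBD_OBJ_WRITE_FLAT:
                /* no parent to worry about: one plain write, then done */
                return true;
        case RBD_OBJ_WRITE_GUARD:
                if (obj_req->result == -ENOENT) {
                        /*
                         * The guarded write found no target object: fetch
                         * the parent data and resubmit as copyup + write.
                         */
                        obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
                        return false;   /* not done yet */
                }
                return true;            /* object existed, write stands */
        case RBD_OBJ_WRITE_COPYUP:
                return true;            /* copyup + write completed */
        default:
                BUG();
        }
}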
@@ -298,26 +285,21 @@ struct rbd_img_request {
 		struct request		*rq;		/* block request */
 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
 	};
-	struct page		**copyup_pages;
-	u32			copyup_page_count;
-	spinlock_t		completion_lock;/* protects next_completion */
-	u32			next_completion;
-	rbd_img_callback_t	callback;
+	spinlock_t		completion_lock;
 	u64			xferred;/* aggregate bytes transferred */
 	int			result;	/* first nonzero obj_request result */
 
+	struct list_head	object_extents;	/* obj_req.ex structs */
 	u32			obj_request_count;
-	struct list_head	obj_requests;	/* rbd_obj_request structs */
+	u32			pending_count;
 
 	struct kref		kref;
 };
 
 #define for_each_obj_request(ireq, oreq) \
-	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
-#define for_each_obj_request_from(ireq, oreq) \
-	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
 #define for_each_obj_request_safe(ireq, oreq, n) \
-	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
 
 enum rbd_watch_state {
 	RBD_WATCH_STATE_UNREGISTERED,
@@ -433,8 +415,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
 static struct kmem_cache	*rbd_img_request_cache;
 static struct kmem_cache	*rbd_obj_request_cache;
 
-static struct bio_set		*rbd_bio_clone;
-
 static int rbd_major;
 static DEFINE_IDA(rbd_dev_id_ida);
 
@@ -447,8 +427,6 @@ static bool single_major = true;
 module_param(single_major, bool, S_IRUGO);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request);
-
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -458,7 +436,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 					size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
-static void rbd_spec_put(struct rbd_spec *spec);
 
 static int rbd_dev_id_to_minor(int dev_id)
 {
@@ -577,9 +554,6 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 #  define rbd_assert(expr)	((void) 0)
 #endif /* !RBD_DEBUG */
 
-static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
-static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
-static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
@@ -857,26 +831,6 @@ static char* obj_op_name(enum obj_operation_type op_type)
 }
 
 /*
- * Get a ceph client with specific addr and configuration, if one does
- * not exist create it.  Either way, ceph_opts is consumed by this
- * function.
- */
-static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
-{
-	struct rbd_client *rbdc;
-
-	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
-	rbdc = rbd_client_find(ceph_opts);
-	if (rbdc)	/* using an existing client */
-		ceph_destroy_options(ceph_opts);
-	else
-		rbdc = rbd_client_create(ceph_opts);
-	mutex_unlock(&client_mutex);
-
-	return rbdc;
-}
-
-/*
  * Destroy ceph client
  *
  * Caller must hold rbd_client_list_lock.
@@ -904,6 +858,56 @@ static void rbd_put_client(struct rbd_client *rbdc)
 	kref_put(&rbdc->kref, rbd_client_release);
 }
 
+static int wait_for_latest_osdmap(struct ceph_client *client)
+{
+	u64 newest_epoch;
+	int ret;
+
+	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
+	if (ret)
+		return ret;
+
+	if (client->osdc.osdmap->epoch >= newest_epoch)
+		return 0;
+
+	ceph_osdc_maybe_request_map(&client->osdc);
+	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
+				     client->options->mount_timeout);
+}
+
+/*
+ * Get a ceph client with specific addr and configuration, if one does
+ * not exist create it.  Either way, ceph_opts is consumed by this
+ * function.
+ */
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
+{
+	struct rbd_client *rbdc;
+	int ret;
+
+	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
+	rbdc = rbd_client_find(ceph_opts);
+	if (rbdc) {
+		ceph_destroy_options(ceph_opts);
+
+		/*
+		 * Using an existing client.  Make sure ->pg_pools is up to
+		 * date before we look up the pool id in do_rbd_add().
+		 */
+		ret = wait_for_latest_osdmap(rbdc->client);
+		if (ret) {
+			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
+			rbd_put_client(rbdc);
+			rbdc = ERR_PTR(ret);
+		}
+	} else {
+		rbdc = rbd_client_create(ceph_opts);
+	}
+	mutex_unlock(&client_mutex);
+
+	return rbdc;
+}
+
 static bool rbd_image_format_valid(u32 image_format)
 {
 	return image_format == 1 || image_format == 2;
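The point of wait_for_latest_osdmap() above is that a reused client may hold a stale osdmap, and do_rbd_add() resolves the user-supplied pool name to a pool id from that map. A trimmed sketch of the caller-side lookup this protects (helper name is illustrative, based on the existing do_rbd_add() pattern):

static int example_resolve_pool(struct rbd_client *rbdc, const char *pool_name)
{
        struct ceph_osd_client *osdc = &rbdc->client->osdc;
        int pool_id;

        down_read(&osdc->lock);
        pool_id = ceph_pg_poolid_by_name(osdc->osdmap, pool_name);
        up_read(&osdc->lock);

        return pool_id;         /* pool id, or -ENOENT if unknown */
}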
@@ -1223,272 +1227,59 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1223 rbd_dev->mapping.features = 0; 1227 rbd_dev->mapping.features = 0;
1224} 1228}
1225 1229
1226static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 1230static void zero_bvec(struct bio_vec *bv)
1227{
1228 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1229
1230 return offset & (segment_size - 1);
1231}
1232
1233static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1234 u64 offset, u64 length)
1235{
1236 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1237
1238 offset &= segment_size - 1;
1239
1240 rbd_assert(length <= U64_MAX - offset);
1241 if (offset + length > segment_size)
1242 length = segment_size - offset;
1243
1244 return length;
1245}
1246
1247/*
1248 * bio helpers
1249 */
1250
1251static void bio_chain_put(struct bio *chain)
1252{
1253 struct bio *tmp;
1254
1255 while (chain) {
1256 tmp = chain;
1257 chain = chain->bi_next;
1258 bio_put(tmp);
1259 }
1260}
1261
1262/*
1263 * zeros a bio chain, starting at specific offset
1264 */
1265static void zero_bio_chain(struct bio *chain, int start_ofs)
1266{ 1231{
1267 struct bio_vec bv;
1268 struct bvec_iter iter;
1269 unsigned long flags;
1270 void *buf; 1232 void *buf;
1271 int pos = 0; 1233 unsigned long flags;
1272
1273 while (chain) {
1274 bio_for_each_segment(bv, chain, iter) {
1275 if (pos + bv.bv_len > start_ofs) {
1276 int remainder = max(start_ofs - pos, 0);
1277 buf = bvec_kmap_irq(&bv, &flags);
1278 memset(buf + remainder, 0,
1279 bv.bv_len - remainder);
1280 flush_dcache_page(bv.bv_page);
1281 bvec_kunmap_irq(buf, &flags);
1282 }
1283 pos += bv.bv_len;
1284 }
1285 1234
1286 chain = chain->bi_next; 1235 buf = bvec_kmap_irq(bv, &flags);
1287 } 1236 memset(buf, 0, bv->bv_len);
1237 flush_dcache_page(bv->bv_page);
1238 bvec_kunmap_irq(buf, &flags);
1288} 1239}
1289 1240
1290/* 1241static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1291 * similar to zero_bio_chain(), zeros data defined by a page array,
1292 * starting at the given byte offset from the start of the array and
1293 * continuing up to the given end offset. The pages array is
1294 * assumed to be big enough to hold all bytes up to the end.
1295 */
1296static void zero_pages(struct page **pages, u64 offset, u64 end)
1297{ 1242{
1298 struct page **page = &pages[offset >> PAGE_SHIFT]; 1243 struct ceph_bio_iter it = *bio_pos;
1299 1244
1300 rbd_assert(end > offset); 1245 ceph_bio_iter_advance(&it, off);
1301 rbd_assert(end - offset <= (u64)SIZE_MAX); 1246 ceph_bio_iter_advance_step(&it, bytes, ({
1302 while (offset < end) { 1247 zero_bvec(&bv);
1303 size_t page_offset; 1248 }));
1304 size_t length;
1305 unsigned long flags;
1306 void *kaddr;
1307
1308 page_offset = offset & ~PAGE_MASK;
1309 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1310 local_irq_save(flags);
1311 kaddr = kmap_atomic(*page);
1312 memset(kaddr + page_offset, 0, length);
1313 flush_dcache_page(*page);
1314 kunmap_atomic(kaddr);
1315 local_irq_restore(flags);
1316
1317 offset += length;
1318 page++;
1319 }
1320} 1249}
1321 1250
1322/* 1251static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1323 * Clone a portion of a bio, starting at the given byte offset
1324 * and continuing for the number of bytes indicated.
1325 */
1326static struct bio *bio_clone_range(struct bio *bio_src,
1327 unsigned int offset,
1328 unsigned int len,
1329 gfp_t gfpmask)
1330{ 1252{
1331 struct bio *bio; 1253 struct ceph_bvec_iter it = *bvec_pos;
1332
1333 bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1334 if (!bio)
1335 return NULL; /* ENOMEM */
1336 1254
1337 bio_advance(bio, offset); 1255 ceph_bvec_iter_advance(&it, off);
1338 bio->bi_iter.bi_size = len; 1256 ceph_bvec_iter_advance_step(&it, bytes, ({
1339 1257 zero_bvec(&bv);
1340 return bio; 1258 }));
1341} 1259}
1342 1260
1343/* 1261/*
1344 * Clone a portion of a bio chain, starting at the given byte offset 1262 * Zero a range in @obj_req data buffer defined by a bio (list) or
1345 * into the first bio in the source chain and continuing for the 1263 * (private) bio_vec array.
1346 * number of bytes indicated. The result is another bio chain of
1347 * exactly the given length, or a null pointer on error.
1348 *
1349 * The bio_src and offset parameters are both in-out. On entry they
1350 * refer to the first source bio and the offset into that bio where
1351 * the start of data to be cloned is located.
1352 * 1264 *
1353 * On return, bio_src is updated to refer to the bio in the source 1265 * @off is relative to the start of the data buffer.
1354 * chain that contains first un-cloned byte, and *offset will
1355 * contain the offset of that byte within that bio.
1356 */ 1266 */
1357static struct bio *bio_chain_clone_range(struct bio **bio_src, 1267static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1358 unsigned int *offset, 1268 u32 bytes)
1359 unsigned int len,
1360 gfp_t gfpmask)
1361{ 1269{
1362 struct bio *bi = *bio_src; 1270 switch (obj_req->img_request->data_type) {
1363 unsigned int off = *offset; 1271 case OBJ_REQUEST_BIO:
1364 struct bio *chain = NULL; 1272 zero_bios(&obj_req->bio_pos, off, bytes);
1365 struct bio **end; 1273 break;
1366 1274 case OBJ_REQUEST_BVECS:
1367 /* Build up a chain of clone bios up to the limit */ 1275 case OBJ_REQUEST_OWN_BVECS:
1368 1276 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1369 if (!bi || off >= bi->bi_iter.bi_size || !len) 1277 break;
1370 return NULL; /* Nothing to clone */ 1278 default:
1371 1279 rbd_assert(0);
1372 end = &chain;
1373 while (len) {
1374 unsigned int bi_size;
1375 struct bio *bio;
1376
1377 if (!bi) {
1378 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1379 goto out_err; /* EINVAL; ran out of bio's */
1380 }
1381 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1382 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1383 if (!bio)
1384 goto out_err; /* ENOMEM */
1385
1386 *end = bio;
1387 end = &bio->bi_next;
1388
1389 off += bi_size;
1390 if (off == bi->bi_iter.bi_size) {
1391 bi = bi->bi_next;
1392 off = 0;
1393 }
1394 len -= bi_size;
1395 }
1396 *bio_src = bi;
1397 *offset = off;
1398
1399 return chain;
1400out_err:
1401 bio_chain_put(chain);
1402
1403 return NULL;
1404}
1405
1406/*
1407 * The default/initial value for all object request flags is 0. For
1408 * each flag, once its value is set to 1 it is never reset to 0
1409 * again.
1410 */
1411static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1412{
1413 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1414 struct rbd_device *rbd_dev;
1415
1416 rbd_dev = obj_request->img_request->rbd_dev;
1417 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1418 obj_request);
1419 }
1420}
1421
1422static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1423{
1424 smp_mb();
1425 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1426}
1427
1428static void obj_request_done_set(struct rbd_obj_request *obj_request)
1429{
1430 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1431 struct rbd_device *rbd_dev = NULL;
1432
1433 if (obj_request_img_data_test(obj_request))
1434 rbd_dev = obj_request->img_request->rbd_dev;
1435 rbd_warn(rbd_dev, "obj_request %p already marked done",
1436 obj_request);
1437 } 1280 }
1438} 1281}
1439 1282
1440static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1441{
1442 smp_mb();
1443 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1444}
1445
1446/*
1447 * This sets the KNOWN flag after (possibly) setting the EXISTS
1448 * flag. The latter is set based on the "exists" value provided.
1449 *
1450 * Note that for our purposes once an object exists it never goes
1451 * away again. It's possible that the response from two existence
1452 * checks are separated by the creation of the target object, and
1453 * the first ("doesn't exist") response arrives *after* the second
1454 * ("does exist"). In that case we ignore the second one.
1455 */
1456static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1457 bool exists)
1458{
1459 if (exists)
1460 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1461 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1462 smp_mb();
1463}
1464
1465static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1466{
1467 smp_mb();
1468 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1469}
1470
1471static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1472{
1473 smp_mb();
1474 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1475}
1476
1477static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1478{
1479 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1480
1481 return obj_request->img_offset <
1482 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1483}
1484
1485static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1486{
1487 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1488 kref_read(&obj_request->kref));
1489 kref_get(&obj_request->kref);
1490}
1491
1492static void rbd_obj_request_destroy(struct kref *kref); 1283static void rbd_obj_request_destroy(struct kref *kref);
1493static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1284static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1494{ 1285{
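The zero_bvec()/zero_bios()/zero_bvecs() helpers added in the hunk above walk the data buffer with the ceph iterator macros. The same logic in plain C for a bare bio_vec array (illustrative only, not the ceph_bvec_iter API; the helper name is made up): zero @bytes of data starting @off bytes into the array.

static void example_zero_bvec_range(struct bio_vec *bvecs, u32 off, u32 bytes)
{
        struct bio_vec *bv = bvecs;

        /* skip whole vectors that lie entirely before @off */
        while (off >= bv->bv_len) {
                off -= bv->bv_len;
                bv++;
        }

        while (bytes) {
                u32 len = min(bytes, bv->bv_len - off);
                void *buf = kmap_atomic(bv->bv_page);

                memset(buf + bv->bv_offset + off, 0, len);
                flush_dcache_page(bv->bv_page);
                kunmap_atomic(buf);

                bytes -= len;
                off = 0;
                bv++;
        }
}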
@@ -1505,18 +1296,13 @@ static void rbd_img_request_get(struct rbd_img_request *img_request)
 	kref_get(&img_request->kref);
 }
 
-static bool img_request_child_test(struct rbd_img_request *img_request);
-static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
 	rbd_assert(img_request != NULL);
 	dout("%s: img %p (was %d)\n", __func__, img_request,
 	     kref_read(&img_request->kref));
-	if (img_request_child_test(img_request))
-		kref_put(&img_request->kref, rbd_parent_request_destroy);
-	else
-		kref_put(&img_request->kref, rbd_img_request_destroy);
+	kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
@@ -1526,139 +1312,37 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1526 1312
1527 /* Image request now owns object's original reference */ 1313 /* Image request now owns object's original reference */
1528 obj_request->img_request = img_request; 1314 obj_request->img_request = img_request;
1529 obj_request->which = img_request->obj_request_count;
1530 rbd_assert(!obj_request_img_data_test(obj_request));
1531 obj_request_img_data_set(obj_request);
1532 rbd_assert(obj_request->which != BAD_WHICH);
1533 img_request->obj_request_count++; 1315 img_request->obj_request_count++;
1534 list_add_tail(&obj_request->links, &img_request->obj_requests); 1316 img_request->pending_count++;
1535 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1317 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1536 obj_request->which);
1537} 1318}
1538 1319
1539static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1320static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1540 struct rbd_obj_request *obj_request) 1321 struct rbd_obj_request *obj_request)
1541{ 1322{
1542 rbd_assert(obj_request->which != BAD_WHICH); 1323 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1543 1324 list_del(&obj_request->ex.oe_item);
1544 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1545 obj_request->which);
1546 list_del(&obj_request->links);
1547 rbd_assert(img_request->obj_request_count > 0); 1325 rbd_assert(img_request->obj_request_count > 0);
1548 img_request->obj_request_count--; 1326 img_request->obj_request_count--;
1549 rbd_assert(obj_request->which == img_request->obj_request_count);
1550 obj_request->which = BAD_WHICH;
1551 rbd_assert(obj_request_img_data_test(obj_request));
1552 rbd_assert(obj_request->img_request == img_request); 1327 rbd_assert(obj_request->img_request == img_request);
1553 obj_request->img_request = NULL;
1554 obj_request->callback = NULL;
1555 rbd_obj_request_put(obj_request); 1328 rbd_obj_request_put(obj_request);
1556} 1329}
1557 1330
1558static bool obj_request_type_valid(enum obj_request_type type)
1559{
1560 switch (type) {
1561 case OBJ_REQUEST_NODATA:
1562 case OBJ_REQUEST_BIO:
1563 case OBJ_REQUEST_PAGES:
1564 return true;
1565 default:
1566 return false;
1567 }
1568}
1569
1570static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1571
1572static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) 1331static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1573{ 1332{
1574 struct ceph_osd_request *osd_req = obj_request->osd_req; 1333 struct ceph_osd_request *osd_req = obj_request->osd_req;
1575 1334
1576 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, 1335 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1577 obj_request, obj_request->object_no, obj_request->offset, 1336 obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1578 obj_request->length, osd_req); 1337 obj_request->ex.oe_len, osd_req);
1579 if (obj_request_img_data_test(obj_request)) {
1580 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1581 rbd_img_request_get(obj_request->img_request);
1582 }
1583 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); 1338 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1584} 1339}
1585 1340
1586static void rbd_img_request_complete(struct rbd_img_request *img_request)
1587{
1588
1589 dout("%s: img %p\n", __func__, img_request);
1590
1591 /*
1592 * If no error occurred, compute the aggregate transfer
1593 * count for the image request. We could instead use
1594 * atomic64_cmpxchg() to update it as each object request
1595 * completes; not clear which way is better off hand.
1596 */
1597 if (!img_request->result) {
1598 struct rbd_obj_request *obj_request;
1599 u64 xferred = 0;
1600
1601 for_each_obj_request(img_request, obj_request)
1602 xferred += obj_request->xferred;
1603 img_request->xferred = xferred;
1604 }
1605
1606 if (img_request->callback)
1607 img_request->callback(img_request);
1608 else
1609 rbd_img_request_put(img_request);
1610}
1611
1612/* 1341/*
1613 * The default/initial value for all image request flags is 0. Each 1342 * The default/initial value for all image request flags is 0. Each
1614 * is conditionally set to 1 at image request initialization time 1343 * is conditionally set to 1 at image request initialization time
1615 * and currently never change thereafter. 1344 * and currently never change thereafter.
1616 */ 1345 */
1617static void img_request_write_set(struct rbd_img_request *img_request)
1618{
1619 set_bit(IMG_REQ_WRITE, &img_request->flags);
1620 smp_mb();
1621}
1622
1623static bool img_request_write_test(struct rbd_img_request *img_request)
1624{
1625 smp_mb();
1626 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1627}
1628
1629/*
1630 * Set the discard flag when the img_request is an discard request
1631 */
1632static void img_request_discard_set(struct rbd_img_request *img_request)
1633{
1634 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1635 smp_mb();
1636}
1637
1638static bool img_request_discard_test(struct rbd_img_request *img_request)
1639{
1640 smp_mb();
1641 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1642}
1643
1644static void img_request_child_set(struct rbd_img_request *img_request)
1645{
1646 set_bit(IMG_REQ_CHILD, &img_request->flags);
1647 smp_mb();
1648}
1649
1650static void img_request_child_clear(struct rbd_img_request *img_request)
1651{
1652 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1653 smp_mb();
1654}
1655
1656static bool img_request_child_test(struct rbd_img_request *img_request)
1657{
1658 smp_mb();
1659 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1660}
1661
1662static void img_request_layered_set(struct rbd_img_request *img_request) 1346static void img_request_layered_set(struct rbd_img_request *img_request)
1663{ 1347{
1664 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1348 set_bit(IMG_REQ_LAYERED, &img_request->flags);
@@ -1677,209 +1361,70 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
1677 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1361 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1678} 1362}
1679 1363
1680static enum obj_operation_type 1364static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1681rbd_img_request_op_type(struct rbd_img_request *img_request)
1682{
1683 if (img_request_write_test(img_request))
1684 return OBJ_OP_WRITE;
1685 else if (img_request_discard_test(img_request))
1686 return OBJ_OP_DISCARD;
1687 else
1688 return OBJ_OP_READ;
1689}
1690
1691static void
1692rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1693{
1694 u64 xferred = obj_request->xferred;
1695 u64 length = obj_request->length;
1696
1697 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1698 obj_request, obj_request->img_request, obj_request->result,
1699 xferred, length);
1700 /*
1701 * ENOENT means a hole in the image. We zero-fill the entire
1702 * length of the request. A short read also implies zero-fill
1703 * to the end of the request. An error requires the whole
1704 * length of the request to be reported finished with an error
1705 * to the block layer. In each case we update the xferred
1706 * count to indicate the whole request was satisfied.
1707 */
1708 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1709 if (obj_request->result == -ENOENT) {
1710 if (obj_request->type == OBJ_REQUEST_BIO)
1711 zero_bio_chain(obj_request->bio_list, 0);
1712 else
1713 zero_pages(obj_request->pages, 0, length);
1714 obj_request->result = 0;
1715 } else if (xferred < length && !obj_request->result) {
1716 if (obj_request->type == OBJ_REQUEST_BIO)
1717 zero_bio_chain(obj_request->bio_list, xferred);
1718 else
1719 zero_pages(obj_request->pages, xferred, length);
1720 }
1721 obj_request->xferred = length;
1722 obj_request_done_set(obj_request);
1723}
1724
1725static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1726{ 1365{
1727 dout("%s: obj %p cb %p\n", __func__, obj_request, 1366 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1728 obj_request->callback);
1729 obj_request->callback(obj_request);
1730}
1731 1367
1732static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) 1368 return !obj_req->ex.oe_off &&
1733{ 1369 obj_req->ex.oe_len == rbd_dev->layout.object_size;
1734 obj_request->result = err;
1735 obj_request->xferred = 0;
1736 /*
1737 * kludge - mirror rbd_obj_request_submit() to match a put in
1738 * rbd_img_obj_callback()
1739 */
1740 if (obj_request_img_data_test(obj_request)) {
1741 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1742 rbd_img_request_get(obj_request->img_request);
1743 }
1744 obj_request_done_set(obj_request);
1745 rbd_obj_request_complete(obj_request);
1746} 1370}
1747 1371
1748static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1372static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1749{ 1373{
1750 struct rbd_img_request *img_request = NULL; 1374 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1751 struct rbd_device *rbd_dev = NULL;
1752 bool layered = false;
1753
1754 if (obj_request_img_data_test(obj_request)) {
1755 img_request = obj_request->img_request;
1756 layered = img_request && img_request_layered_test(img_request);
1757 rbd_dev = img_request->rbd_dev;
1758 }
1759
1760 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1761 obj_request, img_request, obj_request->result,
1762 obj_request->xferred, obj_request->length);
1763 if (layered && obj_request->result == -ENOENT &&
1764 obj_request->img_offset < rbd_dev->parent_overlap)
1765 rbd_img_parent_read(obj_request);
1766 else if (img_request)
1767 rbd_img_obj_request_read_callback(obj_request);
1768 else
1769 obj_request_done_set(obj_request);
1770}
1771 1375
1772static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1376 return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1773{ 1377 rbd_dev->layout.object_size;
1774 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1775 obj_request->result, obj_request->length);
1776 /*
1777 * There is no such thing as a successful short write. Set
1778 * it to our originally-requested length.
1779 */
1780 obj_request->xferred = obj_request->length;
1781 obj_request_done_set(obj_request);
1782} 1378}
1783 1379
1784static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request) 1380static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1785{ 1381{
1786 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1382 return ceph_file_extents_bytes(obj_req->img_extents,
1787 obj_request->result, obj_request->length); 1383 obj_req->num_img_extents);
1788 /*
1789 * There is no such thing as a successful short discard. Set
1790 * it to our originally-requested length.
1791 */
1792 obj_request->xferred = obj_request->length;
1793 /* discarding a non-existent object is not a problem */
1794 if (obj_request->result == -ENOENT)
1795 obj_request->result = 0;
1796 obj_request_done_set(obj_request);
1797} 1384}
1798 1385
1799/* 1386static bool rbd_img_is_write(struct rbd_img_request *img_req)
1800 * For a simple stat call there's nothing to do. We'll do more if
1801 * this is part of a write sequence for a layered image.
1802 */
1803static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1804{ 1387{
1805 dout("%s: obj %p\n", __func__, obj_request); 1388 switch (img_req->op_type) {
1806 obj_request_done_set(obj_request); 1389 case OBJ_OP_READ:
1390 return false;
1391 case OBJ_OP_WRITE:
1392 case OBJ_OP_DISCARD:
1393 return true;
1394 default:
1395 rbd_assert(0);
1396 }
1807} 1397}
1808 1398
1809static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) 1399static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1810{
1811 dout("%s: obj %p\n", __func__, obj_request);
1812
1813 if (obj_request_img_data_test(obj_request))
1814 rbd_osd_copyup_callback(obj_request);
1815 else
1816 obj_request_done_set(obj_request);
1817}
1818 1400
1819static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) 1401static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1820{ 1402{
1821 struct rbd_obj_request *obj_request = osd_req->r_priv; 1403 struct rbd_obj_request *obj_req = osd_req->r_priv;
1822 u16 opcode;
1823 1404
1824 dout("%s: osd_req %p\n", __func__, osd_req); 1405 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1825 rbd_assert(osd_req == obj_request->osd_req); 1406 osd_req->r_result, obj_req);
1826 if (obj_request_img_data_test(obj_request)) { 1407 rbd_assert(osd_req == obj_req->osd_req);
1827 rbd_assert(obj_request->img_request);
1828 rbd_assert(obj_request->which != BAD_WHICH);
1829 } else {
1830 rbd_assert(obj_request->which == BAD_WHICH);
1831 }
1832
1833 if (osd_req->r_result < 0)
1834 obj_request->result = osd_req->r_result;
1835
1836 /*
1837 * We support a 64-bit length, but ultimately it has to be
1838 * passed to the block layer, which just supports a 32-bit
1839 * length field.
1840 */
1841 obj_request->xferred = osd_req->r_ops[0].outdata_len;
1842 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1843 1408
1844 opcode = osd_req->r_ops[0].op; 1409 obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1845 switch (opcode) { 1410 if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1846 case CEPH_OSD_OP_READ: 1411 obj_req->xferred = osd_req->r_result;
1847 rbd_osd_read_callback(obj_request); 1412 else
1848 break; 1413 /*
1849 case CEPH_OSD_OP_SETALLOCHINT: 1414 * Writes aren't allowed to return a data payload. In some
1850 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || 1415 * guarded write cases (e.g. stat + zero on an empty object)
1851 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); 1416 * a stat response makes it through, but we don't care.
1852 /* fall through */ 1417 */
1853 case CEPH_OSD_OP_WRITE: 1418 obj_req->xferred = 0;
1854 case CEPH_OSD_OP_WRITEFULL:
1855 rbd_osd_write_callback(obj_request);
1856 break;
1857 case CEPH_OSD_OP_STAT:
1858 rbd_osd_stat_callback(obj_request);
1859 break;
1860 case CEPH_OSD_OP_DELETE:
1861 case CEPH_OSD_OP_TRUNCATE:
1862 case CEPH_OSD_OP_ZERO:
1863 rbd_osd_discard_callback(obj_request);
1864 break;
1865 case CEPH_OSD_OP_CALL:
1866 rbd_osd_call_callback(obj_request);
1867 break;
1868 default:
1869 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1870 obj_request->object_no, opcode);
1871 break;
1872 }
1873 1419
1874 if (obj_request_done_test(obj_request)) 1420 rbd_obj_handle_request(obj_req);
1875 rbd_obj_request_complete(obj_request);
1876} 1421}
1877 1422
1878static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1423static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1879{ 1424{
1880 struct ceph_osd_request *osd_req = obj_request->osd_req; 1425 struct ceph_osd_request *osd_req = obj_request->osd_req;
1881 1426
1882 rbd_assert(obj_request_img_data_test(obj_request)); 1427 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1883 osd_req->r_snapid = obj_request->img_request->snap_id; 1428 osd_req->r_snapid = obj_request->img_request->snap_id;
1884} 1429}
1885 1430
@@ -1887,32 +1432,33 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 {
 	struct ceph_osd_request *osd_req = obj_request->osd_req;
 
+	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
 	ktime_get_real_ts(&osd_req->r_mtime);
-	osd_req->r_data_offset = obj_request->offset;
+	osd_req->r_data_offset = obj_request->ex.oe_off;
 }
 
 static struct ceph_osd_request *
-__rbd_osd_req_create(struct rbd_device *rbd_dev,
-		     struct ceph_snap_context *snapc,
-		     int num_ops, unsigned int flags,
-		     struct rbd_obj_request *obj_request)
+rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
 {
+	struct rbd_img_request *img_req = obj_req->img_request;
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_request *req;
 	const char *name_format = rbd_dev->image_format == 1 ?
 				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
 
-	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
+	req = ceph_osdc_alloc_request(osdc,
+			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
+			num_ops, false, GFP_NOIO);
 	if (!req)
 		return NULL;
 
-	req->r_flags = flags;
 	req->r_callback = rbd_osd_req_callback;
-	req->r_priv = obj_request;
+	req->r_priv = obj_req;
 
 	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
 	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
-			rbd_dev->header.object_prefix, obj_request->object_no))
+			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
 		goto err_req;
 
 	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
@@ -1925,83 +1471,20 @@ err_req:
1925 return NULL; 1471 return NULL;
1926} 1472}
1927 1473
1928/*
1929 * Create an osd request. A read request has one osd op (read).
1930 * A write request has either one (watch) or two (hint+write) osd ops.
1931 * (All rbd data writes are prefixed with an allocation hint op, but
1932 * technically osd watch is a write request, hence this distinction.)
1933 */
1934static struct ceph_osd_request *rbd_osd_req_create(
1935 struct rbd_device *rbd_dev,
1936 enum obj_operation_type op_type,
1937 unsigned int num_ops,
1938 struct rbd_obj_request *obj_request)
1939{
1940 struct ceph_snap_context *snapc = NULL;
1941
1942 if (obj_request_img_data_test(obj_request) &&
1943 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1944 struct rbd_img_request *img_request = obj_request->img_request;
1945 if (op_type == OBJ_OP_WRITE) {
1946 rbd_assert(img_request_write_test(img_request));
1947 } else {
1948 rbd_assert(img_request_discard_test(img_request));
1949 }
1950 snapc = img_request->snapc;
1951 }
1952
1953 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1954
1955 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1956 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1957 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1958}
1959
1960/*
1961 * Create a copyup osd request based on the information in the object
1962 * request supplied. A copyup request has two or three osd ops, a
1963 * copyup method call, potentially a hint op, and a write or truncate
1964 * or zero op.
1965 */
1966static struct ceph_osd_request *
1967rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1968{
1969 struct rbd_img_request *img_request;
1970 int num_osd_ops = 3;
1971
1972 rbd_assert(obj_request_img_data_test(obj_request));
1973 img_request = obj_request->img_request;
1974 rbd_assert(img_request);
1975 rbd_assert(img_request_write_test(img_request) ||
1976 img_request_discard_test(img_request));
1977
1978 if (img_request_discard_test(img_request))
1979 num_osd_ops = 2;
1980
1981 return __rbd_osd_req_create(img_request->rbd_dev,
1982 img_request->snapc, num_osd_ops,
1983 CEPH_OSD_FLAG_WRITE, obj_request);
1984}
1985
1986static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1474static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1987{ 1475{
1988 ceph_osdc_put_request(osd_req); 1476 ceph_osdc_put_request(osd_req);
1989} 1477}
1990 1478
1991static struct rbd_obj_request * 1479static struct rbd_obj_request *rbd_obj_request_create(void)
1992rbd_obj_request_create(enum obj_request_type type)
1993{ 1480{
1994 struct rbd_obj_request *obj_request; 1481 struct rbd_obj_request *obj_request;
1995 1482
1996 rbd_assert(obj_request_type_valid(type));
1997
1998 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); 1483 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1999 if (!obj_request) 1484 if (!obj_request)
2000 return NULL; 1485 return NULL;
2001 1486
2002 obj_request->which = BAD_WHICH; 1487 ceph_object_extent_init(&obj_request->ex);
2003 obj_request->type = type;
2004 INIT_LIST_HEAD(&obj_request->links);
2005 kref_init(&obj_request->kref); 1488 kref_init(&obj_request->kref);
2006 1489
2007 dout("%s %p\n", __func__, obj_request); 1490 dout("%s %p\n", __func__, obj_request);
@@ -2011,32 +1494,34 @@ rbd_obj_request_create(enum obj_request_type type)
2011static void rbd_obj_request_destroy(struct kref *kref) 1494static void rbd_obj_request_destroy(struct kref *kref)
2012{ 1495{
2013 struct rbd_obj_request *obj_request; 1496 struct rbd_obj_request *obj_request;
1497 u32 i;
2014 1498
2015 obj_request = container_of(kref, struct rbd_obj_request, kref); 1499 obj_request = container_of(kref, struct rbd_obj_request, kref);
2016 1500
2017 dout("%s: obj %p\n", __func__, obj_request); 1501 dout("%s: obj %p\n", __func__, obj_request);
2018 1502
2019 rbd_assert(obj_request->img_request == NULL);
2020 rbd_assert(obj_request->which == BAD_WHICH);
2021
2022 if (obj_request->osd_req) 1503 if (obj_request->osd_req)
2023 rbd_osd_req_destroy(obj_request->osd_req); 1504 rbd_osd_req_destroy(obj_request->osd_req);
2024 1505
2025 rbd_assert(obj_request_type_valid(obj_request->type)); 1506 switch (obj_request->img_request->data_type) {
2026 switch (obj_request->type) {
2027 case OBJ_REQUEST_NODATA: 1507 case OBJ_REQUEST_NODATA:
2028 break; /* Nothing to do */
2029 case OBJ_REQUEST_BIO: 1508 case OBJ_REQUEST_BIO:
2030 if (obj_request->bio_list) 1509 case OBJ_REQUEST_BVECS:
2031 bio_chain_put(obj_request->bio_list); 1510 break; /* Nothing to do */
2032 break; 1511 case OBJ_REQUEST_OWN_BVECS:
2033 case OBJ_REQUEST_PAGES: 1512 kfree(obj_request->bvec_pos.bvecs);
2034 /* img_data requests don't own their page array */
2035 if (obj_request->pages &&
2036 !obj_request_img_data_test(obj_request))
2037 ceph_release_page_vector(obj_request->pages,
2038 obj_request->page_count);
2039 break; 1513 break;
1514 default:
1515 rbd_assert(0);
1516 }
1517
1518 kfree(obj_request->img_extents);
1519 if (obj_request->copyup_bvecs) {
1520 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1521 if (obj_request->copyup_bvecs[i].bv_page)
1522 __free_page(obj_request->copyup_bvecs[i].bv_page);
1523 }
1524 kfree(obj_request->copyup_bvecs);
2040 } 1525 }
2041 1526
2042 kmem_cache_free(rbd_obj_request_cache, obj_request); 1527 kmem_cache_free(rbd_obj_request_cache, obj_request);
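rbd_obj_request_destroy() above now frees a per-request copyup_bvecs array page by page. The matching allocation, done elsewhere in this patch when a copyup turns out to be needed, looks roughly like this sketch (names and placement are illustrative, derived from how the destroy path frees things and assuming calc_pages_for() semantics):

static int example_alloc_copyup_bvecs(struct rbd_obj_request *obj_req,
                                      u64 obj_overlap)
{
        u32 i;

        obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
        obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
                                        sizeof(*obj_req->copyup_bvecs),
                                        GFP_NOIO);
        if (!obj_req->copyup_bvecs)
                return -ENOMEM;

        for (i = 0; i < obj_req->copyup_bvec_count; i++) {
                unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);

                obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
                if (!obj_req->copyup_bvecs[i].bv_page)
                        return -ENOMEM; /* pages freed by the destroy path */

                obj_req->copyup_bvecs[i].bv_offset = 0;
                obj_req->copyup_bvecs[i].bv_len = len;
                obj_overlap -= len;
        }

        return 0;
}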
@@ -2111,7 +1596,6 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
  */
 static struct rbd_img_request *rbd_img_request_create(
 					struct rbd_device *rbd_dev,
-					u64 offset, u64 length,
 					enum obj_operation_type op_type,
 					struct ceph_snap_context *snapc)
 {
@@ -2122,27 +1606,21 @@ static struct rbd_img_request *rbd_img_request_create(
2122 return NULL; 1606 return NULL;
2123 1607
2124 img_request->rbd_dev = rbd_dev; 1608 img_request->rbd_dev = rbd_dev;
2125 img_request->offset = offset; 1609 img_request->op_type = op_type;
2126 img_request->length = length; 1610 if (!rbd_img_is_write(img_request))
2127 if (op_type == OBJ_OP_DISCARD) {
2128 img_request_discard_set(img_request);
2129 img_request->snapc = snapc;
2130 } else if (op_type == OBJ_OP_WRITE) {
2131 img_request_write_set(img_request);
2132 img_request->snapc = snapc;
2133 } else {
2134 img_request->snap_id = rbd_dev->spec->snap_id; 1611 img_request->snap_id = rbd_dev->spec->snap_id;
2135 } 1612 else
1613 img_request->snapc = snapc;
1614
2136 if (rbd_dev_parent_get(rbd_dev)) 1615 if (rbd_dev_parent_get(rbd_dev))
2137 img_request_layered_set(img_request); 1616 img_request_layered_set(img_request);
2138 1617
2139 spin_lock_init(&img_request->completion_lock); 1618 spin_lock_init(&img_request->completion_lock);
2140 INIT_LIST_HEAD(&img_request->obj_requests); 1619 INIT_LIST_HEAD(&img_request->object_extents);
2141 kref_init(&img_request->kref); 1620 kref_init(&img_request->kref);
2142 1621
2143 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, 1622 dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
2144 obj_op_name(op_type), offset, length, img_request); 1623 obj_op_name(op_type), img_request);
2145
2146 return img_request; 1624 return img_request;
2147} 1625}
2148 1626
@@ -2165,829 +1643,934 @@ static void rbd_img_request_destroy(struct kref *kref)
2165 rbd_dev_parent_put(img_request->rbd_dev); 1643 rbd_dev_parent_put(img_request->rbd_dev);
2166 } 1644 }
2167 1645
2168 if (img_request_write_test(img_request) || 1646 if (rbd_img_is_write(img_request))
2169 img_request_discard_test(img_request))
2170 ceph_put_snap_context(img_request->snapc); 1647 ceph_put_snap_context(img_request->snapc);
2171 1648
2172 kmem_cache_free(rbd_img_request_cache, img_request); 1649 kmem_cache_free(rbd_img_request_cache, img_request);
2173} 1650}
2174 1651
2175static struct rbd_img_request *rbd_parent_request_create( 1652static void prune_extents(struct ceph_file_extent *img_extents,
2176 struct rbd_obj_request *obj_request, 1653 u32 *num_img_extents, u64 overlap)
2177 u64 img_offset, u64 length)
2178{ 1654{
2179 struct rbd_img_request *parent_request; 1655 u32 cnt = *num_img_extents;
2180 struct rbd_device *rbd_dev;
2181 1656
2182 rbd_assert(obj_request->img_request); 1657 /* drop extents completely beyond the overlap */
2183 rbd_dev = obj_request->img_request->rbd_dev; 1658 while (cnt && img_extents[cnt - 1].fe_off >= overlap)
1659 cnt--;
2184 1660
2185 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset, 1661 if (cnt) {
2186 length, OBJ_OP_READ, NULL); 1662 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2187 if (!parent_request)
2188 return NULL;
2189 1663
2190 img_request_child_set(parent_request); 1664 /* trim final overlapping extent */
2191 rbd_obj_request_get(obj_request); 1665 if (ex->fe_off + ex->fe_len > overlap)
2192 parent_request->obj_request = obj_request; 1666 ex->fe_len = overlap - ex->fe_off;
1667 }
2193 1668
2194 return parent_request; 1669 *num_img_extents = cnt;
2195} 1670}
2196 1671
2197static void rbd_parent_request_destroy(struct kref *kref) 1672/*
1673 * Determine the byte range(s) covered by either just the object extent
1674 * or the entire object in the parent image.
1675 */
1676static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
1677 bool entire)
2198{ 1678{
2199 struct rbd_img_request *parent_request; 1679 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2200 struct rbd_obj_request *orig_request; 1680 int ret;
2201 1681
2202 parent_request = container_of(kref, struct rbd_img_request, kref); 1682 if (!rbd_dev->parent_overlap)
2203 orig_request = parent_request->obj_request; 1683 return 0;
2204 1684
2205 parent_request->obj_request = NULL; 1685 ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2206 rbd_obj_request_put(orig_request); 1686 entire ? 0 : obj_req->ex.oe_off,
2207 img_request_child_clear(parent_request); 1687 entire ? rbd_dev->layout.object_size :
1688 obj_req->ex.oe_len,
1689 &obj_req->img_extents,
1690 &obj_req->num_img_extents);
1691 if (ret)
1692 return ret;
2208 1693
2209 rbd_img_request_destroy(kref); 1694 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
1695 rbd_dev->parent_overlap);
1696 return 0;
2210} 1697}
2211 1698
2212static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) 1699static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
2213{ 1700{
2214 struct rbd_img_request *img_request; 1701 switch (obj_req->img_request->data_type) {
2215 unsigned int xferred; 1702 case OBJ_REQUEST_BIO:
2216 int result; 1703 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
2217 bool more; 1704 &obj_req->bio_pos,
2218 1705 obj_req->ex.oe_len);
2219 rbd_assert(obj_request_img_data_test(obj_request)); 1706 break;
2220 img_request = obj_request->img_request; 1707 case OBJ_REQUEST_BVECS:
2221 1708 case OBJ_REQUEST_OWN_BVECS:
2222 rbd_assert(obj_request->xferred <= (u64)UINT_MAX); 1709 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2223 xferred = (unsigned int)obj_request->xferred; 1710 obj_req->ex.oe_len);
2224 result = obj_request->result; 1711 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2225 if (result) { 1712 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
2226 struct rbd_device *rbd_dev = img_request->rbd_dev; 1713 &obj_req->bvec_pos);
2227 enum obj_operation_type op_type; 1714 break;
2228 1715 default:
2229 if (img_request_discard_test(img_request)) 1716 rbd_assert(0);
2230 op_type = OBJ_OP_DISCARD;
2231 else if (img_request_write_test(img_request))
2232 op_type = OBJ_OP_WRITE;
2233 else
2234 op_type = OBJ_OP_READ;
2235
2236 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2237 obj_op_name(op_type), obj_request->length,
2238 obj_request->img_offset, obj_request->offset);
2239 rbd_warn(rbd_dev, " result %d xferred %x",
2240 result, xferred);
2241 if (!img_request->result)
2242 img_request->result = result;
2243 /*
2244 * Need to end I/O on the entire obj_request worth of
2245 * bytes in case of error.
2246 */
2247 xferred = obj_request->length;
2248 } 1717 }
1718}
2249 1719
2250 if (img_request_child_test(img_request)) { 1720static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
2251 rbd_assert(img_request->obj_request != NULL); 1721{
2252 more = obj_request->which < img_request->obj_request_count - 1; 1722 obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
2253 } else { 1723 if (!obj_req->osd_req)
2254 blk_status_t status = errno_to_blk_status(result); 1724 return -ENOMEM;
2255 1725
2256 rbd_assert(img_request->rq != NULL); 1726 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1727 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1728 rbd_osd_req_setup_data(obj_req, 0);
2257 1729
2258 more = blk_update_request(img_request->rq, status, xferred); 1730 rbd_osd_req_format_read(obj_req);
2259 if (!more) 1731 return 0;
2260 __blk_mq_end_request(img_request->rq, status); 1732}
2261 } 1733
1734static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1735 unsigned int which)
1736{
1737 struct page **pages;
2262 1738
2263 return more; 1739 /*
1740 * The response data for a STAT call consists of:
1741 * le64 length;
1742 * struct {
1743 * le32 tv_sec;
1744 * le32 tv_nsec;
1745 * } mtime;
1746 */
1747 pages = ceph_alloc_page_vector(1, GFP_NOIO);
1748 if (IS_ERR(pages))
1749 return PTR_ERR(pages);
1750
1751 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1752 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1753 8 + sizeof(struct ceph_timespec),
1754 0, false, true);
1755 return 0;
2264} 1756}
2265 1757
2266static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) 1758static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1759 unsigned int which)
2267{ 1760{
2268 struct rbd_img_request *img_request; 1761 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2269 u32 which = obj_request->which; 1762 u16 opcode;
2270 bool more = true;
2271 1763
2272 rbd_assert(obj_request_img_data_test(obj_request)); 1764 osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
2273 img_request = obj_request->img_request; 1765 rbd_dev->layout.object_size,
1766 rbd_dev->layout.object_size);
2274 1767
2275 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1768 if (rbd_obj_is_entire(obj_req))
2276 rbd_assert(img_request != NULL); 1769 opcode = CEPH_OSD_OP_WRITEFULL;
2277 rbd_assert(img_request->obj_request_count > 0); 1770 else
2278 rbd_assert(which != BAD_WHICH); 1771 opcode = CEPH_OSD_OP_WRITE;
2279 rbd_assert(which < img_request->obj_request_count);
2280 1772
2281 spin_lock_irq(&img_request->completion_lock); 1773 osd_req_op_extent_init(obj_req->osd_req, which, opcode,
2282 if (which != img_request->next_completion) 1774 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2283 goto out; 1775 rbd_osd_req_setup_data(obj_req, which++);
1776
1777 rbd_assert(which == obj_req->osd_req->r_num_ops);
1778 rbd_osd_req_format_write(obj_req);
1779}
2284 1780
2285 for_each_obj_request_from(img_request, obj_request) { 1781static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
2286 rbd_assert(more); 1782{
2287 rbd_assert(which < img_request->obj_request_count); 1783 unsigned int num_osd_ops, which = 0;
1784 int ret;
2288 1785
2289 if (!obj_request_done_test(obj_request)) 1786 /* reverse map the entire object onto the parent */
2290 break; 1787 ret = rbd_obj_calc_img_extents(obj_req, true);
2291 more = rbd_img_obj_end_request(obj_request); 1788 if (ret)
2292 which++; 1789 return ret;
1790
1791 if (obj_req->num_img_extents) {
1792 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1793 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1794 } else {
1795 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1796 num_osd_ops = 2; /* setallochint + write/writefull */
2293 } 1797 }
2294 1798
2295 rbd_assert(more ^ (which == img_request->obj_request_count)); 1799 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2296 img_request->next_completion = which; 1800 if (!obj_req->osd_req)
2297out: 1801 return -ENOMEM;
2298 spin_unlock_irq(&img_request->completion_lock);
2299 rbd_img_request_put(img_request);
2300 1802
2301 if (!more) 1803 if (obj_req->num_img_extents) {
2302 rbd_img_request_complete(img_request); 1804 ret = __rbd_obj_setup_stat(obj_req, which++);
1805 if (ret)
1806 return ret;
1807 }
1808
1809 __rbd_obj_setup_write(obj_req, which);
1810 return 0;
2303} 1811}
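
A rough user-space sketch of the two decisions made in the write setup above (names invented, only the logic mirrors the code): the request is guarded (stat + setallochint + write/writefull) only while the object still reverse-maps onto the parent, and WRITEFULL is chosen only when the whole object is overwritten.

	#include <stdbool.h>
	#include <stdio.h>

	enum { OP_WRITE, OP_WRITEFULL };

	/* guarded: stat + setallochint + write(full); flat: setallochint + write(full) */
	static unsigned int write_num_ops(bool maps_onto_parent)
	{
		return maps_onto_parent ? 3 : 2;
	}

	static int write_opcode(unsigned long long off, unsigned long long len,
				unsigned long long object_size)
	{
		return (off == 0 && len == object_size) ? OP_WRITEFULL : OP_WRITE;
	}

	int main(void)
	{
		unsigned long long object_size = 4ULL << 20;	/* 4M objects */

		printf("full object, no parent data: %u ops, writefull=%d\n",
		       write_num_ops(false), write_opcode(0, object_size, object_size));
		printf("4K inside parent overlap:    %u ops, writefull=%d\n",
		       write_num_ops(true), write_opcode(8192, 4096, object_size));
		return 0;
	}
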
2304 1812
2305/* 1813static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
2306 * Add individual osd ops to the given ceph_osd_request and prepare 1814 unsigned int which)
2307 * them for submission. num_ops is the current number of 1815{
2308 * osd operations already to the object request.
2309 */
2310static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2311 struct ceph_osd_request *osd_request,
2312 enum obj_operation_type op_type,
2313 unsigned int num_ops)
2314{
2315 struct rbd_img_request *img_request = obj_request->img_request;
2316 struct rbd_device *rbd_dev = img_request->rbd_dev;
2317 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2318 u64 offset = obj_request->offset;
2319 u64 length = obj_request->length;
2320 u64 img_end;
2321 u16 opcode; 1816 u16 opcode;
2322 1817
2323 if (op_type == OBJ_OP_DISCARD) { 1818 if (rbd_obj_is_entire(obj_req)) {
2324 if (!offset && length == object_size && 1819 if (obj_req->num_img_extents) {
2325 (!img_request_layered_test(img_request) || 1820 osd_req_op_init(obj_req->osd_req, which++,
2326 !obj_request_overlaps_parent(obj_request))) { 1821 CEPH_OSD_OP_CREATE, 0);
2327 opcode = CEPH_OSD_OP_DELETE;
2328 } else if ((offset + length == object_size)) {
2329 opcode = CEPH_OSD_OP_TRUNCATE; 1822 opcode = CEPH_OSD_OP_TRUNCATE;
2330 } else { 1823 } else {
2331 down_read(&rbd_dev->header_rwsem); 1824 osd_req_op_init(obj_req->osd_req, which++,
2332 img_end = rbd_dev->header.image_size; 1825 CEPH_OSD_OP_DELETE, 0);
2333 up_read(&rbd_dev->header_rwsem); 1826 opcode = 0;
2334
2335 if (obj_request->img_offset + length == img_end)
2336 opcode = CEPH_OSD_OP_TRUNCATE;
2337 else
2338 opcode = CEPH_OSD_OP_ZERO;
2339 } 1827 }
2340 } else if (op_type == OBJ_OP_WRITE) { 1828 } else if (rbd_obj_is_tail(obj_req)) {
2341 if (!offset && length == object_size) 1829 opcode = CEPH_OSD_OP_TRUNCATE;
2342 opcode = CEPH_OSD_OP_WRITEFULL;
2343 else
2344 opcode = CEPH_OSD_OP_WRITE;
2345 osd_req_op_alloc_hint_init(osd_request, num_ops,
2346 object_size, object_size);
2347 num_ops++;
2348 } else { 1830 } else {
2349 opcode = CEPH_OSD_OP_READ; 1831 opcode = CEPH_OSD_OP_ZERO;
2350 } 1832 }
2351 1833
2352 if (opcode == CEPH_OSD_OP_DELETE) 1834 if (opcode)
2353 osd_req_op_init(osd_request, num_ops, opcode, 0); 1835 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
2354 else 1836 obj_req->ex.oe_off, obj_req->ex.oe_len,
2355 osd_req_op_extent_init(osd_request, num_ops, opcode, 1837 0, 0);
2356 offset, length, 0, 0); 1838
2357 1839 rbd_assert(which == obj_req->osd_req->r_num_ops);
2358 if (obj_request->type == OBJ_REQUEST_BIO) 1840 rbd_osd_req_format_write(obj_req);
2359 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2360 obj_request->bio_list, length);
2361 else if (obj_request->type == OBJ_REQUEST_PAGES)
2362 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2363 obj_request->pages, length,
2364 offset & ~PAGE_MASK, false, false);
2365
2366 /* Discards are also writes */
2367 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2368 rbd_osd_req_format_write(obj_request);
2369 else
2370 rbd_osd_req_format_read(obj_request);
2371} 1841}
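
The discard opcode selection above amounts to a small decision table; an illustrative user-space rendering (helper name invented): whole object with parent data -> create + truncate, whole object without parent data -> delete, tail of the object -> truncate, anything else -> zero.

	#include <stdbool.h>
	#include <stdio.h>

	static const char *discard_ops(unsigned long long off, unsigned long long len,
				       unsigned long long object_size,
				       bool maps_onto_parent)
	{
		bool entire = (off == 0 && len == object_size);
		bool tail = (off + len == object_size);

		if (entire)
			return maps_onto_parent ? "create + truncate" : "delete";
		if (tail)
			return "truncate";
		return "zero";
	}

	int main(void)
	{
		unsigned long long os = 4ULL << 20;

		printf("%s\n", discard_ops(0, os, os, true));		/* create + truncate */
		printf("%s\n", discard_ops(0, os, os, false));		/* delete */
		printf("%s\n", discard_ops(os / 2, os / 2, os, false));	/* truncate */
		printf("%s\n", discard_ops(4096, 8192, os, true));	/* zero */
		return 0;
	}
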
2372 1842
2373/* 1843static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
2374 * Split up an image request into one or more object requests, each
2375 * to a different object. The "type" parameter indicates whether
2376 * "data_desc" is the pointer to the head of a list of bio
2377 * structures, or the base of a page array. In either case this
2378 * function assumes data_desc describes memory sufficient to hold
2379 * all data described by the image request.
2380 */
2381static int rbd_img_request_fill(struct rbd_img_request *img_request,
2382 enum obj_request_type type,
2383 void *data_desc)
2384{ 1844{
2385 struct rbd_device *rbd_dev = img_request->rbd_dev; 1845 unsigned int num_osd_ops, which = 0;
2386 struct rbd_obj_request *obj_request = NULL; 1846 int ret;
2387 struct rbd_obj_request *next_obj_request;
2388 struct bio *bio_list = NULL;
2389 unsigned int bio_offset = 0;
2390 struct page **pages = NULL;
2391 enum obj_operation_type op_type;
2392 u64 img_offset;
2393 u64 resid;
2394
2395 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2396 (int)type, data_desc);
2397 1847
2398 img_offset = img_request->offset; 1848 /* reverse map the entire object onto the parent */
2399 resid = img_request->length; 1849 ret = rbd_obj_calc_img_extents(obj_req, true);
2400 rbd_assert(resid > 0); 1850 if (ret)
2401 op_type = rbd_img_request_op_type(img_request); 1851 return ret;
2402 1852
2403 if (type == OBJ_REQUEST_BIO) { 1853 if (rbd_obj_is_entire(obj_req)) {
2404 bio_list = data_desc; 1854 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2405 rbd_assert(img_offset == 1855 if (obj_req->num_img_extents)
2406 bio_list->bi_iter.bi_sector << SECTOR_SHIFT); 1856 num_osd_ops = 2; /* create + truncate */
2407 } else if (type == OBJ_REQUEST_PAGES) { 1857 else
2408 pages = data_desc; 1858 num_osd_ops = 1; /* delete */
1859 } else {
1860 if (obj_req->num_img_extents) {
1861 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1862 num_osd_ops = 2; /* stat + truncate/zero */
1863 } else {
1864 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1865 num_osd_ops = 1; /* truncate/zero */
1866 }
2409 } 1867 }
2410 1868
2411 while (resid) { 1869 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2412 struct ceph_osd_request *osd_req; 1870 if (!obj_req->osd_req)
2413 u64 object_no = img_offset >> rbd_dev->header.obj_order; 1871 return -ENOMEM;
2414 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2415 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2416
2417 obj_request = rbd_obj_request_create(type);
2418 if (!obj_request)
2419 goto out_unwind;
2420
2421 obj_request->object_no = object_no;
2422 obj_request->offset = offset;
2423 obj_request->length = length;
2424
2425 /*
2426 * set obj_request->img_request before creating the
2427 * osd_request so that it gets the right snapc
2428 */
2429 rbd_img_obj_request_add(img_request, obj_request);
2430
2431 if (type == OBJ_REQUEST_BIO) {
2432 unsigned int clone_size;
2433
2434 rbd_assert(length <= (u64)UINT_MAX);
2435 clone_size = (unsigned int)length;
2436 obj_request->bio_list =
2437 bio_chain_clone_range(&bio_list,
2438 &bio_offset,
2439 clone_size,
2440 GFP_NOIO);
2441 if (!obj_request->bio_list)
2442 goto out_unwind;
2443 } else if (type == OBJ_REQUEST_PAGES) {
2444 unsigned int page_count;
2445
2446 obj_request->pages = pages;
2447 page_count = (u32)calc_pages_for(offset, length);
2448 obj_request->page_count = page_count;
2449 if ((offset + length) & ~PAGE_MASK)
2450 page_count--; /* more on last page */
2451 pages += page_count;
2452 }
2453 1872
2454 osd_req = rbd_osd_req_create(rbd_dev, op_type, 1873 if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
2455 (op_type == OBJ_OP_WRITE) ? 2 : 1, 1874 ret = __rbd_obj_setup_stat(obj_req, which++);
2456 obj_request); 1875 if (ret)
2457 if (!osd_req) 1876 return ret;
2458 goto out_unwind; 1877 }
2459 1878
2460 obj_request->osd_req = osd_req; 1879 __rbd_obj_setup_discard(obj_req, which);
2461 obj_request->callback = rbd_img_obj_callback; 1880 return 0;
2462 obj_request->img_offset = img_offset; 1881}
2463 1882
2464 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); 1883/*
1884 * For each object request in @img_req, allocate an OSD request, add
1885 * individual OSD ops and prepare them for submission. The number of
1886 * OSD ops depends on op_type and the overlap point (if any).
1887 */
1888static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1889{
1890 struct rbd_obj_request *obj_req;
1891 int ret;
2465 1892
2466 img_offset += length; 1893 for_each_obj_request(img_req, obj_req) {
2467 resid -= length; 1894 switch (img_req->op_type) {
1895 case OBJ_OP_READ:
1896 ret = rbd_obj_setup_read(obj_req);
1897 break;
1898 case OBJ_OP_WRITE:
1899 ret = rbd_obj_setup_write(obj_req);
1900 break;
1901 case OBJ_OP_DISCARD:
1902 ret = rbd_obj_setup_discard(obj_req);
1903 break;
1904 default:
1905 rbd_assert(0);
1906 }
1907 if (ret)
1908 return ret;
2468 } 1909 }
2469 1910
2470 return 0; 1911 return 0;
1912}
2471 1913
2472out_unwind: 1914union rbd_img_fill_iter {
2473 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1915 struct ceph_bio_iter bio_iter;
2474 rbd_img_obj_request_del(img_request, obj_request); 1916 struct ceph_bvec_iter bvec_iter;
1917};
2475 1918
2476 return -ENOMEM; 1919struct rbd_img_fill_ctx {
2477} 1920 enum obj_request_type pos_type;
1921 union rbd_img_fill_iter *pos;
1922 union rbd_img_fill_iter iter;
1923 ceph_object_extent_fn_t set_pos_fn;
1924 ceph_object_extent_fn_t count_fn;
1925 ceph_object_extent_fn_t copy_fn;
1926};
2478 1927
2479static void 1928static struct ceph_object_extent *alloc_object_extent(void *arg)
2480rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2481{ 1929{
2482 struct rbd_img_request *img_request; 1930 struct rbd_img_request *img_req = arg;
2483 struct rbd_device *rbd_dev; 1931 struct rbd_obj_request *obj_req;
2484 struct page **pages;
2485 u32 page_count;
2486 1932
2487 dout("%s: obj %p\n", __func__, obj_request); 1933 obj_req = rbd_obj_request_create();
1934 if (!obj_req)
1935 return NULL;
2488 1936
2489 rbd_assert(obj_request->type == OBJ_REQUEST_BIO || 1937 rbd_img_obj_request_add(img_req, obj_req);
2490 obj_request->type == OBJ_REQUEST_NODATA); 1938 return &obj_req->ex;
2491 rbd_assert(obj_request_img_data_test(obj_request)); 1939}
2492 img_request = obj_request->img_request;
2493 rbd_assert(img_request);
2494 1940
2495 rbd_dev = img_request->rbd_dev; 1941/*
2496 rbd_assert(rbd_dev); 1942 * While su != os && sc == 1 is technically not fancy (it's the same
1943 * layout as su == os && sc == 1), we can't use the nocopy path for it
1944 * because ->set_pos_fn() should be called only once per object.
1945 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
1946 * treat su != os && sc == 1 as fancy.
1947 */
1948static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1949{
1950 return l->stripe_unit != l->object_size;
1951}
2497 1952
2498 pages = obj_request->copyup_pages; 1953static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2499 rbd_assert(pages != NULL); 1954 struct ceph_file_extent *img_extents,
2500 obj_request->copyup_pages = NULL; 1955 u32 num_img_extents,
2501 page_count = obj_request->copyup_page_count; 1956 struct rbd_img_fill_ctx *fctx)
2502 rbd_assert(page_count); 1957{
2503 obj_request->copyup_page_count = 0; 1958 u32 i;
2504 ceph_release_page_vector(pages, page_count); 1959 int ret;
1960
1961 img_req->data_type = fctx->pos_type;
2505 1962
2506 /* 1963 /*
2507 * We want the transfer count to reflect the size of the 1964 * Create object requests and set each object request's starting
2508 * original write request. There is no such thing as a 1965 * position in the provided bio (list) or bio_vec array.
2509 * successful short write, so if the request was successful
2510 * we can just set it to the originally-requested length.
2511 */ 1966 */
2512 if (!obj_request->result) 1967 fctx->iter = *fctx->pos;
2513 obj_request->xferred = obj_request->length; 1968 for (i = 0; i < num_img_extents; i++) {
1969 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
1970 img_extents[i].fe_off,
1971 img_extents[i].fe_len,
1972 &img_req->object_extents,
1973 alloc_object_extent, img_req,
1974 fctx->set_pos_fn, &fctx->iter);
1975 if (ret)
1976 return ret;
1977 }
2514 1978
2515 obj_request_done_set(obj_request); 1979 return __rbd_img_fill_request(img_req);
2516} 1980}
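
The fancy/non-fancy distinction above comes down to how an image offset maps onto RADOS objects under (stripe_unit, stripe_count, object_size). A hedged sketch of the standard Ceph striping arithmetic (this is not the libceph striper API, just the layout math) makes the su != os case concrete:

	#include <stdio.h>

	static void map_offset(unsigned long long off,
			       unsigned long long su,	/* stripe unit */
			       unsigned long long sc,	/* stripe count */
			       unsigned long long os,	/* object size */
			       unsigned long long *objno,
			       unsigned long long *obj_off)
	{
		unsigned long long stripes_per_object = os / su;
		unsigned long long blockno = off / su;		/* stripe unit index */
		unsigned long long stripeno = blockno / sc;
		unsigned long long stripepos = blockno % sc;
		unsigned long long objsetno = stripeno / stripes_per_object;

		*objno = objsetno * sc + stripepos;
		*obj_off = (stripeno % stripes_per_object) * su + off % su;
	}

	int main(void)
	{
		unsigned long long objno, obj_off;

		/* default layout: su == os == 4M, sc == 1 */
		map_offset(6ULL << 20, 4 << 20, 1, 4 << 20, &objno, &obj_off);
		printf("default: objno %llu off %llu\n", objno, obj_off);

		/* "fancy": su = 64K, sc = 2, os = 4M */
		map_offset(6ULL << 20, 64 << 10, 2, 4 << 20, &objno, &obj_off);
		printf("fancy:   objno %llu off %llu\n", objno, obj_off);
		return 0;
	}

With the default layout the 6M offset lands in object 1 at offset 2M; with the fancy layout the same offset lands in object 0 at offset 3M, and a single request can split per stripe unit, which is why the nocopy path cannot be used there.
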
2517 1981
2518static void 1982/*
2519rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) 1983 * Map a list of image extents to a list of object extents, create the
1984 * corresponding object requests (normally each to a different object,
1985 * but not always) and add them to @img_req. For each object request,
1986 * set up its data descriptor to point to the corresponding chunk(s) of
1987 * @fctx->pos data buffer.
1988 *
1989 * Because ceph_file_to_extents() will merge adjacent object extents
1990 * together, each object request's data descriptor may point to multiple
1991 * different chunks of @fctx->pos data buffer.
1992 *
1993 * @fctx->pos data buffer is assumed to be large enough.
1994 */
1995static int rbd_img_fill_request(struct rbd_img_request *img_req,
1996 struct ceph_file_extent *img_extents,
1997 u32 num_img_extents,
1998 struct rbd_img_fill_ctx *fctx)
2520{ 1999{
2521 struct rbd_obj_request *orig_request; 2000 struct rbd_device *rbd_dev = img_req->rbd_dev;
2522 struct ceph_osd_request *osd_req; 2001 struct rbd_obj_request *obj_req;
2523 struct rbd_device *rbd_dev; 2002 u32 i;
2524 struct page **pages; 2003 int ret;
2525 enum obj_operation_type op_type;
2526 u32 page_count;
2527 int img_result;
2528 u64 parent_length;
2529
2530 rbd_assert(img_request_child_test(img_request));
2531
2532 /* First get what we need from the image request */
2533
2534 pages = img_request->copyup_pages;
2535 rbd_assert(pages != NULL);
2536 img_request->copyup_pages = NULL;
2537 page_count = img_request->copyup_page_count;
2538 rbd_assert(page_count);
2539 img_request->copyup_page_count = 0;
2540
2541 orig_request = img_request->obj_request;
2542 rbd_assert(orig_request != NULL);
2543 rbd_assert(obj_request_type_valid(orig_request->type));
2544 img_result = img_request->result;
2545 parent_length = img_request->length;
2546 rbd_assert(img_result || parent_length == img_request->xferred);
2547 rbd_img_request_put(img_request);
2548 2004
2549 rbd_assert(orig_request->img_request); 2005 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2550 rbd_dev = orig_request->img_request->rbd_dev; 2006 !rbd_layout_is_fancy(&rbd_dev->layout))
2551 rbd_assert(rbd_dev); 2007 return rbd_img_fill_request_nocopy(img_req, img_extents,
2008 num_img_extents, fctx);
2009
2010 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2552 2011
2553 /* 2012 /*
2554 * If the overlap has become 0 (most likely because the 2013 * Create object requests and determine ->bvec_count for each object
2555 * image has been flattened) we need to free the pages 2014 * request. Note that ->bvec_count sum over all object requests may
2556 * and re-submit the original write request. 2015 * be greater than the number of bio_vecs in the provided bio (list)
2016 * or bio_vec array because when mapped, those bio_vecs can straddle
2017 * stripe unit boundaries.
2557 */ 2018 */
2558 if (!rbd_dev->parent_overlap) { 2019 fctx->iter = *fctx->pos;
2559 ceph_release_page_vector(pages, page_count); 2020 for (i = 0; i < num_img_extents; i++) {
2560 rbd_obj_request_submit(orig_request); 2021 ret = ceph_file_to_extents(&rbd_dev->layout,
2561 return; 2022 img_extents[i].fe_off,
2023 img_extents[i].fe_len,
2024 &img_req->object_extents,
2025 alloc_object_extent, img_req,
2026 fctx->count_fn, &fctx->iter);
2027 if (ret)
2028 return ret;
2562 } 2029 }
2563 2030
2564 if (img_result) 2031 for_each_obj_request(img_req, obj_req) {
2565 goto out_err; 2032 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2033 sizeof(*obj_req->bvec_pos.bvecs),
2034 GFP_NOIO);
2035 if (!obj_req->bvec_pos.bvecs)
2036 return -ENOMEM;
2037 }
2566 2038
2567 /* 2039 /*
2568 * The original osd request is of no use to use any more. 2040 * Fill in each object request's private bio_vec array, splitting and
2569 * We need a new one that can hold the three ops in a copyup 2041 * rearranging the provided bio_vecs in stripe unit chunks as needed.
2570 * request. Allocate the new copyup osd request for the
2571 * original request, and release the old one.
2572 */ 2042 */
2573 img_result = -ENOMEM; 2043 fctx->iter = *fctx->pos;
2574 osd_req = rbd_osd_req_create_copyup(orig_request); 2044 for (i = 0; i < num_img_extents; i++) {
2575 if (!osd_req) 2045 ret = ceph_iterate_extents(&rbd_dev->layout,
2576 goto out_err; 2046 img_extents[i].fe_off,
2577 rbd_osd_req_destroy(orig_request->osd_req); 2047 img_extents[i].fe_len,
2578 orig_request->osd_req = osd_req; 2048 &img_req->object_extents,
2579 orig_request->copyup_pages = pages; 2049 fctx->copy_fn, &fctx->iter);
2580 orig_request->copyup_page_count = page_count; 2050 if (ret)
2051 return ret;
2052 }
2581 2053
2582 /* Initialize the copyup op */ 2054 return __rbd_img_fill_request(img_req);
2055}
2583 2056
2584 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); 2057static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2585 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, 2058 u64 off, u64 len)
2586 false, false); 2059{
2060 struct ceph_file_extent ex = { off, len };
2061 union rbd_img_fill_iter dummy;
2062 struct rbd_img_fill_ctx fctx = {
2063 .pos_type = OBJ_REQUEST_NODATA,
2064 .pos = &dummy,
2065 };
2587 2066
2588 /* Add the other op(s) */ 2067 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2068}
2589 2069
2590 op_type = rbd_img_request_op_type(orig_request->img_request); 2070static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2591 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1); 2071{
2072 struct rbd_obj_request *obj_req =
2073 container_of(ex, struct rbd_obj_request, ex);
2074 struct ceph_bio_iter *it = arg;
2592 2075
2593 /* All set, send it off. */ 2076 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2077 obj_req->bio_pos = *it;
2078 ceph_bio_iter_advance(it, bytes);
2079}
2594 2080
2595 rbd_obj_request_submit(orig_request); 2081static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2596 return; 2082{
2083 struct rbd_obj_request *obj_req =
2084 container_of(ex, struct rbd_obj_request, ex);
2085 struct ceph_bio_iter *it = arg;
2086
2087 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2088 ceph_bio_iter_advance_step(it, bytes, ({
2089 obj_req->bvec_count++;
2090 }));
2597 2091
2598out_err:
2599 ceph_release_page_vector(pages, page_count);
2600 rbd_obj_request_error(orig_request, img_result);
2601} 2092}
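
The two-pass scheme above (count ->bvec_count per object first, then allocate and copy) follows a generic count-then-fill pattern; a simplified user-space sketch, assuming only a plain stripe-unit splitter, shows how a range straddling a boundary turns into two pieces:

	#include <stdio.h>
	#include <stdlib.h>

	struct piece {
		unsigned long long off;
		unsigned long long len;
	};

	/* pass 1 (out == NULL) only counts, pass 2 fills the array */
	static unsigned int for_each_piece(unsigned long long off, unsigned long long len,
					   unsigned long long su, struct piece *out)
	{
		unsigned int n = 0;

		while (len) {
			unsigned long long this = su - off % su;

			if (this > len)
				this = len;
			if (out) {
				out[n].off = off;
				out[n].len = this;
			}
			n++;
			off += this;
			len -= this;
		}
		return n;
	}

	int main(void)
	{
		unsigned long long su = 64 << 10;
		unsigned int i, count = for_each_piece(60 << 10, 72 << 10, su, NULL);
		struct piece *pieces = calloc(count, sizeof(*pieces));

		if (!pieces)
			return 1;
		for_each_piece(60 << 10, 72 << 10, su, pieces);
		for (i = 0; i < count; i++)
			printf("piece %u: %llu~%llu\n", i, pieces[i].off, pieces[i].len);
		free(pieces);
		return 0;
	}
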
2602 2093
2603/* 2094static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2604 * Read from the parent image the range of data that covers the 2095{
2605 * entire target of the given object request. This is used for 2096 struct rbd_obj_request *obj_req =
2606 * satisfying a layered image write request when the target of an 2097 container_of(ex, struct rbd_obj_request, ex);
2607 * object request from the image request does not exist. 2098 struct ceph_bio_iter *it = arg;
2608 *
2609 * A page array big enough to hold the returned data is allocated
2610 * and supplied to rbd_img_request_fill() as the "data descriptor."
2611 * When the read completes, this page array will be transferred to
2612 * the original object request for the copyup operation.
2613 *
2614 * If an error occurs, it is recorded as the result of the original
2615 * object request in rbd_img_obj_exists_callback().
2616 */
2617static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2618{
2619 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2620 struct rbd_img_request *parent_request = NULL;
2621 u64 img_offset;
2622 u64 length;
2623 struct page **pages = NULL;
2624 u32 page_count;
2625 int result;
2626 2099
2627 rbd_assert(rbd_dev->parent != NULL); 2100 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2101 ceph_bio_iter_advance_step(it, bytes, ({
2102 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2103 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2104 }));
2105}
2628 2106
2629 /* 2107static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2630 * Determine the byte range covered by the object in the 2108 struct ceph_file_extent *img_extents,
2631 * child image to which the original request was to be sent. 2109 u32 num_img_extents,
2632 */ 2110 struct ceph_bio_iter *bio_pos)
2633 img_offset = obj_request->img_offset - obj_request->offset; 2111{
2634 length = rbd_obj_bytes(&rbd_dev->header); 2112 struct rbd_img_fill_ctx fctx = {
2113 .pos_type = OBJ_REQUEST_BIO,
2114 .pos = (union rbd_img_fill_iter *)bio_pos,
2115 .set_pos_fn = set_bio_pos,
2116 .count_fn = count_bio_bvecs,
2117 .copy_fn = copy_bio_bvecs,
2118 };
2635 2119
2636 /* 2120 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2637 * There is no defined parent data beyond the parent 2121 &fctx);
2638 * overlap, so limit what we read at that boundary if 2122}
2639 * necessary.
2640 */
2641 if (img_offset + length > rbd_dev->parent_overlap) {
2642 rbd_assert(img_offset < rbd_dev->parent_overlap);
2643 length = rbd_dev->parent_overlap - img_offset;
2644 }
2645 2123
2646 /* 2124static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2647 * Allocate a page array big enough to receive the data read 2125 u64 off, u64 len, struct bio *bio)
2648 * from the parent. 2126{
2649 */ 2127 struct ceph_file_extent ex = { off, len };
2650 page_count = (u32)calc_pages_for(0, length); 2128 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2651 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2652 if (IS_ERR(pages)) {
2653 result = PTR_ERR(pages);
2654 pages = NULL;
2655 goto out_err;
2656 }
2657 2129
2658 result = -ENOMEM; 2130 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2659 parent_request = rbd_parent_request_create(obj_request, 2131}
2660 img_offset, length);
2661 if (!parent_request)
2662 goto out_err;
2663 2132
2664 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); 2133static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2665 if (result) 2134{
2666 goto out_err; 2135 struct rbd_obj_request *obj_req =
2136 container_of(ex, struct rbd_obj_request, ex);
2137 struct ceph_bvec_iter *it = arg;
2667 2138
2668 parent_request->copyup_pages = pages; 2139 obj_req->bvec_pos = *it;
2669 parent_request->copyup_page_count = page_count; 2140 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2670 parent_request->callback = rbd_img_obj_parent_read_full_callback; 2141 ceph_bvec_iter_advance(it, bytes);
2142}
2671 2143
2672 result = rbd_img_request_submit(parent_request); 2144static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2673 if (!result) 2145{
2674 return 0; 2146 struct rbd_obj_request *obj_req =
2147 container_of(ex, struct rbd_obj_request, ex);
2148 struct ceph_bvec_iter *it = arg;
2675 2149
2676 parent_request->copyup_pages = NULL; 2150 ceph_bvec_iter_advance_step(it, bytes, ({
2677 parent_request->copyup_page_count = 0; 2151 obj_req->bvec_count++;
2678out_err: 2152 }));
2679 if (pages)
2680 ceph_release_page_vector(pages, page_count);
2681 if (parent_request)
2682 rbd_img_request_put(parent_request);
2683 return result;
2684} 2153}
2685 2154
2686static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) 2155static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2687{ 2156{
2688 struct rbd_obj_request *orig_request; 2157 struct rbd_obj_request *obj_req =
2689 struct rbd_device *rbd_dev; 2158 container_of(ex, struct rbd_obj_request, ex);
2690 int result; 2159 struct ceph_bvec_iter *it = arg;
2691 2160
2692 rbd_assert(!obj_request_img_data_test(obj_request)); 2161 ceph_bvec_iter_advance_step(it, bytes, ({
2162 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2163 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2164 }));
2165}
2693 2166
2694 /* 2167static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2695 * All we need from the object request is the original 2168 struct ceph_file_extent *img_extents,
2696 * request and the result of the STAT op. Grab those, then 2169 u32 num_img_extents,
2697 * we're done with the request. 2170 struct ceph_bvec_iter *bvec_pos)
2698 */ 2171{
2699 orig_request = obj_request->obj_request; 2172 struct rbd_img_fill_ctx fctx = {
2700 obj_request->obj_request = NULL; 2173 .pos_type = OBJ_REQUEST_BVECS,
2701 rbd_obj_request_put(orig_request); 2174 .pos = (union rbd_img_fill_iter *)bvec_pos,
2702 rbd_assert(orig_request); 2175 .set_pos_fn = set_bvec_pos,
2703 rbd_assert(orig_request->img_request); 2176 .count_fn = count_bvecs,
2704 2177 .copy_fn = copy_bvecs,
2705 result = obj_request->result; 2178 };
2706 obj_request->result = 0;
2707
2708 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2709 obj_request, orig_request, result,
2710 obj_request->xferred, obj_request->length);
2711 rbd_obj_request_put(obj_request);
2712 2179
2713 /* 2180 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2714 * If the overlap has become 0 (most likely because the 2181 &fctx);
2715 * image has been flattened) we need to re-submit the 2182}
2716 * original request.
2717 */
2718 rbd_dev = orig_request->img_request->rbd_dev;
2719 if (!rbd_dev->parent_overlap) {
2720 rbd_obj_request_submit(orig_request);
2721 return;
2722 }
2723 2183
2724 /* 2184static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2725 * Our only purpose here is to determine whether the object 2185 struct ceph_file_extent *img_extents,
2726 * exists, and we don't want to treat the non-existence as 2186 u32 num_img_extents,
2727 * an error. If something else comes back, transfer the 2187 struct bio_vec *bvecs)
2728 * error to the original request and complete it now. 2188{
2729 */ 2189 struct ceph_bvec_iter it = {
2730 if (!result) { 2190 .bvecs = bvecs,
2731 obj_request_existence_set(orig_request, true); 2191 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2732 } else if (result == -ENOENT) { 2192 num_img_extents) },
2733 obj_request_existence_set(orig_request, false); 2193 };
2734 } else {
2735 goto fail_orig_request;
2736 }
2737 2194
2738 /* 2195 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2739 * Resubmit the original request now that we have recorded 2196 &it);
2740 * whether the target object exists. 2197}
2741 */
2742 result = rbd_img_obj_request_submit(orig_request);
2743 if (result)
2744 goto fail_orig_request;
2745 2198
2746 return; 2199static void rbd_img_request_submit(struct rbd_img_request *img_request)
2200{
2201 struct rbd_obj_request *obj_request;
2202
2203 dout("%s: img %p\n", __func__, img_request);
2204
2205 rbd_img_request_get(img_request);
2206 for_each_obj_request(img_request, obj_request)
2207 rbd_obj_request_submit(obj_request);
2747 2208
2748fail_orig_request: 2209 rbd_img_request_put(img_request);
2749 rbd_obj_request_error(orig_request, result);
2750} 2210}
2751 2211
2752static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) 2212static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2753{ 2213{
2754 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; 2214 struct rbd_img_request *img_req = obj_req->img_request;
2755 struct rbd_obj_request *stat_request; 2215 struct rbd_img_request *child_img_req;
2756 struct page **pages;
2757 u32 page_count;
2758 size_t size;
2759 int ret; 2216 int ret;
2760 2217
2761 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); 2218 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2762 if (!stat_request) 2219 OBJ_OP_READ, NULL);
2220 if (!child_img_req)
2763 return -ENOMEM; 2221 return -ENOMEM;
2764 2222
2765 stat_request->object_no = obj_request->object_no; 2223 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2224 child_img_req->obj_request = obj_req;
2766 2225
2767 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, 2226 if (!rbd_img_is_write(img_req)) {
2768 stat_request); 2227 switch (img_req->data_type) {
2769 if (!stat_request->osd_req) { 2228 case OBJ_REQUEST_BIO:
2770 ret = -ENOMEM; 2229 ret = __rbd_img_fill_from_bio(child_img_req,
2771 goto fail_stat_request; 2230 obj_req->img_extents,
2231 obj_req->num_img_extents,
2232 &obj_req->bio_pos);
2233 break;
2234 case OBJ_REQUEST_BVECS:
2235 case OBJ_REQUEST_OWN_BVECS:
2236 ret = __rbd_img_fill_from_bvecs(child_img_req,
2237 obj_req->img_extents,
2238 obj_req->num_img_extents,
2239 &obj_req->bvec_pos);
2240 break;
2241 default:
2242 rbd_assert(0);
2243 }
2244 } else {
2245 ret = rbd_img_fill_from_bvecs(child_img_req,
2246 obj_req->img_extents,
2247 obj_req->num_img_extents,
2248 obj_req->copyup_bvecs);
2249 }
2250 if (ret) {
2251 rbd_img_request_put(child_img_req);
2252 return ret;
2253 }
2254
2255 rbd_img_request_submit(child_img_req);
2256 return 0;
2257}
2258
2259static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2260{
2261 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2262 int ret;
2263
2264 if (obj_req->result == -ENOENT &&
2265 rbd_dev->parent_overlap && !obj_req->tried_parent) {
2266 /* reverse map this object extent onto the parent */
2267 ret = rbd_obj_calc_img_extents(obj_req, false);
2268 if (ret) {
2269 obj_req->result = ret;
2270 return true;
2271 }
2272
2273 if (obj_req->num_img_extents) {
2274 obj_req->tried_parent = true;
2275 ret = rbd_obj_read_from_parent(obj_req);
2276 if (ret) {
2277 obj_req->result = ret;
2278 return true;
2279 }
2280 return false;
2281 }
2772 } 2282 }
2773 2283
2774 /* 2284 /*
2775 * The response data for a STAT call consists of: 2285 * -ENOENT means a hole in the image -- zero-fill the entire
2776 * le64 length; 2286 * length of the request. A short read also implies zero-fill
2777 * struct { 2287 * to the end of the request. In both cases we update xferred
2778 * le32 tv_sec; 2288 * count to indicate the whole request was satisfied.
2779 * le32 tv_nsec;
2780 * } mtime;
2781 */ 2289 */
2782 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); 2290 if (obj_req->result == -ENOENT ||
2783 page_count = (u32)calc_pages_for(0, size); 2291 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2784 pages = ceph_alloc_page_vector(page_count, GFP_NOIO); 2292 rbd_assert(!obj_req->xferred || !obj_req->result);
2785 if (IS_ERR(pages)) { 2293 rbd_obj_zero_range(obj_req, obj_req->xferred,
2786 ret = PTR_ERR(pages); 2294 obj_req->ex.oe_len - obj_req->xferred);
2787 goto fail_stat_request; 2295 obj_req->result = 0;
2296 obj_req->xferred = obj_req->ex.oe_len;
2788 } 2297 }
2789 2298
2790 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); 2299 return true;
2791 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, 2300}
2792 false, false);
2793
2794 rbd_obj_request_get(obj_request);
2795 stat_request->obj_request = obj_request;
2796 stat_request->pages = pages;
2797 stat_request->page_count = page_count;
2798 stat_request->callback = rbd_img_obj_exists_callback;
2799 2301
2800 rbd_obj_request_submit(stat_request); 2302/*
2801 return 0; 2303 * copyup_bvecs pages are never highmem pages
2304 */
2305static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2306{
2307 struct ceph_bvec_iter it = {
2308 .bvecs = bvecs,
2309 .iter = { .bi_size = bytes },
2310 };
2802 2311
2803fail_stat_request: 2312 ceph_bvec_iter_advance_step(&it, bytes, ({
2804 rbd_obj_request_put(stat_request); 2313 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2805 return ret; 2314 bv.bv_len))
2315 return false;
2316 }));
2317 return true;
2806} 2318}
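
is_zero_bvecs() above relies on the kernel's memchr_inv(); a rough user-space stand-in for the "all bytes zero" test (helper name is mine) shows why zero copyup data can simply be dropped -- copying up an object full of zeroes is indistinguishable from the object not existing:

	#include <stdbool.h>
	#include <stddef.h>
	#include <stdio.h>

	/* user-space equivalent of memchr_inv(buf, 0, len) == NULL */
	static bool buf_is_zero(const unsigned char *buf, size_t len)
	{
		size_t i;

		for (i = 0; i < len; i++)
			if (buf[i])
				return false;
		return true;
	}

	int main(void)
	{
		unsigned char a[4096] = { 0 };
		unsigned char b[4096] = { 0 };

		b[100] = 0xff;
		printf("a zero: %d, b zero: %d\n", buf_is_zero(a, sizeof(a)),
		       buf_is_zero(b, sizeof(b)));
		return 0;
	}
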
2807 2319
2808static bool img_obj_request_simple(struct rbd_obj_request *obj_request) 2320static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2809{ 2321{
2810 struct rbd_img_request *img_request = obj_request->img_request; 2322 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2811 struct rbd_device *rbd_dev = img_request->rbd_dev;
2812 2323
2813 /* Reads */ 2324 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2814 if (!img_request_write_test(img_request) && 2325 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2815 !img_request_discard_test(img_request)) 2326 rbd_osd_req_destroy(obj_req->osd_req);
2816 return true;
2817
2818 /* Non-layered writes */
2819 if (!img_request_layered_test(img_request))
2820 return true;
2821 2327
2822 /* 2328 /*
2823 * Layered writes outside of the parent overlap range don't 2329 * Create a copyup request with the same number of OSD ops as
2824 * share any data with the parent. 2330 * the original request. The original request was stat + op(s),
2331 * the new copyup request will be copyup + the same op(s).
2825 */ 2332 */
2826 if (!obj_request_overlaps_parent(obj_request)) 2333 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2827 return true; 2334 if (!obj_req->osd_req)
2335 return -ENOMEM;
2828 2336
2829 /* 2337 /*
2830 * Entire-object layered writes - we will overwrite whatever 2338 * Only send non-zero copyup data to save some I/O and network
2831 * parent data there is anyway. 2339 * bandwidth -- zero copyup data is equivalent to the object not
2340 * existing.
2832 */ 2341 */
2833 if (!obj_request->offset && 2342 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2834 obj_request->length == rbd_obj_bytes(&rbd_dev->header)) 2343 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2835 return true; 2344 bytes = 0;
2345 }
2836 2346
2837 /* 2347 osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2838 * If the object is known to already exist, its parent data has 2348 "copyup");
2839 * already been copied. 2349 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2840 */ 2350 obj_req->copyup_bvecs, bytes);
2841 if (obj_request_known_test(obj_request) && 2351
2842 obj_request_exists_test(obj_request)) 2352 switch (obj_req->img_request->op_type) {
2843 return true; 2353 case OBJ_OP_WRITE:
2354 __rbd_obj_setup_write(obj_req, 1);
2355 break;
2356 case OBJ_OP_DISCARD:
2357 rbd_assert(!rbd_obj_is_entire(obj_req));
2358 __rbd_obj_setup_discard(obj_req, 1);
2359 break;
2360 default:
2361 rbd_assert(0);
2362 }
2844 2363
2845 return false; 2364 rbd_obj_request_submit(obj_req);
2365 return 0;
2846} 2366}
2847 2367
2848static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) 2368static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2849{ 2369{
2850 rbd_assert(obj_request_img_data_test(obj_request)); 2370 u32 i;
2851 rbd_assert(obj_request_type_valid(obj_request->type));
2852 rbd_assert(obj_request->img_request);
2853 2371
2854 if (img_obj_request_simple(obj_request)) { 2372 rbd_assert(!obj_req->copyup_bvecs);
2855 rbd_obj_request_submit(obj_request); 2373 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2856 return 0; 2374 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2857 } 2375 sizeof(*obj_req->copyup_bvecs),
2376 GFP_NOIO);
2377 if (!obj_req->copyup_bvecs)
2378 return -ENOMEM;
2858 2379
2859 /* 2380 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2860 * It's a layered write. The target object might exist but 2381 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2861 * we may not know that yet. If we know it doesn't exist,
2862 * start by reading the data for the full target object from
2863 * the parent so we can use it for a copyup to the target.
2864 */
2865 if (obj_request_known_test(obj_request))
2866 return rbd_img_obj_parent_read_full(obj_request);
2867 2382
2868 /* We don't know whether the target exists. Go find out. */ 2383 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2384 if (!obj_req->copyup_bvecs[i].bv_page)
2385 return -ENOMEM;
2386
2387 obj_req->copyup_bvecs[i].bv_offset = 0;
2388 obj_req->copyup_bvecs[i].bv_len = len;
2389 obj_overlap -= len;
2390 }
2869 2391
2870 return rbd_img_obj_exists_submit(obj_request); 2392 rbd_assert(!obj_overlap);
2393 return 0;
2871} 2394}
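
setup_copyup_bvecs() above carves the object overlap into page-sized chunks. A sketch of the same arithmetic (assuming a 4096-byte page for the example) shows how the final bvec comes out short when the overlap is not page aligned:

	#include <stdio.h>

	#define PAGE_SIZE 4096ULL	/* assumption for this example */

	int main(void)
	{
		unsigned long long overlap = 3 * PAGE_SIZE + 100;	/* not page aligned */
		unsigned long long npages = (overlap + PAGE_SIZE - 1) / PAGE_SIZE;
		unsigned long long i;

		printf("%llu bytes of overlap -> %llu pages\n", overlap, npages);
		for (i = 0; i < npages; i++) {
			unsigned long long len = overlap < PAGE_SIZE ? overlap : PAGE_SIZE;

			printf("bvec %llu: offset 0, len %llu\n", i, len);
			overlap -= len;
		}
		return 0;
	}
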
2872 2395
2873static int rbd_img_request_submit(struct rbd_img_request *img_request) 2396static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2874{ 2397{
2875 struct rbd_obj_request *obj_request; 2398 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2876 struct rbd_obj_request *next_obj_request; 2399 int ret;
2877 int ret = 0;
2878
2879 dout("%s: img %p\n", __func__, img_request);
2880 2400
2881 rbd_img_request_get(img_request); 2401 rbd_assert(obj_req->num_img_extents);
2882 for_each_obj_request_safe(img_request, obj_request, next_obj_request) { 2402 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2883 ret = rbd_img_obj_request_submit(obj_request); 2403 rbd_dev->parent_overlap);
2884 if (ret) 2404 if (!obj_req->num_img_extents) {
2885 goto out_put_ireq; 2405 /*
2406 * The overlap has become 0 (most likely because the
2407 * image has been flattened). Use rbd_obj_issue_copyup()
2408 * to re-submit the original write request -- the copyup
2409 * operation itself will be a no-op, since someone must
2410 * have populated the child object while we weren't
2411 * looking. Move to WRITE_FLAT state as we'll be done
2412 * with the operation once the null copyup completes.
2413 */
2414 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2415 return rbd_obj_issue_copyup(obj_req, 0);
2886 } 2416 }
2887 2417
2888out_put_ireq: 2418 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2889 rbd_img_request_put(img_request); 2419 if (ret)
2890 return ret; 2420 return ret;
2421
2422 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2423 return rbd_obj_read_from_parent(obj_req);
2891} 2424}
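
The write-guard path above hinges on clipping the reverse-mapped image extents against the (possibly shrunk) parent overlap. An illustrative clipping helper (the name is invented; it only mirrors the idea of prune_extents()) shows how a request can end up with zero parent extents after a flatten, which is exactly the case that triggers the null-copyup resubmit:

	#include <stdio.h>

	struct extent {
		unsigned long long off;
		unsigned long long len;
	};

	/* clip an extent list to [0, overlap) */
	static void clip_to_overlap(struct extent *ex, unsigned int *cnt,
				    unsigned long long overlap)
	{
		unsigned int i, out = 0;

		for (i = 0; i < *cnt; i++) {
			if (ex[i].off >= overlap)
				continue;	/* entirely beyond the overlap */
			if (ex[i].off + ex[i].len > overlap)
				ex[i].len = overlap - ex[i].off;
			ex[out++] = ex[i];
		}
		*cnt = out;
	}

	int main(void)
	{
		struct extent ex[] = { { 0, 1 << 20 }, { 3 << 20, 1 << 20 } };
		unsigned int cnt = 2;

		clip_to_overlap(ex, &cnt, 2 << 20);	/* overlap shrunk to 2M */
		printf("extents left: %u (first %llu~%llu)\n", cnt, ex[0].off, ex[0].len);

		clip_to_overlap(ex, &cnt, 0);		/* image flattened */
		printf("extents left after flatten: %u\n", cnt);
		return 0;
	}
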
2892 2425
2893static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) 2426static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2894{ 2427{
2895 struct rbd_obj_request *obj_request; 2428 int ret;
2896 struct rbd_device *rbd_dev;
2897 u64 obj_end;
2898 u64 img_xferred;
2899 int img_result;
2900 2429
2901 rbd_assert(img_request_child_test(img_request)); 2430again:
2431 switch (obj_req->write_state) {
2432 case RBD_OBJ_WRITE_GUARD:
2433 rbd_assert(!obj_req->xferred);
2434 if (obj_req->result == -ENOENT) {
2435 /*
2436 * The target object doesn't exist. Read the data for
2437 * the entire target object up to the overlap point (if
2438 * any) from the parent, so we can use it for a copyup.
2439 */
2440 ret = rbd_obj_handle_write_guard(obj_req);
2441 if (ret) {
2442 obj_req->result = ret;
2443 return true;
2444 }
2445 return false;
2446 }
2447 /* fall through */
2448 case RBD_OBJ_WRITE_FLAT:
2449 if (!obj_req->result)
2450 /*
2451 * There is no such thing as a successful short
2452 * write -- indicate the whole request was satisfied.
2453 */
2454 obj_req->xferred = obj_req->ex.oe_len;
2455 return true;
2456 case RBD_OBJ_WRITE_COPYUP:
2457 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2458 if (obj_req->result)
2459 goto again;
2902 2460
2903 /* First get what we need from the image request and release it */ 2461 rbd_assert(obj_req->xferred);
2462 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2463 if (ret) {
2464 obj_req->result = ret;
2465 return true;
2466 }
2467 return false;
2468 default:
2469 rbd_assert(0);
2470 }
2471}
2904 2472
2905 obj_request = img_request->obj_request; 2473/*
2906 img_xferred = img_request->xferred; 2474 * Returns true if @obj_req is completed, or false otherwise.
2907 img_result = img_request->result; 2475 */
2908 rbd_img_request_put(img_request); 2476static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2477{
2478 switch (obj_req->img_request->op_type) {
2479 case OBJ_OP_READ:
2480 return rbd_obj_handle_read(obj_req);
2481 case OBJ_OP_WRITE:
2482 return rbd_obj_handle_write(obj_req);
2483 case OBJ_OP_DISCARD:
2484 if (rbd_obj_handle_write(obj_req)) {
2485 /*
2486 * Hide -ENOENT from delete/truncate/zero -- discarding
2487 * a non-existent object is not a problem.
2488 */
2489 if (obj_req->result == -ENOENT) {
2490 obj_req->result = 0;
2491 obj_req->xferred = obj_req->ex.oe_len;
2492 }
2493 return true;
2494 }
2495 return false;
2496 default:
2497 rbd_assert(0);
2498 }
2499}
2909 2500
2910 /* 2501static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2911 * If the overlap has become 0 (most likely because the 2502{
2912 * image has been flattened) we need to re-submit the 2503 struct rbd_img_request *img_req = obj_req->img_request;
2913 * original request. 2504
2914 */ 2505 rbd_assert((!obj_req->result &&
2915 rbd_assert(obj_request); 2506 obj_req->xferred == obj_req->ex.oe_len) ||
2916 rbd_assert(obj_request->img_request); 2507 (obj_req->result < 0 && !obj_req->xferred));
2917 rbd_dev = obj_request->img_request->rbd_dev; 2508 if (!obj_req->result) {
2918 if (!rbd_dev->parent_overlap) { 2509 img_req->xferred += obj_req->xferred;
2919 rbd_obj_request_submit(obj_request);
2920 return; 2510 return;
2921 } 2511 }
2922 2512
2923 obj_request->result = img_result; 2513 rbd_warn(img_req->rbd_dev,
2924 if (obj_request->result) 2514 "%s at objno %llu %llu~%llu result %d xferred %llu",
2925 goto out; 2515 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2516 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2517 obj_req->xferred);
2518 if (!img_req->result) {
2519 img_req->result = obj_req->result;
2520 img_req->xferred = 0;
2521 }
2522}
2926 2523
2927 /* 2524static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2928 * We need to zero anything beyond the parent overlap 2525{
2929 * boundary. Since rbd_img_obj_request_read_callback() 2526 struct rbd_obj_request *obj_req = img_req->obj_request;
2930 * will zero anything beyond the end of a short read, an
2931 * easy way to do this is to pretend the data from the
2932 * parent came up short--ending at the overlap boundary.
2933 */
2934 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2935 obj_end = obj_request->img_offset + obj_request->length;
2936 if (obj_end > rbd_dev->parent_overlap) {
2937 u64 xferred = 0;
2938 2527
2939 if (obj_request->img_offset < rbd_dev->parent_overlap) 2528 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2940 xferred = rbd_dev->parent_overlap - 2529 rbd_assert((!img_req->result &&
2941 obj_request->img_offset; 2530 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2531 (img_req->result < 0 && !img_req->xferred));
2942 2532
2943 obj_request->xferred = min(img_xferred, xferred); 2533 obj_req->result = img_req->result;
2944 } else { 2534 obj_req->xferred = img_req->xferred;
2945 obj_request->xferred = img_xferred; 2535 rbd_img_request_put(img_req);
2946 }
2947out:
2948 rbd_img_obj_request_read_callback(obj_request);
2949 rbd_obj_request_complete(obj_request);
2950} 2536}
2951 2537
2952static void rbd_img_parent_read(struct rbd_obj_request *obj_request) 2538static void rbd_img_end_request(struct rbd_img_request *img_req)
2953{ 2539{
2954 struct rbd_img_request *img_request; 2540 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2955 int result; 2541 rbd_assert((!img_req->result &&
2542 img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2543 (img_req->result < 0 && !img_req->xferred));
2956 2544
2957 rbd_assert(obj_request_img_data_test(obj_request)); 2545 blk_mq_end_request(img_req->rq,
2958 rbd_assert(obj_request->img_request != NULL); 2546 errno_to_blk_status(img_req->result));
2959 rbd_assert(obj_request->result == (s32) -ENOENT); 2547 rbd_img_request_put(img_req);
2960 rbd_assert(obj_request_type_valid(obj_request->type)); 2548}
2961 2549
2962 /* rbd_read_finish(obj_request, obj_request->length); */ 2550static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2963 img_request = rbd_parent_request_create(obj_request, 2551{
2964 obj_request->img_offset, 2552 struct rbd_img_request *img_req;
2965 obj_request->length);
2966 result = -ENOMEM;
2967 if (!img_request)
2968 goto out_err;
2969 2553
2970 if (obj_request->type == OBJ_REQUEST_BIO) 2554again:
2971 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 2555 if (!__rbd_obj_handle_request(obj_req))
2972 obj_request->bio_list); 2556 return;
2973 else
2974 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2975 obj_request->pages);
2976 if (result)
2977 goto out_err;
2978 2557
2979 img_request->callback = rbd_img_parent_read_callback; 2558 img_req = obj_req->img_request;
2980 result = rbd_img_request_submit(img_request); 2559 spin_lock(&img_req->completion_lock);
2981 if (result) 2560 rbd_obj_end_request(obj_req);
2982 goto out_err; 2561 rbd_assert(img_req->pending_count);
2562 if (--img_req->pending_count) {
2563 spin_unlock(&img_req->completion_lock);
2564 return;
2565 }
2983 2566
2984 return; 2567 spin_unlock(&img_req->completion_lock);
2985out_err: 2568 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2986 if (img_request) 2569 obj_req = img_req->obj_request;
2987 rbd_img_request_put(img_request); 2570 rbd_img_end_child_request(img_req);
2988 obj_request->result = result; 2571 goto again;
2989 obj_request->xferred = 0; 2572 }
2990 obj_request_done_set(obj_request); 2573 rbd_img_end_request(img_req);
2991} 2574}
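
The completion path above fans each object request's result back into its image request and only ends the block request once pending_count reaches zero. A simplified, single-threaded sketch of that accounting (in the kernel the decrement happens under completion_lock, and error handling is slightly richer):

	#include <stdbool.h>
	#include <stdio.h>

	struct img_req {
		unsigned int pending_count;
		int result;
		unsigned long long xferred;
	};

	/* returns true when the whole image request is done */
	static bool obj_end_request(struct img_req *img, int result,
				    unsigned long long xferred)
	{
		if (result && !img->result)
			img->result = result;	/* first error wins */
		else if (!result)
			img->xferred += xferred;

		return --img->pending_count == 0;
	}

	int main(void)
	{
		struct img_req img = { .pending_count = 3 };

		obj_end_request(&img, 0, 4096);
		obj_end_request(&img, 0, 4096);
		if (obj_end_request(&img, 0, 4096))
			printf("done: result %d xferred %llu\n", img.result, img.xferred);
		return 0;
	}
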
2992 2575
2993static const struct rbd_client_id rbd_empty_cid; 2576static const struct rbd_client_id rbd_empty_cid;
@@ -3091,8 +2674,8 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3091{ 2674{
3092 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2675 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3093 struct rbd_client_id cid = rbd_get_cid(rbd_dev); 2676 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3094 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; 2677 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3095 char buf[buf_size]; 2678 int buf_size = sizeof(buf);
3096 void *p = buf; 2679 void *p = buf;
3097 2680
3098 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); 2681 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
@@ -3610,8 +3193,8 @@ static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3610 u64 notify_id, u64 cookie, s32 *result) 3193 u64 notify_id, u64 cookie, s32 *result)
3611{ 3194{
3612 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 3195 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3613 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; 3196 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3614 char buf[buf_size]; 3197 int buf_size = sizeof(buf);
3615 int ret; 3198 int ret;
3616 3199
3617 if (result) { 3200 if (result) {
@@ -3887,7 +3470,7 @@ static void rbd_reregister_watch(struct work_struct *work)
3887 3470
3888 ret = rbd_dev_refresh(rbd_dev); 3471 ret = rbd_dev_refresh(rbd_dev);
3889 if (ret) 3472 if (ret)
3890 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); 3473 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3891} 3474}
3892 3475
3893/* 3476/*
@@ -4070,8 +3653,7 @@ static void rbd_queue_workfn(struct work_struct *work)
4070 } 3653 }
4071 } 3654 }
4072 3655
4073 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, 3656 img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4074 snapc);
4075 if (!img_request) { 3657 if (!img_request) {
4076 result = -ENOMEM; 3658 result = -ENOMEM;
4077 goto err_unlock; 3659 goto err_unlock;
@@ -4080,18 +3662,14 @@ static void rbd_queue_workfn(struct work_struct *work)
4080 snapc = NULL; /* img_request consumes a ref */ 3662 snapc = NULL; /* img_request consumes a ref */
4081 3663
4082 if (op_type == OBJ_OP_DISCARD) 3664 if (op_type == OBJ_OP_DISCARD)
4083 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA, 3665 result = rbd_img_fill_nodata(img_request, offset, length);
4084 NULL);
4085 else 3666 else
4086 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 3667 result = rbd_img_fill_from_bio(img_request, offset, length,
4087 rq->bio); 3668 rq->bio);
4088 if (result)
4089 goto err_img_request;
4090
4091 result = rbd_img_request_submit(img_request);
4092 if (result) 3669 if (result)
4093 goto err_img_request; 3670 goto err_img_request;
4094 3671
3672 rbd_img_request_submit(img_request);
4095 if (must_be_locked) 3673 if (must_be_locked)
4096 up_read(&rbd_dev->lock_rwsem); 3674 up_read(&rbd_dev->lock_rwsem);
4097 return; 3675 return;
@@ -4369,7 +3947,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
4369 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 3947 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4370 q->limits.max_sectors = queue_max_hw_sectors(q); 3948 q->limits.max_sectors = queue_max_hw_sectors(q);
4371 blk_queue_max_segments(q, USHRT_MAX); 3949 blk_queue_max_segments(q, USHRT_MAX);
4372 blk_queue_max_segment_size(q, segment_size); 3950 blk_queue_max_segment_size(q, UINT_MAX);
4373 blk_queue_io_min(q, segment_size); 3951 blk_queue_io_min(q, segment_size);
4374 blk_queue_io_opt(q, segment_size); 3952 blk_queue_io_opt(q, segment_size);
4375 3953
@@ -5057,9 +4635,6 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5057 } __attribute__ ((packed)) striping_info_buf = { 0 }; 4635 } __attribute__ ((packed)) striping_info_buf = { 0 };
5058 size_t size = sizeof (striping_info_buf); 4636 size_t size = sizeof (striping_info_buf);
5059 void *p; 4637 void *p;
5060 u64 obj_size;
5061 u64 stripe_unit;
5062 u64 stripe_count;
5063 int ret; 4638 int ret;
5064 4639
5065 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, 4640 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
@@ -5071,31 +4646,9 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5071 if (ret < size) 4646 if (ret < size)
5072 return -ERANGE; 4647 return -ERANGE;
5073 4648
5074 /*
5075 * We don't actually support the "fancy striping" feature
5076 * (STRIPINGV2) yet, but if the striping sizes are the
5077 * defaults the behavior is the same as before. So find
5078 * out, and only fail if the image has non-default values.
5079 */
5080 ret = -EINVAL;
5081 obj_size = rbd_obj_bytes(&rbd_dev->header);
5082 p = &striping_info_buf; 4649 p = &striping_info_buf;
5083 stripe_unit = ceph_decode_64(&p); 4650 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5084 if (stripe_unit != obj_size) { 4651 rbd_dev->header.stripe_count = ceph_decode_64(&p);
5085 rbd_warn(rbd_dev, "unsupported stripe unit "
5086 "(got %llu want %llu)",
5087 stripe_unit, obj_size);
5088 return -EINVAL;
5089 }
5090 stripe_count = ceph_decode_64(&p);
5091 if (stripe_count != 1) {
5092 rbd_warn(rbd_dev, "unsupported stripe count "
5093 "(got %llu want 1)", stripe_count);
5094 return -EINVAL;
5095 }
5096 rbd_dev->header.stripe_unit = stripe_unit;
5097 rbd_dev->header.stripe_count = stripe_count;
5098
5099 return 0; 4652 return 0;
5100} 4653}
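
The striping method reply decoded above is just two little-endian 64-bit values back to back. A user-space sketch of pulling them out of a raw buffer (the kernel uses ceph_decode_64(); this helper is purely illustrative):

	#include <stdint.h>
	#include <stdio.h>

	/* read a little-endian u64 and advance the cursor */
	static uint64_t decode_le64(const unsigned char **p)
	{
		uint64_t v = 0;
		int i;

		for (i = 7; i >= 0; i--)
			v = (v << 8) | (*p)[i];
		*p += 8;
		return v;
	}

	int main(void)
	{
		/* stripe_unit = 65536, stripe_count = 2, little-endian on the wire */
		unsigned char buf[16] = { 0x00, 0x00, 0x01, 0x00, 0, 0, 0, 0,
					  0x02, 0x00, 0x00, 0x00, 0, 0, 0, 0 };
		const unsigned char *p = buf;
		uint64_t stripe_unit = decode_le64(&p);
		uint64_t stripe_count = decode_le64(&p);

		printf("stripe_unit %llu stripe_count %llu\n",
		       (unsigned long long)stripe_unit,
		       (unsigned long long)stripe_count);
		return 0;
	}
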
5101 4654
@@ -5653,39 +5206,6 @@ out_err:
5653 return ret; 5206 return ret;
5654} 5207}
5655 5208
5656/*
5657 * Return pool id (>= 0) or a negative error code.
5658 */
5659static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5660{
5661 struct ceph_options *opts = rbdc->client->options;
5662 u64 newest_epoch;
5663 int tries = 0;
5664 int ret;
5665
5666again:
5667 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5668 if (ret == -ENOENT && tries++ < 1) {
5669 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5670 &newest_epoch);
5671 if (ret < 0)
5672 return ret;
5673
5674 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5675 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5676 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5677 newest_epoch,
5678 opts->mount_timeout);
5679 goto again;
5680 } else {
5681 /* the osdmap we have is new enough */
5682 return -ENOENT;
5683 }
5684 }
5685
5686 return ret;
5687}
5688
5689static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) 5209static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5690{ 5210{
5691 down_write(&rbd_dev->lock_rwsem); 5211 down_write(&rbd_dev->lock_rwsem);
@@ -6114,7 +5634,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
6114 } 5634 }
6115 5635
6116 /* pick the pool */ 5636 /* pick the pool */
6117 rc = rbd_add_get_pool_id(rbdc, spec->pool_name); 5637 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
6118 if (rc < 0) { 5638 if (rc < 0) {
6119 if (rc == -ENOENT) 5639 if (rc == -ENOENT)
6120 pr_info("pool %s does not exist\n", spec->pool_name); 5640 pr_info("pool %s does not exist\n", spec->pool_name);
@@ -6366,16 +5886,8 @@ static int rbd_slab_init(void)
6366 if (!rbd_obj_request_cache) 5886 if (!rbd_obj_request_cache)
6367 goto out_err; 5887 goto out_err;
6368 5888
6369 rbd_assert(!rbd_bio_clone);
6370 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6371 if (!rbd_bio_clone)
6372 goto out_err_clone;
6373
6374 return 0; 5889 return 0;
6375 5890
6376out_err_clone:
6377 kmem_cache_destroy(rbd_obj_request_cache);
6378 rbd_obj_request_cache = NULL;
6379out_err: 5891out_err:
6380 kmem_cache_destroy(rbd_img_request_cache); 5892 kmem_cache_destroy(rbd_img_request_cache);
6381 rbd_img_request_cache = NULL; 5893 rbd_img_request_cache = NULL;
@@ -6391,10 +5903,6 @@ static void rbd_slab_exit(void)
6391 rbd_assert(rbd_img_request_cache); 5903 rbd_assert(rbd_img_request_cache);
6392 kmem_cache_destroy(rbd_img_request_cache); 5904 kmem_cache_destroy(rbd_img_request_cache);
6393 rbd_img_request_cache = NULL; 5905 rbd_img_request_cache = NULL;
6394
6395 rbd_assert(rbd_bio_clone);
6396 bioset_free(rbd_bio_clone);
6397 rbd_bio_clone = NULL;
6398} 5906}
6399 5907
6400static int __init rbd_init(void) 5908static int __init rbd_init(void)