author     Ilya Dryomov <idryomov@gmail.com>	2018-01-29 08:04:08 -0500
committer  Ilya Dryomov <idryomov@gmail.com>	2018-04-02 04:12:40 -0400
commit     3da691bf436690c4bb943d5d16e5934937625578 (patch)
tree       a278eb95bf9e51ff34368d522727ba274e0e942f /drivers/block/rbd.c
parent     45a267dbb40f5cf15efa23ce815c4fe0b4674aa2 (diff)
rbd: new request handling code
The notable changes are:

- instead of explicitly stat'ing the object to see if it exists before
  issuing the write, send the write optimistically along with the stat
  in a single OSD request
- zero copyup optimization
- all object requests are associated with an image request and have a
  valid ->img_request pointer; there are no standalone (!IMG_DATA)
  object requests anymore
- code is structured as a state machine (vs a bunch of callbacks with
  implicit state)

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
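For illustration only (an editor's sketch, not part of the patch): the dispatch below mirrors the write state machine described above in simplified form. The state names follow the patch, but obj_req, start_copyup() and handle_write() are made-up stand-ins, not symbols from rbd.c.

#include <errno.h>
#include <stdbool.h>

/* Simplified stand-ins for the driver's request bookkeeping. */
enum obj_write_state { WRITE_FLAT = 1, WRITE_GUARD, WRITE_COPYUP };

struct obj_req {
	enum obj_write_state state;
	int result;		/* result of the OSD request that just finished */
};

/* Stand-in: pretend to submit the next OSD request (parent read or copyup). */
static bool start_copyup(struct obj_req *req)
{
	(void)req;
	return true;		/* "submitted"; its completion re-enters handle_write() */
}

/*
 * Called on every OSD request completion for a write.  Returns true when
 * the object request is finished, false when more I/O is in flight.
 */
static bool handle_write(struct obj_req *req)
{
again:
	switch (req->state) {
	case WRITE_GUARD:
		if (req->result == -ENOENT) {
			/* target object missing: fetch parent data, then copy up */
			req->state = WRITE_COPYUP;
			return !start_copyup(req);
		}
		/* fall through: guarded write that found the object */
	case WRITE_FLAT:
		return true;
	case WRITE_COPYUP:
		/* parent read finished; on error, finish like a plain write */
		req->state = WRITE_GUARD;
		if (req->result)
			goto again;
		return !start_copyup(req);
	}
	return true;
}

int main(void)
{
	struct obj_req req = { .state = WRITE_GUARD, .result = -ENOENT };

	while (!handle_write(&req))
		req.result = 0;	/* next OSD request "succeeded" */
	return 0;
}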
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--	drivers/block/rbd.c	678
1 file changed, 601 insertions(+), 77 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bff3e138543f..1bffad122dc2 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -235,11 +235,37 @@ enum obj_req_flags {
 	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
 };
 
+/*
+ * Writes go through the following state machine to deal with
+ * layering:
+ *
+ *                       need copyup
+ * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
+ *            |     ^                              |
+ *            v     \------------------------------/
+ *          done
+ *            ^
+ *            |
+ * RBD_OBJ_WRITE_FLAT
+ *
+ * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
+ * there is a parent or not.
+ */
+enum rbd_obj_write_state {
+	RBD_OBJ_WRITE_FLAT = 1,
+	RBD_OBJ_WRITE_GUARD,
+	RBD_OBJ_WRITE_COPYUP,
+};
+
 struct rbd_obj_request {
 	u64			object_no;
 	u64			offset;		/* object start byte */
 	u64			length;		/* bytes from offset */
 	unsigned long		flags;
+	union {
+		bool			tried_parent;	/* for reads */
+		enum rbd_obj_write_state write_state;	/* for writes */
+	};
 
 	/*
 	 * An object request associated with an image will have its
@@ -1283,6 +1309,27 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
 }
 
 /*
+ * Zero a range in @obj_req data buffer defined by a bio (list) or
+ * bio_vec array.
+ *
+ * @off is relative to the start of the data buffer.
+ */
+static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
+			       u32 bytes)
+{
+	switch (obj_req->type) {
+	case OBJ_REQUEST_BIO:
+		zero_bios(&obj_req->bio_pos, off, bytes);
+		break;
+	case OBJ_REQUEST_BVECS:
+		zero_bvecs(&obj_req->bvec_pos, off, bytes);
+		break;
+	default:
+		rbd_assert(0);
+	}
+}
+
+/*
  * The default/initial value for all object request flags is 0. For
  * each flag, once its value is set to 1 it is never reset to 0
  * again.
@@ -1567,6 +1614,35 @@ rbd_img_request_op_type(struct rbd_img_request *img_request)
 	return OBJ_OP_READ;
 }
 
+static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	return !obj_req->offset &&
+	       obj_req->length == rbd_dev->layout.object_size;
+}
+
+static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	return obj_req->offset + obj_req->length ==
+					rbd_dev->layout.object_size;
+}
+
+static bool rbd_img_is_write(struct rbd_img_request *img_req)
+{
+	switch (rbd_img_request_op_type(img_req)) {
+	case OBJ_OP_READ:
+		return false;
+	case OBJ_OP_WRITE:
+	case OBJ_OP_DISCARD:
+		return true;
+	default:
+		rbd_assert(0);
+	}
+}
+
 static void
 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 {
@@ -1697,63 +1773,28 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
 	obj_request_done_set(obj_request);
 }
 
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
+
 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 {
-	struct rbd_obj_request *obj_request = osd_req->r_priv;
-	u16 opcode;
-
-	dout("%s: osd_req %p\n", __func__, osd_req);
-	rbd_assert(osd_req == obj_request->osd_req);
-	if (obj_request_img_data_test(obj_request)) {
-		rbd_assert(obj_request->img_request);
-		rbd_assert(obj_request->which != BAD_WHICH);
-	} else {
-		rbd_assert(obj_request->which == BAD_WHICH);
-	}
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
 
-	if (osd_req->r_result < 0)
-		obj_request->result = osd_req->r_result;
+	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+	     osd_req->r_result, obj_req);
+	rbd_assert(osd_req == obj_req->osd_req);
 
-	/*
-	 * We support a 64-bit length, but ultimately it has to be
-	 * passed to the block layer, which just supports a 32-bit
-	 * length field.
-	 */
-	obj_request->xferred = osd_req->r_ops[0].outdata_len;
-	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
-
-	opcode = osd_req->r_ops[0].op;
-	switch (opcode) {
-	case CEPH_OSD_OP_READ:
-		rbd_osd_read_callback(obj_request);
-		break;
-	case CEPH_OSD_OP_SETALLOCHINT:
-		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
-			   osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
-		/* fall through */
-	case CEPH_OSD_OP_WRITE:
-	case CEPH_OSD_OP_WRITEFULL:
-		rbd_osd_write_callback(obj_request);
-		break;
-	case CEPH_OSD_OP_STAT:
-		rbd_osd_stat_callback(obj_request);
-		break;
-	case CEPH_OSD_OP_DELETE:
-	case CEPH_OSD_OP_TRUNCATE:
-	case CEPH_OSD_OP_ZERO:
-		rbd_osd_discard_callback(obj_request);
-		break;
-	case CEPH_OSD_OP_CALL:
-		rbd_osd_call_callback(obj_request);
-		break;
-	default:
-		rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
-			 obj_request->object_no, opcode);
-		break;
-	}
+	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
+	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
+		obj_req->xferred = osd_req->r_result;
+	else
+		/*
+		 * Writes aren't allowed to return a data payload. In some
+		 * guarded write cases (e.g. stat + zero on an empty object)
+		 * a stat response makes it through, but we don't care.
+		 */
+		obj_req->xferred = 0;
 
-	if (obj_request_done_test(obj_request))
-		rbd_obj_request_complete(obj_request);
+	rbd_obj_handle_request(obj_req);
 }
 
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
@@ -1806,12 +1847,6 @@ err_req:
 	return NULL;
 }
 
-/*
- * Create an osd request. A read request has one osd op (read).
- * A write request has either one (watch) or two (hint+write) osd ops.
- * (All rbd data writes are prefixed with an allocation hint op, but
- * technically osd watch is a write request, hence this distinction.)
- */
 static struct ceph_osd_request *rbd_osd_req_create(
 					struct rbd_device *rbd_dev,
 					enum obj_operation_type op_type,
@@ -1831,8 +1866,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
 		snapc = img_request->snapc;
 	}
 
-	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
-
 	return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
 	    (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
 	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
@@ -2251,6 +2284,211 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
 	rbd_osd_req_format_read(obj_request);
 }
 
+static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
+{
+	switch (obj_req->type) {
+	case OBJ_REQUEST_BIO:
+		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
+					       &obj_req->bio_pos,
+					       obj_req->length);
+		break;
+	case OBJ_REQUEST_BVECS:
+		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
+							obj_req->length);
+		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
+						    &obj_req->bvec_pos);
+		break;
+	default:
+		rbd_assert(0);
+	}
+}
+
+static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, obj_req);
+	if (!obj_req->osd_req)
+		return -ENOMEM;
+
+	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
+			       obj_req->offset, obj_req->length, 0, 0);
+	rbd_osd_req_setup_data(obj_req, 0);
+
+	rbd_osd_req_format_read(obj_req);
+	return 0;
+}
+
+static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
+				unsigned int which)
+{
+	struct page **pages;
+
+	/*
+	 * The response data for a STAT call consists of:
+	 *     le64 length;
+	 *     struct {
+	 *         le32 tv_sec;
+	 *         le32 tv_nsec;
+	 *     } mtime;
+	 */
+	pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
+	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
+				     8 + sizeof(struct ceph_timespec),
+				     0, false, true);
+	return 0;
+}
+
+static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
+				  unsigned int which)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	u16 opcode;
+
+	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
+				   rbd_dev->layout.object_size,
+				   rbd_dev->layout.object_size);
+
+	if (rbd_obj_is_entire(obj_req))
+		opcode = CEPH_OSD_OP_WRITEFULL;
+	else
+		opcode = CEPH_OSD_OP_WRITE;
+
+	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
+			       obj_req->offset, obj_req->length, 0, 0);
+	rbd_osd_req_setup_data(obj_req, which++);
+
+	rbd_assert(which == obj_req->osd_req->r_num_ops);
+	rbd_osd_req_format_write(obj_req);
+}
+
+static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	unsigned int num_osd_ops, which = 0;
+	int ret;
+
+	if (obj_request_overlaps_parent(obj_req)) {
+		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+		num_osd_ops = 3; /* stat + setallochint + write/writefull */
+	} else {
+		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+		num_osd_ops = 2; /* setallochint + write/writefull */
+	}
+
+	obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE,
+					      num_osd_ops, obj_req);
+	if (!obj_req->osd_req)
+		return -ENOMEM;
+
+	if (obj_request_overlaps_parent(obj_req)) {
+		ret = __rbd_obj_setup_stat(obj_req, which++);
+		if (ret)
+			return ret;
+	}
+
+	__rbd_obj_setup_write(obj_req, which);
+	return 0;
+}
+
+static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
+				    unsigned int which)
+{
+	u16 opcode;
+
+	if (rbd_obj_is_entire(obj_req)) {
+		if (obj_request_overlaps_parent(obj_req)) {
+			opcode = CEPH_OSD_OP_TRUNCATE;
+		} else {
+			osd_req_op_init(obj_req->osd_req, which++,
+					CEPH_OSD_OP_DELETE, 0);
+			opcode = 0;
+		}
+	} else if (rbd_obj_is_tail(obj_req)) {
+		opcode = CEPH_OSD_OP_TRUNCATE;
+	} else {
+		opcode = CEPH_OSD_OP_ZERO;
+	}
+
+	if (opcode)
+		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
+				       obj_req->offset, obj_req->length,
+				       0, 0);
+
+	rbd_assert(which == obj_req->osd_req->r_num_ops);
+	rbd_osd_req_format_write(obj_req);
+}
+
+static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	unsigned int num_osd_ops, which = 0;
+	int ret;
+
+	if (rbd_obj_is_entire(obj_req)) {
+		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+		num_osd_ops = 1; /* truncate/delete */
+	} else {
+		if (obj_request_overlaps_parent(obj_req)) {
+			obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+			num_osd_ops = 2; /* stat + truncate/zero */
+		} else {
+			obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+			num_osd_ops = 1; /* truncate/zero */
+		}
+	}
+
+	obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_DISCARD,
+					      num_osd_ops, obj_req);
+	if (!obj_req->osd_req)
+		return -ENOMEM;
+
+	if (!rbd_obj_is_entire(obj_req) &&
+	    obj_request_overlaps_parent(obj_req)) {
+		ret = __rbd_obj_setup_stat(obj_req, which++);
+		if (ret)
+			return ret;
+	}
+
+	__rbd_obj_setup_discard(obj_req, which);
+	return 0;
+}
+
+/*
+ * For each object request in @img_req, allocate an OSD request, add
+ * individual OSD ops and prepare them for submission. The number of
+ * OSD ops depends on op_type and the overlap point (if any).
+ */
+static int __rbd_img_fill_request(struct rbd_img_request *img_req)
+{
+	struct rbd_obj_request *obj_req;
+	int ret;
+
+	for_each_obj_request(img_req, obj_req) {
+		switch (rbd_img_request_op_type(img_req)) {
+		case OBJ_OP_READ:
+			ret = rbd_obj_setup_read(obj_req);
+			break;
+		case OBJ_OP_WRITE:
+			ret = rbd_obj_setup_write(obj_req);
+			break;
+		case OBJ_OP_DISCARD:
+			ret = rbd_obj_setup_discard(obj_req);
+			break;
+		default:
+			rbd_assert(0);
+		}
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
 /*
  * Split up an image request into one or more object requests, each
  * to a different object. The "type" parameter indicates whether
@@ -2268,7 +2506,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 	struct rbd_obj_request *next_obj_request;
 	struct ceph_bio_iter bio_it;
 	struct ceph_bvec_iter bvec_it;
-	enum obj_operation_type op_type;
 	u64 img_offset;
 	u64 resid;
 
@@ -2278,7 +2515,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 	img_offset = img_request->offset;
 	resid = img_request->length;
 	rbd_assert(resid > 0);
-	op_type = rbd_img_request_op_type(img_request);
 
 	if (type == OBJ_REQUEST_BIO) {
 		bio_it = *(struct ceph_bio_iter *)data_desc;
@@ -2289,7 +2525,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 	}
 
 	while (resid) {
-		struct ceph_osd_request *osd_req;
 		u64 object_no = img_offset >> rbd_dev->header.obj_order;
 		u64 offset = rbd_segment_offset(rbd_dev, img_offset);
 		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
@@ -2317,23 +2552,14 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 			ceph_bvec_iter_advance(&bvec_it, length);
 		}
 
-		osd_req = rbd_osd_req_create(rbd_dev, op_type,
-					(op_type == OBJ_OP_WRITE) ? 2 : 1,
-					obj_request);
-		if (!osd_req)
-			goto out_unwind;
-
-		obj_request->osd_req = osd_req;
 		obj_request->callback = rbd_img_obj_callback;
 		obj_request->img_offset = img_offset;
 
-		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
-
 		img_offset += length;
 		resid -= length;
 	}
 
-	return 0;
+	return __rbd_img_fill_request(img_request);
 
 out_unwind:
 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
@@ -2712,16 +2938,171 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
 
 	rbd_img_request_get(img_request);
 	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
-		ret = rbd_img_obj_request_submit(obj_request);
-		if (ret)
-			goto out_put_ireq;
+		rbd_obj_request_submit(obj_request);
 	}
 
-out_put_ireq:
 	rbd_img_request_put(img_request);
 	return ret;
 }
 
+static void rbd_img_end_child_request(struct rbd_img_request *img_req);
+
+static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req,
+				    u64 img_offset, u32 bytes)
+{
+	struct rbd_img_request *img_req = obj_req->img_request;
+	struct rbd_img_request *child_img_req;
+	int ret;
+
+	child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes);
+	if (!child_img_req)
+		return -ENOMEM;
+
+	child_img_req->callback = rbd_img_end_child_request;
+
+	if (!rbd_img_is_write(img_req)) {
+		switch (obj_req->type) {
+		case OBJ_REQUEST_BIO:
+			ret = rbd_img_request_fill(child_img_req,
+						   OBJ_REQUEST_BIO,
+						   &obj_req->bio_pos);
+			break;
+		case OBJ_REQUEST_BVECS:
+			ret = rbd_img_request_fill(child_img_req,
+						   OBJ_REQUEST_BVECS,
+						   &obj_req->bvec_pos);
+			break;
+		default:
+			rbd_assert(0);
+		}
+	} else {
+		struct ceph_bvec_iter it = {
+			.bvecs = obj_req->copyup_bvecs,
+			.iter = { .bi_size = bytes },
+		};
+
+		ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS,
+					   &it);
+	}
+	if (ret) {
+		rbd_img_request_put(child_img_req);
+		return ret;
+	}
+
+	rbd_img_request_submit(child_img_req);
+	return 0;
+}
+
+static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	int ret;
+
+	if (obj_req->result == -ENOENT &&
+	    obj_req->img_offset < rbd_dev->parent_overlap &&
+	    !obj_req->tried_parent) {
+		u64 obj_overlap = min(obj_req->length,
+				rbd_dev->parent_overlap - obj_req->img_offset);
+
+		obj_req->tried_parent = true;
+		ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset,
+					       obj_overlap);
+		if (ret) {
+			obj_req->result = ret;
+			return true;
+		}
+		return false;
+	}
+
+	/*
+	 * -ENOENT means a hole in the image -- zero-fill the entire
+	 * length of the request. A short read also implies zero-fill
+	 * to the end of the request. In both cases we update xferred
+	 * count to indicate the whole request was satisfied.
+	 */
+	if (obj_req->result == -ENOENT ||
+	    (!obj_req->result && obj_req->xferred < obj_req->length)) {
+		rbd_assert(!obj_req->xferred || !obj_req->result);
+		rbd_obj_zero_range(obj_req, obj_req->xferred,
+				   obj_req->length - obj_req->xferred);
+		obj_req->result = 0;
+		obj_req->xferred = obj_req->length;
+	}
+
+	return true;
+}
+
+/*
+ * copyup_bvecs pages are never highmem pages
+ */
+static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
+{
+	struct ceph_bvec_iter it = {
+		.bvecs = bvecs,
+		.iter = { .bi_size = bytes },
+	};
+
+	ceph_bvec_iter_advance_step(&it, bytes, ({
+		if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
+			       bv.bv_len))
+			return false;
+	}));
+	return true;
+}
+
+static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
+
+	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
+	rbd_osd_req_destroy(obj_req->osd_req);
+
+	/*
+	 * Create a copyup request with the same number of OSD ops as
+	 * the original request. The original request was stat + op(s),
+	 * the new copyup request will be copyup + the same op(s).
+	 */
+	obj_req->osd_req = rbd_osd_req_create(rbd_dev,
+			rbd_img_request_op_type(obj_req->img_request),
+			num_osd_ops, obj_req);
+	if (!obj_req->osd_req)
+		return -ENOMEM;
+
+	/*
+	 * Only send non-zero copyup data to save some I/O and network
+	 * bandwidth -- zero copyup data is equivalent to the object not
+	 * existing.
+	 */
+	if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
+		dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
+		bytes = 0;
+	}
+
+	osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
+			    "copyup");
+	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
+					  obj_req->copyup_bvecs, bytes);
+
+	switch (rbd_img_request_op_type(obj_req->img_request)) {
+	case OBJ_OP_WRITE:
+		__rbd_obj_setup_write(obj_req, 1);
+		break;
+	case OBJ_OP_DISCARD:
+		rbd_assert(!rbd_obj_is_entire(obj_req));
+		__rbd_obj_setup_discard(obj_req, 1);
+		break;
+	default:
+		rbd_assert(0);
+	}
+
+	rbd_obj_request_submit(obj_req);
+	/* FIXME: in lieu of rbd_img_obj_callback() */
+	rbd_img_request_put(obj_req->img_request);
+	return 0;
+}
+
 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
 {
 	u32 i;
@@ -2850,6 +3231,149 @@ out_err:
 	obj_request_done_set(obj_request);
 }
 
+static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	u64 img_offset;
+	u64 obj_overlap;
+	int ret;
+
+	if (!obj_request_overlaps_parent(obj_req)) {
+		/*
+		 * The overlap has become 0 (most likely because the
+		 * image has been flattened). Use rbd_obj_issue_copyup()
+		 * to re-submit the original write request -- the copyup
+		 * operation itself will be a no-op, since someone must
+		 * have populated the child object while we weren't
+		 * looking. Move to WRITE_FLAT state as we'll be done
+		 * with the operation once the null copyup completes.
+		 */
+		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+		return rbd_obj_issue_copyup(obj_req, 0);
+	}
+
+	/*
+	 * Determine the byte range covered by the object in the
+	 * child image to which the original request was to be sent.
+	 */
+	img_offset = obj_req->img_offset - obj_req->offset;
+	obj_overlap = rbd_dev->layout.object_size;
+
+	/*
+	 * There is no defined parent data beyond the parent
+	 * overlap, so limit what we read at that boundary if
+	 * necessary.
+	 */
+	if (img_offset + obj_overlap > rbd_dev->parent_overlap) {
+		rbd_assert(img_offset < rbd_dev->parent_overlap);
+		obj_overlap = rbd_dev->parent_overlap - img_offset;
+	}
+
+	ret = setup_copyup_bvecs(obj_req, obj_overlap);
+	if (ret)
+		return ret;
+
+	obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+	return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap);
+}
+
+static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
+{
+	int ret;
+
+again:
+	switch (obj_req->write_state) {
+	case RBD_OBJ_WRITE_GUARD:
+		rbd_assert(!obj_req->xferred);
+		if (obj_req->result == -ENOENT) {
+			/*
+			 * The target object doesn't exist. Read the data for
+			 * the entire target object up to the overlap point (if
+			 * any) from the parent, so we can use it for a copyup.
+			 */
+			ret = rbd_obj_handle_write_guard(obj_req);
+			if (ret) {
+				obj_req->result = ret;
+				return true;
+			}
+			return false;
+		}
+		/* fall through */
+	case RBD_OBJ_WRITE_FLAT:
+		if (!obj_req->result)
+			/*
+			 * There is no such thing as a successful short
+			 * write -- indicate the whole request was satisfied.
+			 */
+			obj_req->xferred = obj_req->length;
+		return true;
+	case RBD_OBJ_WRITE_COPYUP:
+		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+		if (obj_req->result)
+			goto again;
+
+		rbd_assert(obj_req->xferred);
+		ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+		if (ret) {
+			obj_req->result = ret;
+			return true;
+		}
+		return false;
+	default:
+		rbd_assert(0);
+	}
+}
+
+/*
+ * Returns true if @obj_req is completed, or false otherwise.
+ */
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+{
+	switch (rbd_img_request_op_type(obj_req->img_request)) {
+	case OBJ_OP_READ:
+		return rbd_obj_handle_read(obj_req);
+	case OBJ_OP_WRITE:
+		return rbd_obj_handle_write(obj_req);
+	case OBJ_OP_DISCARD:
+		if (rbd_obj_handle_write(obj_req)) {
+			/*
+			 * Hide -ENOENT from delete/truncate/zero -- discarding
+			 * a non-existent object is not a problem.
+			 */
+			if (obj_req->result == -ENOENT) {
+				obj_req->result = 0;
+				obj_req->xferred = obj_req->length;
+			}
+			return true;
+		}
+		return false;
+	default:
+		rbd_assert(0);
+	}
+}
+
+static void rbd_img_end_child_request(struct rbd_img_request *img_req)
+{
+	struct rbd_obj_request *obj_req = img_req->obj_request;
+
+	rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
+
+	obj_req->result = img_req->result;
+	obj_req->xferred = img_req->xferred;
+	rbd_img_request_put(img_req);
+
+	rbd_obj_handle_request(obj_req);
+}
+
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+{
+	if (!__rbd_obj_handle_request(obj_req))
+		return;
+
+	obj_request_done_set(obj_req);
+	rbd_obj_request_complete(obj_req);
+}
+
 static const struct rbd_client_id rbd_empty_cid;
 
 static bool rbd_cid_equal(const struct rbd_client_id *lhs,