author    Alex Elder <elder@inktank.com>    2012-10-20 23:17:27 -0400
committer Alex Elder <elder@inktank.com>    2012-10-30 09:34:28 -0400
commit    f7760dad286829682a8d36f4563ab20a65732414 (patch)
tree      2eb641713ce74545ce509b3e25e65e69bdc2166c /drivers/block
parent    0ed7285e0001b960c888e5455ae982025210ed3d (diff)
rbd: simplify rbd_rq_fn()
When processing a request, rbd_rq_fn() makes clones of the bio's in the request's bio chain and submits the results to osd's to be satisfied. If a request bio straddles the boundary between objects backing the rbd image, it must be represented by two cloned bio's: one for the first part (at the end of one object) and one for the second (at the beginning of the next object).

This has been handled by a function bio_chain_clone(), which includes an interface only a mother could love, and which has been found to have other problems. This patch defines two new, fairly generic bio functions (one of which replaces bio_chain_clone()) to help out the situation, and then revises rbd_rq_fn() to make use of them.

First, bio_clone_range() clones a portion of a single bio, starting at a given offset within the bio and including only as many bytes as requested. As a convenience, a request to clone the entire bio is passed directly to bio_clone().

Second, bio_chain_clone_range() performs a similar function, producing a chain of cloned bio's covering a sub-range of the source chain. No bio_pair structures are used, and on success the result represents exactly the specified range.

Using bio_chain_clone_range() makes rbd_rq_fn() a little easier to understand, because it avoids the need to pass very much state information between consecutive calls. By avoiding the need to track a bio_pair structure, it also eliminates the problem described here: http://tracker.newdream.net/issues/2933

Note that a block request (and therefore the complete length of a bio chain processed in rbd_rq_fn()) is an unsigned int, while the result of rbd_segment_length() is a u64. This change makes that range truncation explicit, and trips a BUG if the segment boundary is too far off.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
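The new calling convention is easiest to see from the caller's side. The sketch below is illustrative only, not part of the patch; it abbreviates the rewritten rbd_rq_fn() loop shown in the diff that follows, with submission and error handling elided:

        struct bio *bio = rq->bio;      /* head of the request's bio chain */
        unsigned int bio_offset = 0;    /* byte offset into that bio */

        while (size > 0) {
                /* bytes from ofs to the end of the containing object */
                u64 limit = rbd_segment_length(rbd_dev, ofs, size);
                unsigned int chain_size = (unsigned int) limit;
                struct bio *bio_chain;

                /* Clone exactly chain_size bytes.  On return, bio and
                 * bio_offset identify the first byte not yet cloned. */
                bio_chain = bio_chain_clone_range(&bio, &bio_offset,
                                                  chain_size, GFP_ATOMIC);
                if (bio_chain)
                        ; /* hand bio_chain to one osd request */

                ofs += chain_size;
                size -= chain_size;
        }

Because the cursor is carried entirely in bio and bio_offset, nothing else needs to survive from one iteration to the next; that is what eliminates the old next_bio and bio_pair bookkeeping.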
Diffstat (limited to 'drivers/block')
-rw-r--r--    drivers/block/rbd.c    231
1 file changed, 152 insertions(+), 79 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index c800047f5835..cc06c55875b9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
  */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-                                   struct bio_pair **bp,
-                                   int len, gfp_t gfpmask)
-{
-        struct bio *old_chain = *old;
-        struct bio *new_chain = NULL;
-        struct bio *tail;
-        int total = 0;
-
-        if (*bp) {
-                bio_pair_release(*bp);
-                *bp = NULL;
-        }
+static struct bio *bio_clone_range(struct bio *bio_src,
+                                        unsigned int offset,
+                                        unsigned int len,
+                                        gfp_t gfpmask)
+{
+        struct bio_vec *bv;
+        unsigned int resid;
+        unsigned short idx;
+        unsigned int voff;
+        unsigned short end_idx;
+        unsigned short vcnt;
+        struct bio *bio;
 
-        while (old_chain && (total < len)) {
-                struct bio *tmp;
+        /* Handle the easy case for the caller */
 
-                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-                if (!tmp)
-                        goto err_out;
-                gfpmask &= ~__GFP_WAIT;        /* can't wait after the first */
+        if (!offset && len == bio_src->bi_size)
+                return bio_clone(bio_src, gfpmask);
 
-                if (total + old_chain->bi_size > len) {
-                        struct bio_pair *bp;
+        if (WARN_ON_ONCE(!len))
+                return NULL;
+        if (WARN_ON_ONCE(len > bio_src->bi_size))
+                return NULL;
+        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+                return NULL;
 
-                        /*
-                         * this split can only happen with a single paged bio,
-                         * split_bio will BUG_ON if this is not the case
-                         */
-                        dout("bio_chain_clone split! total=%d remaining=%d"
-                             "bi_size=%u\n",
-                             total, len - total, old_chain->bi_size);
+        /* Find first affected segment... */
 
-                        /* split the bio. We'll release it either in the next
-                           call, or it will have to be released outside */
-                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-                        if (!bp)
-                                goto err_out;
+        resid = offset;
+        __bio_for_each_segment(bv, bio_src, idx, 0) {
+                if (resid < bv->bv_len)
+                        break;
+                resid -= bv->bv_len;
+        }
+        voff = resid;
 
-                        __bio_clone(tmp, &bp->bio1);
+        /* ...and the last affected segment */
 
-                        *next = &bp->bio2;
-                } else {
-                        __bio_clone(tmp, old_chain);
-                        *next = old_chain->bi_next;
-                }
+        resid += len;
+        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
+                if (resid <= bv->bv_len)
+                        break;
+                resid -= bv->bv_len;
+        }
+        vcnt = end_idx - idx + 1;
+
+        /* Build the clone */
+
+        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+        if (!bio)
+                return NULL;    /* ENOMEM */
 
-                tmp->bi_bdev = NULL;
-                tmp->bi_next = NULL;
-                if (new_chain)
-                        tail->bi_next = tmp;
-                else
-                        new_chain = tmp;
-                tail = tmp;
-                old_chain = old_chain->bi_next;
+        bio->bi_bdev = bio_src->bi_bdev;
+        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+        bio->bi_rw = bio_src->bi_rw;
+        bio->bi_flags |= 1 << BIO_CLONED;
 
-                total += tmp->bi_size;
+        /*
+         * Copy over our part of the bio_vec, then update the first
+         * and last (or only) entries.
+         */
+        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+                        vcnt * sizeof (struct bio_vec));
+        bio->bi_io_vec[0].bv_offset += voff;
+        if (vcnt > 1) {
+                bio->bi_io_vec[0].bv_len -= voff;
+                bio->bi_io_vec[vcnt - 1].bv_len = resid;
+        } else {
+                bio->bi_io_vec[0].bv_len = len;
         }
 
-        rbd_assert(total == len);
+        bio->bi_vcnt = vcnt;
+        bio->bi_size = len;
+        bio->bi_idx = 0;
+
+        return bio;
+}
+
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+                                        unsigned int *offset,
+                                        unsigned int len,
+                                        gfp_t gfpmask)
+{
+        struct bio *bi = *bio_src;
+        unsigned int off = *offset;
+        struct bio *chain = NULL;
+        struct bio **end;
+
+        /* Build up a chain of clone bios up to the limit */
+
+        if (!bi || off >= bi->bi_size || !len)
+                return NULL;            /* Nothing to clone */
 
-        *old = old_chain;
+        end = &chain;
+        while (len) {
+                unsigned int bi_size;
+                struct bio *bio;
+
+                if (!bi)
+                        goto out_err;   /* EINVAL; ran out of bio's */
+                bi_size = min_t(unsigned int, bi->bi_size - off, len);
+                bio = bio_clone_range(bi, off, bi_size, gfpmask);
+                if (!bio)
+                        goto out_err;   /* ENOMEM */
+
+                *end = bio;
+                end = &bio->bi_next;
+
+                off += bi_size;
+                if (off == bi->bi_size) {
+                        bi = bi->bi_next;
+                        off = 0;
+                }
+                len -= bi_size;
+        }
+        *bio_src = bi;
+        *offset = off;
 
-        return new_chain;
+        return chain;
+out_err:
+        bio_chain_put(chain);
 
-err_out:
-        dout("bio_chain_clone with err\n");
-        bio_chain_put(new_chain);
         return NULL;
 }
 
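A worked example (hypothetical numbers, not from the patch) may help with the segment arithmetic in bio_clone_range() above. Suppose the source bio has three 4096-byte bio_vec entries (bi_size = 12288) and the caller asks for offset = 6144, len = 4096. The first walk skips the first segment and stops at idx = 1 with voff = resid = 2048, so the clone begins 2048 bytes into the second segment. The second walk restarts with resid = 2048 + 4096 = 6144 and stops at end_idx = 2 with resid = 2048, giving vcnt = 2. After the memcpy(), the clone's first entry has bv_offset advanced by 2048 and bv_len trimmed to 2048, its last entry is truncated to resid = 2048 bytes, and bi_size = 2048 + 2048 = 4096, exactly the requested len.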
@@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq,
                 req_data->coll_index = coll_index;
         }
 
-        dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
-                (unsigned long long) ofs, (unsigned long long) len);
+        dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+                object_name, (unsigned long long) ofs,
+                (unsigned long long) len, coll, coll_index);
 
         osdc = &rbd_dev->rbd_client->client->osdc;
         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q)
 {
         struct rbd_device *rbd_dev = q->queuedata;
         struct request *rq;
-        struct bio_pair *bp = NULL;
 
         while ((rq = blk_fetch_request(q))) {
                 struct bio *bio;
-                struct bio *rq_bio, *next_bio = NULL;
                 bool do_write;
                 unsigned int size;
-                u64 op_size = 0;
                 u64 ofs;
                 int num_segs, cur_seg = 0;
                 struct rbd_req_coll *coll;
                 struct ceph_snap_context *snapc;
+                unsigned int bio_offset;
 
                 dout("fetched request\n");
 
@@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q)
 
                 /* deduce our operation (read, write) */
                 do_write = (rq_data_dir(rq) == WRITE);
-
-                size = blk_rq_bytes(rq);
-                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-                rq_bio = rq->bio;
                 if (do_write && rbd_dev->mapping.read_only) {
                         __blk_end_request_all(rq, -EROFS);
                         continue;
@@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q)
 
                 up_read(&rbd_dev->header_rwsem);
 
+                size = blk_rq_bytes(rq);
+                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+                bio = rq->bio;
+
                 dout("%s 0x%x bytes at 0x%llx\n",
                      do_write ? "write" : "read",
                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q)
                         continue;
                 }
 
+                bio_offset = 0;
                 do {
-                        /* a bio clone to be passed down to OSD req */
+                        u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+                        unsigned int chain_size;
+                        struct bio *bio_chain;
+
+                        BUG_ON(limit > (u64) UINT_MAX);
+                        chain_size = (unsigned int) limit;
                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-                        op_size = rbd_segment_length(rbd_dev, ofs, size);
+
                         kref_get(&coll->kref);
-                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
-                                              op_size, GFP_ATOMIC);
-                        if (bio)
+
+                        /* Pass a cloned bio chain via an osd request */
+
+                        bio_chain = bio_chain_clone_range(&bio,
+                                                &bio_offset, chain_size,
+                                                GFP_ATOMIC);
+                        if (bio_chain)
                                 (void) rbd_do_op(rq, rbd_dev, snapc,
-                                                ofs, op_size,
-                                                bio, coll, cur_seg);
+                                                ofs, chain_size,
+                                                bio_chain, coll, cur_seg);
                         else
                                 rbd_coll_end_req_index(rq, coll, cur_seg,
-                                                       -ENOMEM, op_size);
-                        size -= op_size;
-                        ofs += op_size;
+                                                       -ENOMEM, chain_size);
+                        size -= chain_size;
+                        ofs += chain_size;
 
                         cur_seg++;
-                        rq_bio = next_bio;
                 } while (size > 0);
                 kref_put(&coll->kref, rbd_coll_release);
 
-                if (bp)
-                        bio_pair_release(bp);
                 spin_lock_irq(q->queue_lock);
 
                 ceph_put_snap_context(snapc);
@@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q)
 /*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
  */
 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                 struct bio_vec *bvec)