-rw-r--r--	drivers/block/rbd.c	319
1 file changed, 0 insertions(+), 319 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index daa0f18f7089..4c175a7d3f7e 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -621,18 +621,6 @@ static void rbd_put_client(struct rbd_client *rbdc)
 	kref_put(&rbdc->kref, rbd_client_release);
 }
 
-/*
- * Destroy requests collection
- */
-static void rbd_coll_release(struct kref *kref)
-{
-	struct rbd_req_coll *coll =
-		container_of(kref, struct rbd_req_coll, kref);
-
-	dout("rbd_coll_release %p\n", coll);
-	kfree(coll);
-}
-
 static bool rbd_image_format_valid(u32 image_format)
 {
 	return image_format == 1 || image_format == 2;
@@ -876,28 +864,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
 	return length;
 }
 
-static int rbd_get_num_segments(struct rbd_image_header *header,
-				u64 ofs, u64 len)
-{
-	u64 start_seg;
-	u64 end_seg;
-	u64 result;
-
-	if (!len)
-		return 0;
-	if (len - 1 > U64_MAX - ofs)
-		return -ERANGE;
-
-	start_seg = ofs >> header->obj_order;
-	end_seg = (ofs + len - 1) >> header->obj_order;
-
-	result = end_seg - start_seg + 1;
-	if (result > (u64) INT_MAX)
-		return -ERANGE;
-
-	return (int) result;
-}
-
 /*
  * returns the size of an object in the image
  */
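
[Aside: the removed rbd_get_num_segments() counted how many backing objects a byte range touches. A minimal userspace sketch of the same arithmetic, assuming 1 << obj_order is the object size; num_segments() is an illustrative stand-in, not the driver's API:]

#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

/* Sketch of the removed helper's arithmetic: count how many fixed-size
 * objects the byte range [ofs, ofs + len) touches, with object size
 * 1 << obj_order. */
static int num_segments(unsigned int obj_order, uint64_t ofs, uint64_t len)
{
	uint64_t start_seg, end_seg, result;

	if (!len)
		return 0;
	if (len - 1 > UINT64_MAX - ofs)	/* ofs + len - 1 would overflow */
		return -ERANGE;

	start_seg = ofs >> obj_order;
	end_seg = (ofs + len - 1) >> obj_order;

	result = end_seg - start_seg + 1;
	if (result > (uint64_t)INT_MAX)
		return -ERANGE;
	return (int)result;
}

int main(void)
{
	/* 4 MiB objects (obj_order == 22): a 6 MiB I/O starting 1 MiB into
	 * the image ends at byte 7 MiB - 1, so it spans objects 0 and 1. */
	printf("%d\n", num_segments(22, 1ULL << 20, 6ULL << 20));
	return 0;
}
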
@@ -1216,52 +1182,6 @@ static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
 	kfree(op);
 }
 
-static void rbd_coll_end_req_index(struct request *rq,
-				   struct rbd_req_coll *coll,
-				   int index,
-				   s32 ret, u64 len)
-{
-	struct request_queue *q;
-	int min, max, i;
-
-	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
-	     coll, index, (int)ret, (unsigned long long)len);
-
-	if (!rq)
-		return;
-
-	if (!coll) {
-		blk_end_request(rq, ret, len);
-		return;
-	}
-
-	q = rq->q;
-
-	spin_lock_irq(q->queue_lock);
-	coll->status[index].done = 1;
-	coll->status[index].rc = ret;
-	coll->status[index].bytes = len;
-	max = min = coll->num_done;
-	while (max < coll->total && coll->status[max].done)
-		max++;
-
-	for (i = min; i<max; i++) {
-		__blk_end_request(rq, (int)coll->status[i].rc,
-				  coll->status[i].bytes);
-		coll->num_done++;
-		kref_put(&coll->kref, rbd_coll_release);
-	}
-	spin_unlock_irq(q->queue_lock);
-}
-
-static void rbd_coll_end_req(struct rbd_request *rbd_req,
-			     s32 ret, u64 len)
-{
-	rbd_coll_end_req_index(rbd_req->rq,
-			       rbd_req->coll, rbd_req->coll_index,
-			       ret, len);
-}
-
 /*
  * Send ceph osd request
  */
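
[Aside: the removed collection code retired segment completions strictly in order, only telling the block layer about the contiguous completed prefix. A userspace sketch of that bookkeeping, with a plain struct and printf standing in for the request queue and __blk_end_request():]

#include <stdio.h>

/* Userspace sketch of the removed collection bookkeeping: each segment
 * completion is recorded in its status slot, and completed slots are
 * retired strictly in order so the block request appears to finish
 * front to back. printf stands in for __blk_end_request(). */
struct coll {
	int total;
	int num_done;
	int done[8];
};

static void coll_end_index(struct coll *c, int index)
{
	int i, max;

	c->done[index] = 1;
	/* find the end of the contiguous completed run */
	for (max = c->num_done; max < c->total && c->done[max]; max++)
		;
	for (i = c->num_done; i < max; i++)
		printf("retiring segment %d\n", i);
	c->num_done = max;
}

int main(void)
{
	struct coll c = { .total = 3 };

	coll_end_index(&c, 1);	/* out of order: nothing retires yet */
	coll_end_index(&c, 0);	/* retires segments 0 and 1 */
	coll_end_index(&c, 2);	/* retires segment 2 */
	return 0;
}
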
@@ -1361,46 +1281,6 @@ done_osd_req:
 	return ret;
 }
 
-/*
- * Ceph osd op callback
- */
-static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
-{
-	struct rbd_request *rbd_req = osd_req->r_priv;
-	struct ceph_osd_reply_head *replyhead;
-	struct ceph_osd_op *op;
-	s32 rc;
-	u64 bytes;
-	int read_op;
-
-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	op = (void *)(replyhead + 1);
-	rc = (s32)le32_to_cpu(replyhead->result);
-	bytes = le64_to_cpu(op->extent.length);
-	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
-
-	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
-	     (unsigned long long) bytes, read_op, (int) rc);
-
-	if (rc == (s32)-ENOENT && read_op) {
-		zero_bio_chain(rbd_req->bio, 0);
-		rc = 0;
-	} else if (rc == 0 && read_op && bytes < rbd_req->len) {
-		zero_bio_chain(rbd_req->bio, bytes);
-		bytes = rbd_req->len;
-	}
-
-	rbd_coll_end_req(rbd_req, rc, bytes);
-
-	if (rbd_req->bio)
-		bio_chain_put(rbd_req->bio);
-
-	ceph_osdc_put_request(osd_req);
-	kfree(rbd_req);
-}
-
 static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
 			      struct ceph_msg *msg)
 {
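
[Aside: the removed callback's read-path fixups are the interesting part: a read that hits a nonexistent object is reported as all zeros, and a short read is zero-padded out to the requested length. A hedged userspace sketch, with a flat buffer standing in for the bio chain and fixup_read() an illustrative name:]

#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Sketch of the read fixups in the removed rbd_req_cb(): ENOENT on a
 * read means the object was never written (a hole), so the result is
 * all zeros; a short read is padded with zeros out to the requested
 * length. buf/got/want stand in for the bio chain and extent fields. */
static long fixup_read(int rc, unsigned char *buf, size_t got, size_t want)
{
	if (rc == -ENOENT) {			/* hole: treat as zeros */
		memset(buf, 0, want);
		return (long)want;
	}
	if (rc == 0 && got < want) {		/* short read: pad the tail */
		memset(buf + got, 0, want - got);
		return (long)want;
	}
	return rc < 0 ? rc : (long)got;
}

int main(void)
{
	unsigned char buf[8] = "ABCDEFG";	/* 7 data bytes + NUL */
	long n = fixup_read(0, buf, 4, sizeof(buf));

	printf("reported %ld bytes, tail byte = %d\n", n, buf[7]);
	return 0;
}
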
@@ -1448,70 +1328,6 @@ done:
 	return ret;
 }
 
-/*
- * Do an asynchronous ceph osd operation
- */
-static int rbd_do_op(struct request *rq,
-		     struct rbd_device *rbd_dev,
-		     struct ceph_snap_context *snapc,
-		     u64 ofs, u64 len,
-		     struct bio *bio,
-		     struct rbd_req_coll *coll,
-		     int coll_index)
-{
-	const char *seg_name;
-	u64 seg_ofs;
-	u64 seg_len;
-	int ret;
-	struct ceph_osd_req_op *op;
-	int opcode;
-	int flags;
-	u64 snapid;
-
-	seg_name = rbd_segment_name(rbd_dev, ofs);
-	if (!seg_name)
-		return -ENOMEM;
-	seg_len = rbd_segment_length(rbd_dev, ofs, len);
-	seg_ofs = rbd_segment_offset(rbd_dev, ofs);
-
-	if (rq_data_dir(rq) == WRITE) {
-		opcode = CEPH_OSD_OP_WRITE;
-		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
-		snapid = CEPH_NOSNAP;
-	} else {
-		opcode = CEPH_OSD_OP_READ;
-		flags = CEPH_OSD_FLAG_READ;
-		rbd_assert(!snapc);
-		snapid = rbd_dev->spec->snap_id;
-	}
-
-	ret = -ENOMEM;
-	op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
-	if (!op)
-		goto done;
-
-	/* we've taken care of segment sizes earlier when we
-	   cloned the bios. We should never have a segment
-	   truncated at this point */
-	rbd_assert(seg_len == len);
-
-	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
-			     seg_name, seg_ofs, seg_len,
-			     bio,
-			     NULL, 0,
-			     flags,
-			     op,
-			     coll, coll_index,
-			     rbd_req_cb, NULL);
-	if (ret < 0)
-		rbd_coll_end_req_index(rq, coll, coll_index,
-				       (s32)ret, seg_len);
-	rbd_osd_req_op_destroy(op);
-done:
-	kfree(seg_name);
-	return ret;
-}
-
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
 				  struct rbd_obj_request *obj_request)
 {
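
[Aside: rbd_do_op() relied on the image offset splitting cleanly into an object index, which names the backing object, plus an offset within that object. A tiny sketch of that mapping; the object-name format varies by image format, so the prefix and field width below are purely illustrative:]

#include <stdint.h>
#include <stdio.h>

/* Sketch of the per-object mapping the removed rbd_do_op() depended on:
 * an image offset splits into an object index and an offset within that
 * object. Name format below is illustrative, not the driver's. */
#define OBJ_ORDER 22U			/* 4 MiB objects */

int main(void)
{
	uint64_t ofs = (5ULL << 20) + 123;	/* 5 MiB + 123 B into the image */
	uint64_t seg = ofs >> OBJ_ORDER;	/* object index: 1 */
	uint64_t seg_ofs = ofs & ((1ULL << OBJ_ORDER) - 1);	/* 1 MiB + 123 B */

	printf("object <prefix>.%016llx, offset %llu\n",
	       (unsigned long long)seg, (unsigned long long)seg_ofs);
	return 0;
}
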
@@ -1683,78 +1499,6 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
 	return ret;
 }
 
-static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
-{
-	struct rbd_req_coll *coll =
-			kzalloc(sizeof(struct rbd_req_coll) +
-				sizeof(struct rbd_req_status) * num_reqs,
-				GFP_ATOMIC);
-
-	if (!coll)
-		return NULL;
-	coll->total = num_reqs;
-	kref_init(&coll->kref);
-	return coll;
-}
-
-static int rbd_dev_do_request(struct request *rq,
-			      struct rbd_device *rbd_dev,
-			      struct ceph_snap_context *snapc,
-			      u64 ofs, unsigned int size,
-			      struct bio *bio_chain)
-{
-	int num_segs;
-	struct rbd_req_coll *coll;
-	unsigned int bio_offset;
-	int cur_seg = 0;
-
-	dout("%s 0x%x bytes at 0x%llx\n",
-	     rq_data_dir(rq) == WRITE ? "write" : "read",
-	     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
-
-	num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
-	if (num_segs <= 0)
-		return num_segs;
-
-	coll = rbd_alloc_coll(num_segs);
-	if (!coll)
-		return -ENOMEM;
-
-	bio_offset = 0;
-	do {
-		u64 limit = rbd_segment_length(rbd_dev, ofs, size);
-		unsigned int clone_size;
-		struct bio *bio_clone;
-
-		BUG_ON(limit > (u64)UINT_MAX);
-		clone_size = (unsigned int)limit;
-		dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
-
-		kref_get(&coll->kref);
-
-		/* Pass a cloned bio chain via an osd request */
-
-		bio_clone = bio_chain_clone_range(&bio_chain,
-				&bio_offset, clone_size,
-				GFP_ATOMIC);
-		if (bio_clone)
-			(void)rbd_do_op(rq, rbd_dev, snapc,
-					ofs, clone_size,
-					bio_clone, coll, cur_seg);
-		else
-			rbd_coll_end_req_index(rq, coll, cur_seg,
-					       (s32)-ENOMEM,
-					       clone_size);
-		size -= clone_size;
-		ofs += clone_size;
-
-		cur_seg++;
-	} while (size > 0);
-	kref_put(&coll->kref, rbd_coll_release);
-
-	return 0;
-}
-
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
 				   struct ceph_osd_op *op)
 {
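
[Aside: the removed rbd_dev_do_request() walked the request range one backing object at a time, peeling off a chunk per object. A small sketch of that walk; segment_length() mirrors the clamping rbd_segment_length() performed, and the object size is illustrative:]

#include <stdint.h>
#include <stdio.h>

/* Sketch of the splitting loop in the removed rbd_dev_do_request():
 * walk the byte range and peel off one chunk per backing object, each
 * chunk capped at the distance to the next object boundary. */
#define OBJ_SIZE (1ULL << 22)	/* 4 MiB objects */

static uint64_t segment_length(uint64_t ofs, uint64_t len)
{
	uint64_t room = OBJ_SIZE - (ofs & (OBJ_SIZE - 1));

	return len < room ? len : room;
}

int main(void)
{
	uint64_t ofs = 3ULL << 20;	/* start 3 MiB into the image */
	uint64_t size = 6ULL << 20;	/* 6 MiB request: spans 3 objects */
	int cur_seg = 0;

	while (size > 0) {
		uint64_t chunk = segment_length(ofs, size);

		printf("segment %d: ofs=%llu len=%llu\n", cur_seg++,
		       (unsigned long long)ofs, (unsigned long long)chunk);
		ofs += chunk;
		size -= chunk;
	}
	return 0;
}
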
@@ -2236,68 +1980,6 @@ end_request:
 }
 
 /*
- * block device queue callback
- */
-static void rbd_rq_fn(struct request_queue *q)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	bool read_only = rbd_dev->mapping.read_only;
-	struct request *rq;
-
-	while ((rq = blk_fetch_request(q))) {
-		struct ceph_snap_context *snapc = NULL;
-		unsigned int size = 0;
-		int result;
-
-		dout("fetched request\n");
-
-		/* Filter out block requests we don't understand */
-
-		if ((rq->cmd_type != REQ_TYPE_FS)) {
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-		spin_unlock_irq(q->queue_lock);
-
-		/* Write requests need a reference to the snapshot context */
-
-		if (rq_data_dir(rq) == WRITE) {
-			result = -EROFS;
-			if (read_only) /* Can't write to a read-only device */
-				goto out_end_request;
-
-			/*
-			 * Note that each osd request will take its
-			 * own reference to the snapshot context
-			 * supplied. The reference we take here
-			 * just guarantees the one we provide stays
-			 * valid.
-			 */
-			down_read(&rbd_dev->header_rwsem);
-			snapc = ceph_get_snap_context(rbd_dev->header.snapc);
-			up_read(&rbd_dev->header_rwsem);
-			rbd_assert(snapc != NULL);
-		} else if (!atomic_read(&rbd_dev->exists)) {
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			dout("request for non-existent snapshot");
-			result = -ENXIO;
-			goto out_end_request;
-		}
-
-		size = blk_rq_bytes(rq);
-		result = rbd_dev_do_request(rq, rbd_dev, snapc,
-						blk_rq_pos(rq) * SECTOR_SIZE,
-						size, rq->bio);
-out_end_request:
-		if (snapc)
-			ceph_put_snap_context(snapc);
-		spin_lock_irq(q->queue_lock);
-		if (!size || result < 0)
-			__blk_end_request_all(rq, result);
-	}
-}
-
-/*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
  * which we handle later at bio_chain_clone_range()
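
[Aside: the admission policy in the removed rbd_rq_fn() reduces to two checks. A toy userspace sketch under illustrative names; the enum and struct stand in for the block-layer and rbd types:]

#include <errno.h>
#include <stdio.h>

/* Toy sketch of the policy in the removed rbd_rq_fn(): reject writes to
 * a read-only mapping with EROFS, and fail reads against a mapped
 * snapshot whose backing data no longer exists with ENXIO. */
enum req_dir { REQ_DIR_READ, REQ_DIR_WRITE };

struct mapping_state {
	int read_only;
	int exists;		/* snapshot still has backing data */
};

static int admit_request(const struct mapping_state *m, enum req_dir dir)
{
	if (dir == REQ_DIR_WRITE && m->read_only)
		return -EROFS;
	if (dir == REQ_DIR_READ && !m->exists)
		return -ENXIO;
	return 0;
}

int main(void)
{
	struct mapping_state gone_snap = { .read_only = 1, .exists = 0 };

	printf("write -> %d, read -> %d\n",
	       admit_request(&gone_snap, REQ_DIR_WRITE),
	       admit_request(&gone_snap, REQ_DIR_READ));
	return 0;
}
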
@@ -2546,7 +2228,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	(void) rbd_rq_fn;		/* avoid a warning */
 	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
 	if (!q)
 		goto out_disk;