aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c171
1 files changed, 151 insertions, 20 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 2146cab1c61b..9712fad82bc6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -92,6 +92,8 @@ struct rbd_client {
92 struct list_head node; 92 struct list_head node;
93}; 93};
94 94
95struct rbd_req_coll;
96
95/* 97/*
96 * a single io request 98 * a single io request
97 */ 99 */
@@ -100,6 +102,24 @@ struct rbd_request {
100 struct bio *bio; /* cloned bio */ 102 struct bio *bio; /* cloned bio */
101 struct page **pages; /* list of used pages */ 103 struct page **pages; /* list of used pages */
102 u64 len; 104 u64 len;
105 int coll_index;
106 struct rbd_req_coll *coll;
107};
108
109struct rbd_req_status {
110 int done;
111 int rc;
112 u64 bytes;
113};
114
115/*
116 * a collection of requests
117 */
118struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
103}; 123};
104 124
105struct rbd_snap { 125struct rbd_snap {
@@ -416,6 +436,17 @@ static void rbd_put_client(struct rbd_device *rbd_dev)
416 rbd_dev->client = NULL; 436 rbd_dev->client = NULL;
417} 437}
418 438
439/*
440 * Destroy requests collection
441 */
442static void rbd_coll_release(struct kref *kref)
443{
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
446
447 dout("rbd_coll_release %p\n", coll);
448 kfree(coll);
449}
419 450
420/* 451/*
421 * Create a new header structure, translate header format from the on-disk 452 * Create a new header structure, translate header format from the on-disk
@@ -590,6 +621,14 @@ static u64 rbd_get_segment(struct rbd_image_header *header,
590 return len; 621 return len;
591} 622}
592 623
624static int rbd_get_num_segments(struct rbd_image_header *header,
625 u64 ofs, u64 len)
626{
627 u64 start_seg = ofs >> header->obj_order;
628 u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 return end_seg - start_seg + 1;
630}
631
593/* 632/*
594 * bio helpers 633 * bio helpers
595 */ 634 */
@@ -735,6 +774,50 @@ static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
735 kfree(ops); 774 kfree(ops);
736} 775}
737 776
777static void rbd_coll_end_req_index(struct request *rq,
778 struct rbd_req_coll *coll,
779 int index,
780 int ret, u64 len)
781{
782 struct request_queue *q;
783 int min, max, i;
784
785 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 coll, index, ret, len);
787
788 if (!rq)
789 return;
790
791 if (!coll) {
792 blk_end_request(rq, ret, len);
793 return;
794 }
795
796 q = rq->q;
797
798 spin_lock_irq(q->queue_lock);
799 coll->status[index].done = 1;
800 coll->status[index].rc = ret;
801 coll->status[index].bytes = len;
802 max = min = coll->num_done;
803 while (max < coll->total && coll->status[max].done)
804 max++;
805
806 for (i = min; i<max; i++) {
807 __blk_end_request(rq, coll->status[i].rc,
808 coll->status[i].bytes);
809 coll->num_done++;
810 kref_put(&coll->kref, rbd_coll_release);
811 }
812 spin_unlock_irq(q->queue_lock);
813}
814
815static void rbd_coll_end_req(struct rbd_request *req,
816 int ret, u64 len)
817{
818 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
819}
820
738/* 821/*
739 * Send ceph osd request 822 * Send ceph osd request
740 */ 823 */
@@ -749,6 +832,8 @@ static int rbd_do_request(struct request *rq,
749 int flags, 832 int flags,
750 struct ceph_osd_req_op *ops, 833 struct ceph_osd_req_op *ops,
751 int num_reply, 834 int num_reply,
835 struct rbd_req_coll *coll,
836 int coll_index,
752 void (*rbd_cb)(struct ceph_osd_request *req, 837 void (*rbd_cb)(struct ceph_osd_request *req,
753 struct ceph_msg *msg), 838 struct ceph_msg *msg),
754 struct ceph_osd_request **linger_req, 839 struct ceph_osd_request **linger_req,
@@ -763,12 +848,20 @@ static int rbd_do_request(struct request *rq,
763 struct ceph_osd_request_head *reqhead; 848 struct ceph_osd_request_head *reqhead;
764 struct rbd_image_header *header = &dev->header; 849 struct rbd_image_header *header = &dev->header;
765 850
766 ret = -ENOMEM;
767 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 851 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
768 if (!req_data) 852 if (!req_data) {
769 goto done; 853 if (coll)
854 rbd_coll_end_req_index(rq, coll, coll_index,
855 -ENOMEM, len);
856 return -ENOMEM;
857 }
858
859 if (coll) {
860 req_data->coll = coll;
861 req_data->coll_index = coll_index;
862 }
770 863
771 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs); 864 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
772 865
773 down_read(&header->snap_rwsem); 866 down_read(&header->snap_rwsem);
774 867
@@ -828,7 +921,8 @@ static int rbd_do_request(struct request *rq,
828 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 921 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829 if (ver) 922 if (ver)
830 *ver = le64_to_cpu(req->r_reassert_version.version); 923 *ver = le64_to_cpu(req->r_reassert_version.version);
831 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version)); 924 dout("reassert_ver=%lld\n",
925 le64_to_cpu(req->r_reassert_version.version));
832 ceph_osdc_put_request(req); 926 ceph_osdc_put_request(req);
833 } 927 }
834 return ret; 928 return ret;
@@ -837,10 +931,8 @@ done_err:
837 bio_chain_put(req_data->bio); 931 bio_chain_put(req_data->bio);
838 ceph_osdc_put_request(req); 932 ceph_osdc_put_request(req);
839done_pages: 933done_pages:
934 rbd_coll_end_req(req_data, ret, len);
840 kfree(req_data); 935 kfree(req_data);
841done:
842 if (rq)
843 blk_end_request(rq, ret, len);
844 return ret; 936 return ret;
845} 937}
846 938
@@ -874,7 +966,7 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
874 bytes = req_data->len; 966 bytes = req_data->len;
875 } 967 }
876 968
877 blk_end_request(req_data->rq, rc, bytes); 969 rbd_coll_end_req(req_data, rc, bytes);
878 970
879 if (req_data->bio) 971 if (req_data->bio)
880 bio_chain_put(req_data->bio); 972 bio_chain_put(req_data->bio);
@@ -934,6 +1026,7 @@ static int rbd_req_sync_op(struct rbd_device *dev,
934 flags, 1026 flags,
935 ops, 1027 ops,
936 2, 1028 2,
1029 NULL, 0,
937 NULL, 1030 NULL,
938 linger_req, ver); 1031 linger_req, ver);
939 if (ret < 0) 1032 if (ret < 0)
@@ -959,7 +1052,9 @@ static int rbd_do_op(struct request *rq,
959 u64 snapid, 1052 u64 snapid,
960 int opcode, int flags, int num_reply, 1053 int opcode, int flags, int num_reply,
961 u64 ofs, u64 len, 1054 u64 ofs, u64 len,
962 struct bio *bio) 1055 struct bio *bio,
1056 struct rbd_req_coll *coll,
1057 int coll_index)
963{ 1058{
964 char *seg_name; 1059 char *seg_name;
965 u64 seg_ofs; 1060 u64 seg_ofs;
@@ -995,6 +1090,7 @@ static int rbd_do_op(struct request *rq,
995 flags, 1090 flags,
996 ops, 1091 ops,
997 num_reply, 1092 num_reply,
1093 coll, coll_index,
998 rbd_req_cb, 0, NULL); 1094 rbd_req_cb, 0, NULL);
999 1095
1000 rbd_destroy_ops(ops); 1096 rbd_destroy_ops(ops);
@@ -1010,13 +1106,15 @@ static int rbd_req_write(struct request *rq,
1010 struct rbd_device *rbd_dev, 1106 struct rbd_device *rbd_dev,
1011 struct ceph_snap_context *snapc, 1107 struct ceph_snap_context *snapc,
1012 u64 ofs, u64 len, 1108 u64 ofs, u64 len,
1013 struct bio *bio) 1109 struct bio *bio,
1110 struct rbd_req_coll *coll,
1111 int coll_index)
1014{ 1112{
1015 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 1113 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1016 CEPH_OSD_OP_WRITE, 1114 CEPH_OSD_OP_WRITE,
1017 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1115 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1018 2, 1116 2,
1019 ofs, len, bio); 1117 ofs, len, bio, coll, coll_index);
1020} 1118}
1021 1119
1022/* 1120/*
@@ -1026,14 +1124,16 @@ static int rbd_req_read(struct request *rq,
1026 struct rbd_device *rbd_dev, 1124 struct rbd_device *rbd_dev,
1027 u64 snapid, 1125 u64 snapid,
1028 u64 ofs, u64 len, 1126 u64 ofs, u64 len,
1029 struct bio *bio) 1127 struct bio *bio,
1128 struct rbd_req_coll *coll,
1129 int coll_index)
1030{ 1130{
1031 return rbd_do_op(rq, rbd_dev, NULL, 1131 return rbd_do_op(rq, rbd_dev, NULL,
1032 (snapid ? snapid : CEPH_NOSNAP), 1132 (snapid ? snapid : CEPH_NOSNAP),
1033 CEPH_OSD_OP_READ, 1133 CEPH_OSD_OP_READ,
1034 CEPH_OSD_FLAG_READ, 1134 CEPH_OSD_FLAG_READ,
1035 2, 1135 2,
1036 ofs, len, bio); 1136 ofs, len, bio, coll, coll_index);
1037} 1137}
1038 1138
1039/* 1139/*
@@ -1081,6 +1181,7 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1081 CEPH_OSD_FLAG_READ, 1181 CEPH_OSD_FLAG_READ,
1082 ops, 1182 ops,
1083 1, 1183 1,
1184 NULL, 0,
1084 rbd_simple_req_cb, 0, NULL); 1185 rbd_simple_req_cb, 0, NULL);
1085 1186
1086 rbd_destroy_ops(ops); 1187 rbd_destroy_ops(ops);
@@ -1278,6 +1379,20 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
1278 return ret; 1379 return ret;
1279} 1380}
1280 1381
1382static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1383{
1384 struct rbd_req_coll *coll =
1385 kzalloc(sizeof(struct rbd_req_coll) +
1386 sizeof(struct rbd_req_status) * num_reqs,
1387 GFP_ATOMIC);
1388
1389 if (!coll)
1390 return NULL;
1391 coll->total = num_reqs;
1392 kref_init(&coll->kref);
1393 return coll;
1394}
1395
1281/* 1396/*
1282 * block device queue callback 1397 * block device queue callback
1283 */ 1398 */
@@ -1295,6 +1410,8 @@ static void rbd_rq_fn(struct request_queue *q)
1295 bool do_write; 1410 bool do_write;
1296 int size, op_size = 0; 1411 int size, op_size = 0;
1297 u64 ofs; 1412 u64 ofs;
1413 int num_segs, cur_seg = 0;
1414 struct rbd_req_coll *coll;
1298 1415
1299 /* peek at request from block layer */ 1416 /* peek at request from block layer */
1300 if (!rq) 1417 if (!rq)
@@ -1325,6 +1442,14 @@ static void rbd_rq_fn(struct request_queue *q)
1325 do_write ? "write" : "read", 1442 do_write ? "write" : "read",
1326 size, blk_rq_pos(rq) * 512ULL); 1443 size, blk_rq_pos(rq) * 512ULL);
1327 1444
1445 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1446 coll = rbd_alloc_coll(num_segs);
1447 if (!coll) {
1448 spin_lock_irq(q->queue_lock);
1449 __blk_end_request_all(rq, -ENOMEM);
1450 goto next;
1451 }
1452
1328 do { 1453 do {
1329 /* a bio clone to be passed down to OSD req */ 1454 /* a bio clone to be passed down to OSD req */
1330 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); 1455 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
@@ -1332,35 +1457,41 @@ static void rbd_rq_fn(struct request_queue *q)
1332 rbd_dev->header.block_name, 1457 rbd_dev->header.block_name,
1333 ofs, size, 1458 ofs, size,
1334 NULL, NULL); 1459 NULL, NULL);
1460 kref_get(&coll->kref);
1335 bio = bio_chain_clone(&rq_bio, &next_bio, &bp, 1461 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1336 op_size, GFP_ATOMIC); 1462 op_size, GFP_ATOMIC);
1337 if (!bio) { 1463 if (!bio) {
1338 spin_lock_irq(q->queue_lock); 1464 rbd_coll_end_req_index(rq, coll, cur_seg,
1339 __blk_end_request_all(rq, -ENOMEM); 1465 -ENOMEM, op_size);
1340 goto next; 1466 goto next_seg;
1341 } 1467 }
1342 1468
1469
1343 /* init OSD command: write or read */ 1470 /* init OSD command: write or read */
1344 if (do_write) 1471 if (do_write)
1345 rbd_req_write(rq, rbd_dev, 1472 rbd_req_write(rq, rbd_dev,
1346 rbd_dev->header.snapc, 1473 rbd_dev->header.snapc,
1347 ofs, 1474 ofs,
1348 op_size, bio); 1475 op_size, bio,
1476 coll, cur_seg);
1349 else 1477 else
1350 rbd_req_read(rq, rbd_dev, 1478 rbd_req_read(rq, rbd_dev,
1351 cur_snap_id(rbd_dev), 1479 cur_snap_id(rbd_dev),
1352 ofs, 1480 ofs,
1353 op_size, bio); 1481 op_size, bio,
1482 coll, cur_seg);
1354 1483
1484next_seg:
1355 size -= op_size; 1485 size -= op_size;
1356 ofs += op_size; 1486 ofs += op_size;
1357 1487
1488 cur_seg++;
1358 rq_bio = next_bio; 1489 rq_bio = next_bio;
1359 } while (size > 0); 1490 } while (size > 0);
1491 kref_put(&coll->kref, rbd_coll_release);
1360 1492
1361 if (bp) 1493 if (bp)
1362 bio_pair_release(bp); 1494 bio_pair_release(bp);
1363
1364 spin_lock_irq(q->queue_lock); 1495 spin_lock_irq(q->queue_lock);
1365next: 1496next:
1366 rq = blk_fetch_request(q); 1497 rq = blk_fetch_request(q);