aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2011-05-13 16:52:56 -0400
committerSage Weil <sage@newdream.net>2011-05-13 16:52:57 -0400
commit1fec70932d867416ffe620dd17005f168cc84eb5 (patch)
tree921f08b555d2b71e7356034aae65bed4ed37e2df
parent11f770027b5c0de16544f3ec82b5c6f9f8d5a644 (diff)
rbd: fix split bio handling
The rbd driver currently splits bios when they span an object boundary. However, the blk_end_request expects the completions to roll up the results in block device order, and the split rbd/ceph ops can complete in any order. This patch adds a struct rbd_req_coll to track completion of split requests and ensures that the results are passed back up to the block layer in order. This fixes errors where the file system gets completion of a read operation that spans an object boundary before the data has actually arrived. The bug is easily reproduced with iozone with a working set larger than available RAM. Reported-by: Fyodor Ustinov <ufm@ufm.su> Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--drivers/block/rbd.c171
1 files changed, 151 insertions, 20 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 2146cab1c61..9712fad82bc 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -92,6 +92,8 @@ struct rbd_client {
92 struct list_head node; 92 struct list_head node;
93}; 93};
94 94
95struct rbd_req_coll;
96
95/* 97/*
96 * a single io request 98 * a single io request
97 */ 99 */
@@ -100,6 +102,24 @@ struct rbd_request {
100 struct bio *bio; /* cloned bio */ 102 struct bio *bio; /* cloned bio */
101 struct page **pages; /* list of used pages */ 103 struct page **pages; /* list of used pages */
102 u64 len; 104 u64 len;
105 int coll_index;
106 struct rbd_req_coll *coll;
107};
108
109struct rbd_req_status {
110 int done;
111 int rc;
112 u64 bytes;
113};
114
115/*
116 * a collection of requests
117 */
118struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
103}; 123};
104 124
105struct rbd_snap { 125struct rbd_snap {
@@ -416,6 +436,17 @@ static void rbd_put_client(struct rbd_device *rbd_dev)
416 rbd_dev->client = NULL; 436 rbd_dev->client = NULL;
417} 437}
418 438
439/*
440 * Destroy requests collection
441 */
442static void rbd_coll_release(struct kref *kref)
443{
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
446
447 dout("rbd_coll_release %p\n", coll);
448 kfree(coll);
449}
419 450
420/* 451/*
421 * Create a new header structure, translate header format from the on-disk 452 * Create a new header structure, translate header format from the on-disk
@@ -590,6 +621,14 @@ static u64 rbd_get_segment(struct rbd_image_header *header,
590 return len; 621 return len;
591} 622}
592 623
624static int rbd_get_num_segments(struct rbd_image_header *header,
625 u64 ofs, u64 len)
626{
627 u64 start_seg = ofs >> header->obj_order;
628 u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 return end_seg - start_seg + 1;
630}
631
593/* 632/*
594 * bio helpers 633 * bio helpers
595 */ 634 */
@@ -735,6 +774,50 @@ static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
735 kfree(ops); 774 kfree(ops);
736} 775}
737 776
777static void rbd_coll_end_req_index(struct request *rq,
778 struct rbd_req_coll *coll,
779 int index,
780 int ret, u64 len)
781{
782 struct request_queue *q;
783 int min, max, i;
784
785 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 coll, index, ret, len);
787
788 if (!rq)
789 return;
790
791 if (!coll) {
792 blk_end_request(rq, ret, len);
793 return;
794 }
795
796 q = rq->q;
797
798 spin_lock_irq(q->queue_lock);
799 coll->status[index].done = 1;
800 coll->status[index].rc = ret;
801 coll->status[index].bytes = len;
802 max = min = coll->num_done;
803 while (max < coll->total && coll->status[max].done)
804 max++;
805
806 for (i = min; i<max; i++) {
807 __blk_end_request(rq, coll->status[i].rc,
808 coll->status[i].bytes);
809 coll->num_done++;
810 kref_put(&coll->kref, rbd_coll_release);
811 }
812 spin_unlock_irq(q->queue_lock);
813}
814
815static void rbd_coll_end_req(struct rbd_request *req,
816 int ret, u64 len)
817{
818 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
819}
820
738/* 821/*
739 * Send ceph osd request 822 * Send ceph osd request
740 */ 823 */
@@ -749,6 +832,8 @@ static int rbd_do_request(struct request *rq,
749 int flags, 832 int flags,
750 struct ceph_osd_req_op *ops, 833 struct ceph_osd_req_op *ops,
751 int num_reply, 834 int num_reply,
835 struct rbd_req_coll *coll,
836 int coll_index,
752 void (*rbd_cb)(struct ceph_osd_request *req, 837 void (*rbd_cb)(struct ceph_osd_request *req,
753 struct ceph_msg *msg), 838 struct ceph_msg *msg),
754 struct ceph_osd_request **linger_req, 839 struct ceph_osd_request **linger_req,
@@ -763,12 +848,20 @@ static int rbd_do_request(struct request *rq,
763 struct ceph_osd_request_head *reqhead; 848 struct ceph_osd_request_head *reqhead;
764 struct rbd_image_header *header = &dev->header; 849 struct rbd_image_header *header = &dev->header;
765 850
766 ret = -ENOMEM;
767 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 851 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
768 if (!req_data) 852 if (!req_data) {
769 goto done; 853 if (coll)
854 rbd_coll_end_req_index(rq, coll, coll_index,
855 -ENOMEM, len);
856 return -ENOMEM;
857 }
858
859 if (coll) {
860 req_data->coll = coll;
861 req_data->coll_index = coll_index;
862 }
770 863
771 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs); 864 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
772 865
773 down_read(&header->snap_rwsem); 866 down_read(&header->snap_rwsem);
774 867
@@ -828,7 +921,8 @@ static int rbd_do_request(struct request *rq,
828 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 921 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829 if (ver) 922 if (ver)
830 *ver = le64_to_cpu(req->r_reassert_version.version); 923 *ver = le64_to_cpu(req->r_reassert_version.version);
831 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version)); 924 dout("reassert_ver=%lld\n",
925 le64_to_cpu(req->r_reassert_version.version));
832 ceph_osdc_put_request(req); 926 ceph_osdc_put_request(req);
833 } 927 }
834 return ret; 928 return ret;
@@ -837,10 +931,8 @@ done_err:
837 bio_chain_put(req_data->bio); 931 bio_chain_put(req_data->bio);
838 ceph_osdc_put_request(req); 932 ceph_osdc_put_request(req);
839done_pages: 933done_pages:
934 rbd_coll_end_req(req_data, ret, len);
840 kfree(req_data); 935 kfree(req_data);
841done:
842 if (rq)
843 blk_end_request(rq, ret, len);
844 return ret; 936 return ret;
845} 937}
846 938
@@ -874,7 +966,7 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
874 bytes = req_data->len; 966 bytes = req_data->len;
875 } 967 }
876 968
877 blk_end_request(req_data->rq, rc, bytes); 969 rbd_coll_end_req(req_data, rc, bytes);
878 970
879 if (req_data->bio) 971 if (req_data->bio)
880 bio_chain_put(req_data->bio); 972 bio_chain_put(req_data->bio);
@@ -934,6 +1026,7 @@ static int rbd_req_sync_op(struct rbd_device *dev,
934 flags, 1026 flags,
935 ops, 1027 ops,
936 2, 1028 2,
1029 NULL, 0,
937 NULL, 1030 NULL,
938 linger_req, ver); 1031 linger_req, ver);
939 if (ret < 0) 1032 if (ret < 0)
@@ -959,7 +1052,9 @@ static int rbd_do_op(struct request *rq,
959 u64 snapid, 1052 u64 snapid,
960 int opcode, int flags, int num_reply, 1053 int opcode, int flags, int num_reply,
961 u64 ofs, u64 len, 1054 u64 ofs, u64 len,
962 struct bio *bio) 1055 struct bio *bio,
1056 struct rbd_req_coll *coll,
1057 int coll_index)
963{ 1058{
964 char *seg_name; 1059 char *seg_name;
965 u64 seg_ofs; 1060 u64 seg_ofs;
@@ -995,6 +1090,7 @@ static int rbd_do_op(struct request *rq,
995 flags, 1090 flags,
996 ops, 1091 ops,
997 num_reply, 1092 num_reply,
1093 coll, coll_index,
998 rbd_req_cb, 0, NULL); 1094 rbd_req_cb, 0, NULL);
999 1095
1000 rbd_destroy_ops(ops); 1096 rbd_destroy_ops(ops);
@@ -1010,13 +1106,15 @@ static int rbd_req_write(struct request *rq,
1010 struct rbd_device *rbd_dev, 1106 struct rbd_device *rbd_dev,
1011 struct ceph_snap_context *snapc, 1107 struct ceph_snap_context *snapc,
1012 u64 ofs, u64 len, 1108 u64 ofs, u64 len,
1013 struct bio *bio) 1109 struct bio *bio,
1110 struct rbd_req_coll *coll,
1111 int coll_index)
1014{ 1112{
1015 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 1113 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1016 CEPH_OSD_OP_WRITE, 1114 CEPH_OSD_OP_WRITE,
1017 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1115 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1018 2, 1116 2,
1019 ofs, len, bio); 1117 ofs, len, bio, coll, coll_index);
1020} 1118}
1021 1119
1022/* 1120/*
@@ -1026,14 +1124,16 @@ static int rbd_req_read(struct request *rq,
1026 struct rbd_device *rbd_dev, 1124 struct rbd_device *rbd_dev,
1027 u64 snapid, 1125 u64 snapid,
1028 u64 ofs, u64 len, 1126 u64 ofs, u64 len,
1029 struct bio *bio) 1127 struct bio *bio,
1128 struct rbd_req_coll *coll,
1129 int coll_index)
1030{ 1130{
1031 return rbd_do_op(rq, rbd_dev, NULL, 1131 return rbd_do_op(rq, rbd_dev, NULL,
1032 (snapid ? snapid : CEPH_NOSNAP), 1132 (snapid ? snapid : CEPH_NOSNAP),
1033 CEPH_OSD_OP_READ, 1133 CEPH_OSD_OP_READ,
1034 CEPH_OSD_FLAG_READ, 1134 CEPH_OSD_FLAG_READ,
1035 2, 1135 2,
1036 ofs, len, bio); 1136 ofs, len, bio, coll, coll_index);
1037} 1137}
1038 1138
1039/* 1139/*
@@ -1081,6 +1181,7 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1081 CEPH_OSD_FLAG_READ, 1181 CEPH_OSD_FLAG_READ,
1082 ops, 1182 ops,
1083 1, 1183 1,
1184 NULL, 0,
1084 rbd_simple_req_cb, 0, NULL); 1185 rbd_simple_req_cb, 0, NULL);
1085 1186
1086 rbd_destroy_ops(ops); 1187 rbd_destroy_ops(ops);
@@ -1278,6 +1379,20 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
1278 return ret; 1379 return ret;
1279} 1380}
1280 1381
1382static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1383{
1384 struct rbd_req_coll *coll =
1385 kzalloc(sizeof(struct rbd_req_coll) +
1386 sizeof(struct rbd_req_status) * num_reqs,
1387 GFP_ATOMIC);
1388
1389 if (!coll)
1390 return NULL;
1391 coll->total = num_reqs;
1392 kref_init(&coll->kref);
1393 return coll;
1394}
1395
1281/* 1396/*
1282 * block device queue callback 1397 * block device queue callback
1283 */ 1398 */
@@ -1295,6 +1410,8 @@ static void rbd_rq_fn(struct request_queue *q)
1295 bool do_write; 1410 bool do_write;
1296 int size, op_size = 0; 1411 int size, op_size = 0;
1297 u64 ofs; 1412 u64 ofs;
1413 int num_segs, cur_seg = 0;
1414 struct rbd_req_coll *coll;
1298 1415
1299 /* peek at request from block layer */ 1416 /* peek at request from block layer */
1300 if (!rq) 1417 if (!rq)
@@ -1325,6 +1442,14 @@ static void rbd_rq_fn(struct request_queue *q)
1325 do_write ? "write" : "read", 1442 do_write ? "write" : "read",
1326 size, blk_rq_pos(rq) * 512ULL); 1443 size, blk_rq_pos(rq) * 512ULL);
1327 1444
1445 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1446 coll = rbd_alloc_coll(num_segs);
1447 if (!coll) {
1448 spin_lock_irq(q->queue_lock);
1449 __blk_end_request_all(rq, -ENOMEM);
1450 goto next;
1451 }
1452
1328 do { 1453 do {
1329 /* a bio clone to be passed down to OSD req */ 1454 /* a bio clone to be passed down to OSD req */
1330 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); 1455 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
@@ -1332,35 +1457,41 @@ static void rbd_rq_fn(struct request_queue *q)
1332 rbd_dev->header.block_name, 1457 rbd_dev->header.block_name,
1333 ofs, size, 1458 ofs, size,
1334 NULL, NULL); 1459 NULL, NULL);
1460 kref_get(&coll->kref);
1335 bio = bio_chain_clone(&rq_bio, &next_bio, &bp, 1461 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1336 op_size, GFP_ATOMIC); 1462 op_size, GFP_ATOMIC);
1337 if (!bio) { 1463 if (!bio) {
1338 spin_lock_irq(q->queue_lock); 1464 rbd_coll_end_req_index(rq, coll, cur_seg,
1339 __blk_end_request_all(rq, -ENOMEM); 1465 -ENOMEM, op_size);
1340 goto next; 1466 goto next_seg;
1341 } 1467 }
1342 1468
1469
1343 /* init OSD command: write or read */ 1470 /* init OSD command: write or read */
1344 if (do_write) 1471 if (do_write)
1345 rbd_req_write(rq, rbd_dev, 1472 rbd_req_write(rq, rbd_dev,
1346 rbd_dev->header.snapc, 1473 rbd_dev->header.snapc,
1347 ofs, 1474 ofs,
1348 op_size, bio); 1475 op_size, bio,
1476 coll, cur_seg);
1349 else 1477 else
1350 rbd_req_read(rq, rbd_dev, 1478 rbd_req_read(rq, rbd_dev,
1351 cur_snap_id(rbd_dev), 1479 cur_snap_id(rbd_dev),
1352 ofs, 1480 ofs,
1353 op_size, bio); 1481 op_size, bio,
1482 coll, cur_seg);
1354 1483
1484next_seg:
1355 size -= op_size; 1485 size -= op_size;
1356 ofs += op_size; 1486 ofs += op_size;
1357 1487
1488 cur_seg++;
1358 rq_bio = next_bio; 1489 rq_bio = next_bio;
1359 } while (size > 0); 1490 } while (size > 0);
1491 kref_put(&coll->kref, rbd_coll_release);
1360 1492
1361 if (bp) 1493 if (bp)
1362 bio_pair_release(bp); 1494 bio_pair_release(bp);
1363
1364 spin_lock_irq(q->queue_lock); 1495 spin_lock_irq(q->queue_lock);
1365next: 1496next:
1366 rq = blk_fetch_request(q); 1497 rq = blk_fetch_request(q);