aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-07 00:39:39 -0500
committerSage Weil <sage@inktank.com>2013-05-02 00:16:59 -0400
commit6aaa4511deb4b0fd776d1153dc63a89cdc024fb8 (patch)
tree455fb4334de3518e411daaa7e3ff1d9aef187cd8 /net
parent7fe1e5e57b84eab98ff352519aa66e86dac5bf61 (diff)
libceph: implement bio message data item cursor
Implement and use cursor routines for bio message data items for outbound message data. (See the previous commit for reasoning in support of the changes in out_msg_pos_next().) Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c137
1 files changed, 116 insertions, 21 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 30c8792be180..209990a853e5 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -739,6 +739,95 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
739 if (*seg == (*bio_iter)->bi_vcnt) 739 if (*seg == (*bio_iter)->bi_vcnt)
740 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 740 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
741} 741}
742
743/*
744 * For a bio data item, a piece is whatever remains of the next
745 * entry in the current bio iovec, or the first entry in the next
746 * bio in the list.
747 */
748static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
749{
750 struct ceph_msg_data_cursor *cursor = &data->cursor;
751 struct bio *bio;
752
753 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
754
755 bio = data->bio;
756 BUG_ON(!bio);
757 BUG_ON(!bio->bi_vcnt);
758 /* resid = bio->bi_size */
759
760 cursor->bio = bio;
761 cursor->vector_index = 0;
762 cursor->vector_offset = 0;
763 cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
764}
765
766static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
767 size_t *page_offset,
768 size_t *length)
769{
770 struct ceph_msg_data_cursor *cursor = &data->cursor;
771 struct bio *bio;
772 struct bio_vec *bio_vec;
773 unsigned int index;
774
775 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
776
777 bio = cursor->bio;
778 BUG_ON(!bio);
779
780 index = cursor->vector_index;
781 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
782
783 bio_vec = &bio->bi_io_vec[index];
784 BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
785 *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
786 BUG_ON(*page_offset >= PAGE_SIZE);
787 *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
788 BUG_ON(*length > PAGE_SIZE);
789
790 return bio_vec->bv_page;
791}
792
793static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
794{
795 struct ceph_msg_data_cursor *cursor = &data->cursor;
796 struct bio *bio;
797 struct bio_vec *bio_vec;
798 unsigned int index;
799
800 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
801
802 bio = cursor->bio;
803 BUG_ON(!bio);
804
805 index = cursor->vector_index;
806 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
807 bio_vec = &bio->bi_io_vec[index];
808 BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
809
810 /* Advance the cursor offset */
811
812 cursor->vector_offset += bytes;
813 if (cursor->vector_offset < bio_vec->bv_len)
814 return false; /* more bytes to process in this segment */
815
816 /* Move on to the next segment, and possibly the next bio */
817
818 if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
819 bio = bio->bi_next;
820 cursor->bio = bio;
821 cursor->vector_index = 0;
822 }
823 cursor->vector_offset = 0;
824
825 if (!cursor->last_piece && bio && !bio->bi_next)
826 if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
827 cursor->last_piece = true;
828
829 return true;
830}
742#endif 831#endif
743 832
744/* 833/*
@@ -843,11 +932,13 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
843 case CEPH_MSG_DATA_PAGELIST: 932 case CEPH_MSG_DATA_PAGELIST:
844 ceph_msg_data_pagelist_cursor_init(data); 933 ceph_msg_data_pagelist_cursor_init(data);
845 break; 934 break;
846 case CEPH_MSG_DATA_NONE:
847 case CEPH_MSG_DATA_PAGES:
848#ifdef CONFIG_BLOCK 935#ifdef CONFIG_BLOCK
849 case CEPH_MSG_DATA_BIO: 936 case CEPH_MSG_DATA_BIO:
937 ceph_msg_data_bio_cursor_init(data);
938 break;
850#endif /* CONFIG_BLOCK */ 939#endif /* CONFIG_BLOCK */
940 case CEPH_MSG_DATA_NONE:
941 case CEPH_MSG_DATA_PAGES:
851 default: 942 default:
852 /* BUG(); */ 943 /* BUG(); */
853 break; 944 break;
@@ -870,11 +961,13 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
870 case CEPH_MSG_DATA_PAGELIST: 961 case CEPH_MSG_DATA_PAGELIST:
871 page = ceph_msg_data_pagelist_next(data, page_offset, length); 962 page = ceph_msg_data_pagelist_next(data, page_offset, length);
872 break; 963 break;
873 case CEPH_MSG_DATA_NONE:
874 case CEPH_MSG_DATA_PAGES:
875#ifdef CONFIG_BLOCK 964#ifdef CONFIG_BLOCK
876 case CEPH_MSG_DATA_BIO: 965 case CEPH_MSG_DATA_BIO:
966 page = ceph_msg_data_bio_next(data, page_offset, length);
967 break;
877#endif /* CONFIG_BLOCK */ 968#endif /* CONFIG_BLOCK */
969 case CEPH_MSG_DATA_NONE:
970 case CEPH_MSG_DATA_PAGES:
878 default: 971 default:
879 page = NULL; 972 page = NULL;
880 break; 973 break;
@@ -900,11 +993,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
900 case CEPH_MSG_DATA_PAGELIST: 993 case CEPH_MSG_DATA_PAGELIST:
901 new_piece = ceph_msg_data_pagelist_advance(data, bytes); 994 new_piece = ceph_msg_data_pagelist_advance(data, bytes);
902 break; 995 break;
903 case CEPH_MSG_DATA_NONE:
904 case CEPH_MSG_DATA_PAGES:
905#ifdef CONFIG_BLOCK 996#ifdef CONFIG_BLOCK
906 case CEPH_MSG_DATA_BIO: 997 case CEPH_MSG_DATA_BIO:
998 new_piece = ceph_msg_data_bio_advance(data, bytes);
999 break;
907#endif /* CONFIG_BLOCK */ 1000#endif /* CONFIG_BLOCK */
1001 case CEPH_MSG_DATA_NONE:
1002 case CEPH_MSG_DATA_PAGES:
908 default: 1003 default:
909 BUG(); 1004 BUG();
910 break; 1005 break;
@@ -933,6 +1028,10 @@ static void prepare_message_data(struct ceph_msg *msg,
933 1028
934 /* Initialize data cursors */ 1029 /* Initialize data cursors */
935 1030
1031#ifdef CONFIG_BLOCK
1032 if (ceph_msg_has_bio(msg))
1033 ceph_msg_data_cursor_init(&msg->b);
1034#endif /* CONFIG_BLOCK */
936 if (ceph_msg_has_pagelist(msg)) 1035 if (ceph_msg_has_pagelist(msg))
937 ceph_msg_data_cursor_init(&msg->l); 1036 ceph_msg_data_cursor_init(&msg->l);
938 if (ceph_msg_has_trail(msg)) 1037 if (ceph_msg_has_trail(msg))
@@ -1233,6 +1332,10 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1233 need_crc = ceph_msg_data_advance(&msg->t, sent); 1332 need_crc = ceph_msg_data_advance(&msg->t, sent);
1234 else if (ceph_msg_has_pagelist(msg)) 1333 else if (ceph_msg_has_pagelist(msg))
1235 need_crc = ceph_msg_data_advance(&msg->l, sent); 1334 need_crc = ceph_msg_data_advance(&msg->l, sent);
1335#ifdef CONFIG_BLOCK
1336 else if (ceph_msg_has_bio(msg))
1337 need_crc = ceph_msg_data_advance(&msg->b, sent);
1338#endif /* CONFIG_BLOCK */
1236 BUG_ON(need_crc && sent != len); 1339 BUG_ON(need_crc && sent != len);
1237 1340
1238 if (sent < len) 1341 if (sent < len)
@@ -1242,10 +1345,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1242 msg_pos->page_pos = 0; 1345 msg_pos->page_pos = 0;
1243 msg_pos->page++; 1346 msg_pos->page++;
1244 msg_pos->did_page_crc = false; 1347 msg_pos->did_page_crc = false;
1245#ifdef CONFIG_BLOCK
1246 if (ceph_msg_has_bio(msg))
1247 iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
1248#endif
1249} 1348}
1250 1349
1251static void in_msg_pos_next(struct ceph_connection *con, size_t len, 1350static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1323,8 +1422,6 @@ static int write_partial_message_data(struct ceph_connection *con)
1323 struct page *page = NULL; 1422 struct page *page = NULL;
1324 size_t page_offset; 1423 size_t page_offset;
1325 size_t length; 1424 size_t length;
1326 int max_write = PAGE_SIZE;
1327 int bio_offset = 0;
1328 bool use_cursor = false; 1425 bool use_cursor = false;
1329 bool last_piece = true; /* preserve existing behavior */ 1426 bool last_piece = true; /* preserve existing behavior */
1330 1427
@@ -1345,21 +1442,19 @@ static int write_partial_message_data(struct ceph_connection *con)
1345 &length, &last_piece); 1442 &length, &last_piece);
1346#ifdef CONFIG_BLOCK 1443#ifdef CONFIG_BLOCK
1347 } else if (ceph_msg_has_bio(msg)) { 1444 } else if (ceph_msg_has_bio(msg)) {
1348 struct bio_vec *bv; 1445 use_cursor = true;
1349 1446 page = ceph_msg_data_next(&msg->b, &page_offset,
1350 bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg); 1447 &length, &last_piece);
1351 page = bv->bv_page;
1352 bio_offset = bv->bv_offset;
1353 max_write = bv->bv_len;
1354#endif 1448#endif
1355 } else { 1449 } else {
1356 page = zero_page; 1450 page = zero_page;
1357 } 1451 }
1358 if (!use_cursor) 1452 if (!use_cursor) {
1359 length = min_t(int, max_write - msg_pos->page_pos, 1453 length = min_t(int, PAGE_SIZE - msg_pos->page_pos,
1360 total_max_write); 1454 total_max_write);
1361 1455
1362 page_offset = msg_pos->page_pos + bio_offset; 1456 page_offset = msg_pos->page_pos;
1457 }
1363 if (do_datacrc && !msg_pos->did_page_crc) { 1458 if (do_datacrc && !msg_pos->did_page_crc) {
1364 u32 crc = le32_to_cpu(msg->footer.data_crc); 1459 u32 crc = le32_to_cpu(msg->footer.data_crc);
1365 1460