aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-14 15:09:06 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:18:30 -0400
commit8ae4f4f5c056150d5480710ab356801e84d01a3d (patch)
tree8e468e214e55cb4f6f0858d127bc7f5cfa5569fb /net/ceph
parent36153ec9dd6287d7cedf6afb51453c445d946cee (diff)
libceph: have cursor point to data
Rather than having a ceph message data item point to the cursor it's associated with, have the cursor point to a data item. This will allow a message cursor to be used for more than one data item. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c113
1 files changed, 55 insertions, 58 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4626da34a5c3..3aa0f30c3c5e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -722,10 +722,10 @@ static void con_out_kvec_add(struct ceph_connection *con,
722 * entry in the current bio iovec, or the first entry in the next 722 * entry in the current bio iovec, or the first entry in the next
723 * bio in the list. 723 * bio in the list.
724 */ 724 */
725static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data, 725static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
726 size_t length) 726 size_t length)
727{ 727{
728 struct ceph_msg_data_cursor *cursor = data->cursor; 728 struct ceph_msg_data *data = cursor->data;
729 struct bio *bio; 729 struct bio *bio;
730 730
731 BUG_ON(data->type != CEPH_MSG_DATA_BIO); 731 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
@@ -741,11 +741,11 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
741 cursor->last_piece = length <= bio->bi_io_vec[0].bv_len; 741 cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
742} 742}
743 743
744static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data, 744static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
745 size_t *page_offset, 745 size_t *page_offset,
746 size_t *length) 746 size_t *length)
747{ 747{
748 struct ceph_msg_data_cursor *cursor = data->cursor; 748 struct ceph_msg_data *data = cursor->data;
749 struct bio *bio; 749 struct bio *bio;
750 struct bio_vec *bio_vec; 750 struct bio_vec *bio_vec;
751 unsigned int index; 751 unsigned int index;
@@ -772,14 +772,14 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
772 return bio_vec->bv_page; 772 return bio_vec->bv_page;
773} 773}
774 774
775static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes) 775static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
776 size_t bytes)
776{ 777{
777 struct ceph_msg_data_cursor *cursor = data->cursor;
778 struct bio *bio; 778 struct bio *bio;
779 struct bio_vec *bio_vec; 779 struct bio_vec *bio_vec;
780 unsigned int index; 780 unsigned int index;
781 781
782 BUG_ON(data->type != CEPH_MSG_DATA_BIO); 782 BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
783 783
784 bio = cursor->bio; 784 bio = cursor->bio;
785 BUG_ON(!bio); 785 BUG_ON(!bio);
@@ -823,10 +823,10 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
823 * For a page array, a piece comes from the first page in the array 823 * For a page array, a piece comes from the first page in the array
824 * that has not already been fully consumed. 824 * that has not already been fully consumed.
825 */ 825 */
826static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data, 826static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
827 size_t length) 827 size_t length)
828{ 828{
829 struct ceph_msg_data_cursor *cursor = data->cursor; 829 struct ceph_msg_data *data = cursor->data;
830 int page_count; 830 int page_count;
831 831
832 BUG_ON(data->type != CEPH_MSG_DATA_PAGES); 832 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@@ -845,11 +845,11 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
845 cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE; 845 cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
846} 846}
847 847
848static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data, 848static struct page *
849 size_t *page_offset, 849ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
850 size_t *length) 850 size_t *page_offset, size_t *length)
851{ 851{
852 struct ceph_msg_data_cursor *cursor = data->cursor; 852 struct ceph_msg_data *data = cursor->data;
853 853
854 BUG_ON(data->type != CEPH_MSG_DATA_PAGES); 854 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
855 855
@@ -865,12 +865,10 @@ static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
865 return data->pages[cursor->page_index]; 865 return data->pages[cursor->page_index];
866} 866}
867 867
868static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data, 868static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
869 size_t bytes) 869 size_t bytes)
870{ 870{
871 struct ceph_msg_data_cursor *cursor = data->cursor; 871 BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
872
873 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
874 872
875 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE); 873 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
876 874
@@ -894,10 +892,11 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
894 * For a pagelist, a piece is whatever remains to be consumed in the 892 * For a pagelist, a piece is whatever remains to be consumed in the
895 * first page in the list, or the front of the next page. 893 * first page in the list, or the front of the next page.
896 */ 894 */
897static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data, 895static void
896ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
898 size_t length) 897 size_t length)
899{ 898{
900 struct ceph_msg_data_cursor *cursor = data->cursor; 899 struct ceph_msg_data *data = cursor->data;
901 struct ceph_pagelist *pagelist; 900 struct ceph_pagelist *pagelist;
902 struct page *page; 901 struct page *page;
903 902
@@ -919,11 +918,11 @@ static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
919 cursor->last_piece = length <= PAGE_SIZE; 918 cursor->last_piece = length <= PAGE_SIZE;
920} 919}
921 920
922static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data, 921static struct page *
923 size_t *page_offset, 922ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
924 size_t *length) 923 size_t *page_offset, size_t *length)
925{ 924{
926 struct ceph_msg_data_cursor *cursor = data->cursor; 925 struct ceph_msg_data *data = cursor->data;
927 struct ceph_pagelist *pagelist; 926 struct ceph_pagelist *pagelist;
928 927
929 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); 928 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -941,13 +940,13 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
941 else 940 else
942 *length = PAGE_SIZE - *page_offset; 941 *length = PAGE_SIZE - *page_offset;
943 942
944 return data->cursor->page; 943 return cursor->page;
945} 944}
946 945
947static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data, 946static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
948 size_t bytes) 947 size_t bytes)
949{ 948{
950 struct ceph_msg_data_cursor *cursor = data->cursor; 949 struct ceph_msg_data *data = cursor->data;
951 struct ceph_pagelist *pagelist; 950 struct ceph_pagelist *pagelist;
952 951
953 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); 952 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -983,19 +982,21 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
983 * be processed in that piece. It also tracks whether the current 982 * be processed in that piece. It also tracks whether the current
984 * piece is the last one in the data item. 983 * piece is the last one in the data item.
985 */ 984 */
986static void ceph_msg_data_cursor_init(struct ceph_msg_data *data, 985static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
987 size_t length)
988{ 986{
989 switch (data->type) { 987 struct ceph_msg_data_cursor *cursor = &msg->cursor;
988
989 cursor->data = msg->data;
990 switch (cursor->data->type) {
990 case CEPH_MSG_DATA_PAGELIST: 991 case CEPH_MSG_DATA_PAGELIST:
991 ceph_msg_data_pagelist_cursor_init(data, length); 992 ceph_msg_data_pagelist_cursor_init(cursor, length);
992 break; 993 break;
993 case CEPH_MSG_DATA_PAGES: 994 case CEPH_MSG_DATA_PAGES:
994 ceph_msg_data_pages_cursor_init(data, length); 995 ceph_msg_data_pages_cursor_init(cursor, length);
995 break; 996 break;
996#ifdef CONFIG_BLOCK 997#ifdef CONFIG_BLOCK
997 case CEPH_MSG_DATA_BIO: 998 case CEPH_MSG_DATA_BIO:
998 ceph_msg_data_bio_cursor_init(data, length); 999 ceph_msg_data_bio_cursor_init(cursor, length);
999 break; 1000 break;
1000#endif /* CONFIG_BLOCK */ 1001#endif /* CONFIG_BLOCK */
1001 case CEPH_MSG_DATA_NONE: 1002 case CEPH_MSG_DATA_NONE:
@@ -1003,7 +1004,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
1003 /* BUG(); */ 1004 /* BUG(); */
1004 break; 1005 break;
1005 } 1006 }
1006 data->cursor->need_crc = true; 1007 cursor->need_crc = true;
1007} 1008}
1008 1009
1009/* 1010/*
@@ -1011,23 +1012,22 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
1011 * data item, and supply the page offset and length of that piece. 1012 * data item, and supply the page offset and length of that piece.
1012 * Indicate whether this is the last piece in this data item. 1013 * Indicate whether this is the last piece in this data item.
1013 */ 1014 */
1014static struct page *ceph_msg_data_next(struct ceph_msg_data *data, 1015static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
1015 size_t *page_offset, 1016 size_t *page_offset, size_t *length,
1016 size_t *length,
1017 bool *last_piece) 1017 bool *last_piece)
1018{ 1018{
1019 struct page *page; 1019 struct page *page;
1020 1020
1021 switch (data->type) { 1021 switch (cursor->data->type) {
1022 case CEPH_MSG_DATA_PAGELIST: 1022 case CEPH_MSG_DATA_PAGELIST:
1023 page = ceph_msg_data_pagelist_next(data, page_offset, length); 1023 page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
1024 break; 1024 break;
1025 case CEPH_MSG_DATA_PAGES: 1025 case CEPH_MSG_DATA_PAGES:
1026 page = ceph_msg_data_pages_next(data, page_offset, length); 1026 page = ceph_msg_data_pages_next(cursor, page_offset, length);
1027 break; 1027 break;
1028#ifdef CONFIG_BLOCK 1028#ifdef CONFIG_BLOCK
1029 case CEPH_MSG_DATA_BIO: 1029 case CEPH_MSG_DATA_BIO:
1030 page = ceph_msg_data_bio_next(data, page_offset, length); 1030 page = ceph_msg_data_bio_next(cursor, page_offset, length);
1031 break; 1031 break;
1032#endif /* CONFIG_BLOCK */ 1032#endif /* CONFIG_BLOCK */
1033 case CEPH_MSG_DATA_NONE: 1033 case CEPH_MSG_DATA_NONE:
@@ -1039,7 +1039,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
1039 BUG_ON(*page_offset + *length > PAGE_SIZE); 1039 BUG_ON(*page_offset + *length > PAGE_SIZE);
1040 BUG_ON(!*length); 1040 BUG_ON(!*length);
1041 if (last_piece) 1041 if (last_piece)
1042 *last_piece = data->cursor->last_piece; 1042 *last_piece = cursor->last_piece;
1043 1043
1044 return page; 1044 return page;
1045} 1045}
@@ -1048,22 +1048,22 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
1048 * Returns true if the result moves the cursor on to the next piece 1048 * Returns true if the result moves the cursor on to the next piece
1049 * of the data item. 1049 * of the data item.
1050 */ 1050 */
1051static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) 1051static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1052 size_t bytes)
1052{ 1053{
1053 struct ceph_msg_data_cursor *cursor = data->cursor;
1054 bool new_piece; 1054 bool new_piece;
1055 1055
1056 BUG_ON(bytes > cursor->resid); 1056 BUG_ON(bytes > cursor->resid);
1057 switch (data->type) { 1057 switch (cursor->data->type) {
1058 case CEPH_MSG_DATA_PAGELIST: 1058 case CEPH_MSG_DATA_PAGELIST:
1059 new_piece = ceph_msg_data_pagelist_advance(data, bytes); 1059 new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
1060 break; 1060 break;
1061 case CEPH_MSG_DATA_PAGES: 1061 case CEPH_MSG_DATA_PAGES:
1062 new_piece = ceph_msg_data_pages_advance(data, bytes); 1062 new_piece = ceph_msg_data_pages_advance(cursor, bytes);
1063 break; 1063 break;
1064#ifdef CONFIG_BLOCK 1064#ifdef CONFIG_BLOCK
1065 case CEPH_MSG_DATA_BIO: 1065 case CEPH_MSG_DATA_BIO:
1066 new_piece = ceph_msg_data_bio_advance(data, bytes); 1066 new_piece = ceph_msg_data_bio_advance(cursor, bytes);
1067 break; 1067 break;
1068#endif /* CONFIG_BLOCK */ 1068#endif /* CONFIG_BLOCK */
1069 case CEPH_MSG_DATA_NONE: 1069 case CEPH_MSG_DATA_NONE:
@@ -1071,7 +1071,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
1071 BUG(); 1071 BUG();
1072 break; 1072 break;
1073 } 1073 }
1074 data->cursor->need_crc = new_piece; 1074 cursor->need_crc = new_piece;
1075 1075
1076 return new_piece; 1076 return new_piece;
1077} 1077}
@@ -1083,7 +1083,7 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1083 1083
1084 /* Initialize data cursor */ 1084 /* Initialize data cursor */
1085 1085
1086 ceph_msg_data_cursor_init(msg->data, (size_t)data_len); 1086 ceph_msg_data_cursor_init(msg, (size_t)data_len);
1087} 1087}
1088 1088
1089/* 1089/*
@@ -1404,7 +1404,7 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
1404static int write_partial_message_data(struct ceph_connection *con) 1404static int write_partial_message_data(struct ceph_connection *con)
1405{ 1405{
1406 struct ceph_msg *msg = con->out_msg; 1406 struct ceph_msg *msg = con->out_msg;
1407 struct ceph_msg_data_cursor *cursor = msg->data->cursor; 1407 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1408 bool do_datacrc = !con->msgr->nocrc; 1408 bool do_datacrc = !con->msgr->nocrc;
1409 u32 crc; 1409 u32 crc;
1410 1410
@@ -1430,7 +1430,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1430 bool need_crc; 1430 bool need_crc;
1431 int ret; 1431 int ret;
1432 1432
1433 page = ceph_msg_data_next(msg->data, &page_offset, &length, 1433 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
1434 &last_piece); 1434 &last_piece);
1435 ret = ceph_tcp_sendpage(con->sock, page, page_offset, 1435 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1436 length, last_piece); 1436 length, last_piece);
@@ -1442,7 +1442,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1442 } 1442 }
1443 if (do_datacrc && cursor->need_crc) 1443 if (do_datacrc && cursor->need_crc)
1444 crc = ceph_crc32c_page(crc, page, page_offset, length); 1444 crc = ceph_crc32c_page(crc, page, page_offset, length);
1445 need_crc = ceph_msg_data_advance(msg->data, (size_t)ret); 1445 need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
1446 } 1446 }
1447 1447
1448 dout("%s %p msg %p done\n", __func__, con, msg); 1448 dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2102,7 +2102,7 @@ static int read_partial_message_section(struct ceph_connection *con,
2102static int read_partial_msg_data(struct ceph_connection *con) 2102static int read_partial_msg_data(struct ceph_connection *con)
2103{ 2103{
2104 struct ceph_msg *msg = con->in_msg; 2104 struct ceph_msg *msg = con->in_msg;
2105 struct ceph_msg_data_cursor *cursor = msg->data->cursor; 2105 struct ceph_msg_data_cursor *cursor = &msg->cursor;
2106 const bool do_datacrc = !con->msgr->nocrc; 2106 const bool do_datacrc = !con->msgr->nocrc;
2107 struct page *page; 2107 struct page *page;
2108 size_t page_offset; 2108 size_t page_offset;
@@ -2117,7 +2117,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2117 if (do_datacrc) 2117 if (do_datacrc)
2118 crc = con->in_data_crc; 2118 crc = con->in_data_crc;
2119 while (cursor->resid) { 2119 while (cursor->resid) {
2120 page = ceph_msg_data_next(msg->data, &page_offset, &length, 2120 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
2121 NULL); 2121 NULL);
2122 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2122 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2123 if (ret <= 0) { 2123 if (ret <= 0) {
@@ -2129,7 +2129,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2129 2129
2130 if (do_datacrc) 2130 if (do_datacrc)
2131 crc = ceph_crc32c_page(crc, page, page_offset, ret); 2131 crc = ceph_crc32c_page(crc, page, page_offset, ret);
2132 (void) ceph_msg_data_advance(msg->data, (size_t)ret); 2132 (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
2133 } 2133 }
2134 if (do_datacrc) 2134 if (do_datacrc)
2135 con->in_data_crc = crc; 2135 con->in_data_crc = crc;
@@ -2991,7 +2991,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
2991 2991
2992 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); 2992 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
2993 BUG_ON(!data); 2993 BUG_ON(!data);
2994 data->cursor = &msg->cursor;
2995 data->pages = pages; 2994 data->pages = pages;
2996 data->length = length; 2995 data->length = length;
2997 data->alignment = alignment & ~PAGE_MASK; 2996 data->alignment = alignment & ~PAGE_MASK;
@@ -3013,7 +3012,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
3013 3012
3014 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); 3013 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
3015 BUG_ON(!data); 3014 BUG_ON(!data);
3016 data->cursor = &msg->cursor;
3017 data->pagelist = pagelist; 3015 data->pagelist = pagelist;
3018 3016
3019 msg->data = data; 3017 msg->data = data;
@@ -3033,7 +3031,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
3033 3031
3034 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); 3032 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
3035 BUG_ON(!data); 3033 BUG_ON(!data);
3036 data->cursor = &msg->cursor;
3037 data->bio = bio; 3034 data->bio = bio;
3038 data->bio_length = length; 3035 data->bio_length = length;
3039 3036