aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-07 16:38:28 -0500
committerSage Weil <sage@inktank.com>2013-05-02 00:17:01 -0400
commite766d7b55e10f93c7bab298135a4e90dcc46620d (patch)
tree0b3bebb5ee06c1a63d2155c6ba3ce52f83d63fe7 /net
parent6aaa4511deb4b0fd776d1153dc63a89cdc024fb8 (diff)
libceph: implement pages array cursor
Implement and use cursor routines for page array message data items for outbound message data. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c93
1 files changed, 89 insertions, 4 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 209990a853e5..d611156808b3 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -831,6 +831,79 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
831#endif 831#endif
832 832
833/* 833/*
834 * For a page array, a piece comes from the first page in the array
835 * that has not already been fully consumed.
836 */
837static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
838{
839 struct ceph_msg_data_cursor *cursor = &data->cursor;
840 int page_count;
841
842 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
843
844 BUG_ON(!data->pages);
845 BUG_ON(!data->length);
846
847 page_count = calc_pages_for(data->alignment, (u64)data->length);
848 BUG_ON(page_count > (int) USHRT_MAX);
849 cursor->resid = data->length;
850 cursor->page_offset = data->alignment & ~PAGE_MASK;
851 cursor->page_index = 0;
852 cursor->page_count = (unsigned short) page_count;
853 cursor->last_piece = cursor->page_count == 1;
854}
855
856static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
857 size_t *page_offset,
858 size_t *length)
859{
860 struct ceph_msg_data_cursor *cursor = &data->cursor;
861
862 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
863
864 BUG_ON(cursor->page_index >= cursor->page_count);
865 BUG_ON(cursor->page_offset >= PAGE_SIZE);
866 BUG_ON(!cursor->resid);
867
868 *page_offset = cursor->page_offset;
869 if (cursor->last_piece) {
870 BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
871 *length = cursor->resid;
872 } else {
873 *length = PAGE_SIZE - *page_offset;
874 }
875
876 return data->pages[cursor->page_index];
877}
878
879static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
880 size_t bytes)
881{
882 struct ceph_msg_data_cursor *cursor = &data->cursor;
883
884 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
885
886 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
887 BUG_ON(bytes > cursor->resid);
888
889 /* Advance the cursor page offset */
890
891 cursor->resid -= bytes;
892 cursor->page_offset += bytes;
893 if (!bytes || cursor->page_offset & ~PAGE_MASK)
894 return false; /* more bytes to process in the current page */
895
896 /* Move on to the next page */
897
898 BUG_ON(cursor->page_index >= cursor->page_count);
899 cursor->page_offset = 0;
900 cursor->page_index++;
901 cursor->last_piece = cursor->page_index == cursor->page_count - 1;
902
903 return true;
904}
905
906/*
834 * For a pagelist, a piece is whatever remains to be consumed in the 907 * For a pagelist, a piece is whatever remains to be consumed in the
835 * first page in the list, or the front of the next page. 908 * first page in the list, or the front of the next page.
836 */ 909 */
@@ -932,13 +1005,15 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
932 case CEPH_MSG_DATA_PAGELIST: 1005 case CEPH_MSG_DATA_PAGELIST:
933 ceph_msg_data_pagelist_cursor_init(data); 1006 ceph_msg_data_pagelist_cursor_init(data);
934 break; 1007 break;
1008 case CEPH_MSG_DATA_PAGES:
1009 ceph_msg_data_pages_cursor_init(data);
1010 break;
935#ifdef CONFIG_BLOCK 1011#ifdef CONFIG_BLOCK
936 case CEPH_MSG_DATA_BIO: 1012 case CEPH_MSG_DATA_BIO:
937 ceph_msg_data_bio_cursor_init(data); 1013 ceph_msg_data_bio_cursor_init(data);
938 break; 1014 break;
939#endif /* CONFIG_BLOCK */ 1015#endif /* CONFIG_BLOCK */
940 case CEPH_MSG_DATA_NONE: 1016 case CEPH_MSG_DATA_NONE:
941 case CEPH_MSG_DATA_PAGES:
942 default: 1017 default:
943 /* BUG(); */ 1018 /* BUG(); */
944 break; 1019 break;
@@ -961,13 +1036,15 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
961 case CEPH_MSG_DATA_PAGELIST: 1036 case CEPH_MSG_DATA_PAGELIST:
962 page = ceph_msg_data_pagelist_next(data, page_offset, length); 1037 page = ceph_msg_data_pagelist_next(data, page_offset, length);
963 break; 1038 break;
1039 case CEPH_MSG_DATA_PAGES:
1040 page = ceph_msg_data_pages_next(data, page_offset, length);
1041 break;
964#ifdef CONFIG_BLOCK 1042#ifdef CONFIG_BLOCK
965 case CEPH_MSG_DATA_BIO: 1043 case CEPH_MSG_DATA_BIO:
966 page = ceph_msg_data_bio_next(data, page_offset, length); 1044 page = ceph_msg_data_bio_next(data, page_offset, length);
967 break; 1045 break;
968#endif /* CONFIG_BLOCK */ 1046#endif /* CONFIG_BLOCK */
969 case CEPH_MSG_DATA_NONE: 1047 case CEPH_MSG_DATA_NONE:
970 case CEPH_MSG_DATA_PAGES:
971 default: 1048 default:
972 page = NULL; 1049 page = NULL;
973 break; 1050 break;
@@ -993,13 +1070,15 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
993 case CEPH_MSG_DATA_PAGELIST: 1070 case CEPH_MSG_DATA_PAGELIST:
994 new_piece = ceph_msg_data_pagelist_advance(data, bytes); 1071 new_piece = ceph_msg_data_pagelist_advance(data, bytes);
995 break; 1072 break;
1073 case CEPH_MSG_DATA_PAGES:
1074 new_piece = ceph_msg_data_pages_advance(data, bytes);
1075 break;
996#ifdef CONFIG_BLOCK 1076#ifdef CONFIG_BLOCK
997 case CEPH_MSG_DATA_BIO: 1077 case CEPH_MSG_DATA_BIO:
998 new_piece = ceph_msg_data_bio_advance(data, bytes); 1078 new_piece = ceph_msg_data_bio_advance(data, bytes);
999 break; 1079 break;
1000#endif /* CONFIG_BLOCK */ 1080#endif /* CONFIG_BLOCK */
1001 case CEPH_MSG_DATA_NONE: 1081 case CEPH_MSG_DATA_NONE:
1002 case CEPH_MSG_DATA_PAGES:
1003 default: 1082 default:
1004 BUG(); 1083 BUG();
1005 break; 1084 break;
@@ -1032,6 +1111,8 @@ static void prepare_message_data(struct ceph_msg *msg,
1032 if (ceph_msg_has_bio(msg)) 1111 if (ceph_msg_has_bio(msg))
1033 ceph_msg_data_cursor_init(&msg->b); 1112 ceph_msg_data_cursor_init(&msg->b);
1034#endif /* CONFIG_BLOCK */ 1113#endif /* CONFIG_BLOCK */
1114 if (ceph_msg_has_pages(msg))
1115 ceph_msg_data_cursor_init(&msg->p);
1035 if (ceph_msg_has_pagelist(msg)) 1116 if (ceph_msg_has_pagelist(msg))
1036 ceph_msg_data_cursor_init(&msg->l); 1117 ceph_msg_data_cursor_init(&msg->l);
1037 if (ceph_msg_has_trail(msg)) 1118 if (ceph_msg_has_trail(msg))
@@ -1330,6 +1411,8 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1330 msg_pos->page_pos += sent; 1411 msg_pos->page_pos += sent;
1331 if (in_trail) 1412 if (in_trail)
1332 need_crc = ceph_msg_data_advance(&msg->t, sent); 1413 need_crc = ceph_msg_data_advance(&msg->t, sent);
1414 else if (ceph_msg_has_pages(msg))
1415 need_crc = ceph_msg_data_advance(&msg->p, sent);
1333 else if (ceph_msg_has_pagelist(msg)) 1416 else if (ceph_msg_has_pagelist(msg))
1334 need_crc = ceph_msg_data_advance(&msg->l, sent); 1417 need_crc = ceph_msg_data_advance(&msg->l, sent);
1335#ifdef CONFIG_BLOCK 1418#ifdef CONFIG_BLOCK
@@ -1435,7 +1518,9 @@ static int write_partial_message_data(struct ceph_connection *con)
1435 page = ceph_msg_data_next(&msg->t, &page_offset, 1518 page = ceph_msg_data_next(&msg->t, &page_offset,
1436 &length, &last_piece); 1519 &length, &last_piece);
1437 } else if (ceph_msg_has_pages(msg)) { 1520 } else if (ceph_msg_has_pages(msg)) {
1438 page = msg->p.pages[msg_pos->page]; 1521 use_cursor = true;
1522 page = ceph_msg_data_next(&msg->p, &page_offset,
1523 &length, &last_piece);
1439 } else if (ceph_msg_has_pagelist(msg)) { 1524 } else if (ceph_msg_has_pagelist(msg)) {
1440 use_cursor = true; 1525 use_cursor = true;
1441 page = ceph_msg_data_next(&msg->l, &page_offset, 1526 page = ceph_msg_data_next(&msg->l, &page_offset,