aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-04-05 15:46:01 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:18:33 -0400
commitca8b3a69174b04376722672d7dd6b666a7f17c50 (patch)
tree68c78a604897ae23ff10d971d511b8e2690e2f58
parent5240d9f95dfe0f0701b35fbff1cb5b70825ad23f (diff)
libceph: implement multiple data items in a message
This patch adds support to the messenger for more than one data item in its data list. A message data cursor has two more fields to support this: - a count of the number of bytes left to be consumed across all data items in the list, "total_resid" - a pointer to the head of the list (for validation only) The cursor initialization routine has been split into two parts: the outer one, which initializes the cursor for traversing the entire list of data items; and the inner one, which initializes the cursor to start processing a single data item. When a message cursor is first initialized, the outer initialization routine sets total_resid to the length provided. The data pointer is initialized to the first data item on the list. From there, the inner initialization routine finishes by setting up to process the data item the cursor points to. Advancing the cursor consumes bytes in total_resid. If the resid field reaches zero, it means the current data item is fully consumed. If total_resid indicates there is more data, the cursor is advanced to point to the next data item, and then the inner initialization routine prepares for using that. (A check is made at this point to make sure we don't wrap around the front of the list.) The type-specific init routines are modified so they can be given a length that's larger than what the data item can support. The resid field is initialized to the smaller of the provided length and the length of the entire data item. When total_resid reaches zero, we're done. This resolves: http://tracker.ceph.com/issues/3761 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
-rw-r--r--include/linux/ceph/messenger.h5
-rw-r--r--net/ceph/messenger.c48
2 files changed, 36 insertions, 17 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 318da0170a1e..de1d2e1ecce2 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -108,7 +108,10 @@ struct ceph_msg_data {
108}; 108};
109 109
110struct ceph_msg_data_cursor { 110struct ceph_msg_data_cursor {
111 struct ceph_msg_data *data; /* data item this describes */ 111 size_t total_resid; /* across all data items */
112 struct list_head *data_head; /* = &ceph_msg->data */
113
114 struct ceph_msg_data *data; /* current data item */
112 size_t resid; /* bytes not yet consumed */ 115 size_t resid; /* bytes not yet consumed */
113 bool last_piece; /* current is last piece */ 116 bool last_piece; /* current is last piece */
114 bool need_crc; /* crc update needed */ 117 bool need_crc; /* crc update needed */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8bfe7d34bc85..84703e550c26 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
734 BUG_ON(!bio); 734 BUG_ON(!bio);
735 BUG_ON(!bio->bi_vcnt); 735 BUG_ON(!bio->bi_vcnt);
736 736
737 cursor->resid = length; 737 cursor->resid = min(length, data->bio_length);
738 cursor->bio = bio; 738 cursor->bio = bio;
739 cursor->vector_index = 0; 739 cursor->vector_index = 0;
740 cursor->vector_offset = 0; 740 cursor->vector_offset = 0;
@@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
833 833
834 BUG_ON(!data->pages); 834 BUG_ON(!data->pages);
835 BUG_ON(!data->length); 835 BUG_ON(!data->length);
836 BUG_ON(length > data->length); /* short reads are OK */
837 836
838 cursor->resid = length; 837 cursor->resid = min(length, data->length);
839 page_count = calc_pages_for(data->alignment, (u64)data->length); 838 page_count = calc_pages_for(data->alignment, (u64)data->length);
840 cursor->page_offset = data->alignment & ~PAGE_MASK; 839 cursor->page_offset = data->alignment & ~PAGE_MASK;
841 cursor->page_index = 0; 840 cursor->page_index = 0;
@@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
904 903
905 pagelist = data->pagelist; 904 pagelist = data->pagelist;
906 BUG_ON(!pagelist); 905 BUG_ON(!pagelist);
907 BUG_ON(length > pagelist->length); /* short reads are OK */
908 906
909 if (!length) 907 if (!length)
910 return; /* pagelist can be assigned but empty */ 908 return; /* pagelist can be assigned but empty */
@@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
912 BUG_ON(list_empty(&pagelist->head)); 910 BUG_ON(list_empty(&pagelist->head));
913 page = list_first_entry(&pagelist->head, struct page, lru); 911 page = list_first_entry(&pagelist->head, struct page, lru);
914 912
915 cursor->resid = length; 913 cursor->resid = min(length, pagelist->length);
916 cursor->page = page; 914 cursor->page = page;
917 cursor->offset = 0; 915 cursor->offset = 0;
918 cursor->last_piece = length <= PAGE_SIZE; 916 cursor->last_piece = length <= PAGE_SIZE;
@@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
982 * be processed in that piece. It also tracks whether the current 980 * be processed in that piece. It also tracks whether the current
983 * piece is the last one in the data item. 981 * piece is the last one in the data item.
984 */ 982 */
985static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) 983static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
986{ 984{
987 struct ceph_msg_data_cursor *cursor = &msg->cursor; 985 size_t length = cursor->total_resid;
988 struct ceph_msg_data *data;
989 986
990 data = list_first_entry(&msg->data, struct ceph_msg_data, links);
991 cursor->data = data;
992 switch (cursor->data->type) { 987 switch (cursor->data->type) {
993 case CEPH_MSG_DATA_PAGELIST: 988 case CEPH_MSG_DATA_PAGELIST:
994 ceph_msg_data_pagelist_cursor_init(cursor, length); 989 ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1009 cursor->need_crc = true; 1004 cursor->need_crc = true;
1010} 1005}
1011 1006
1007static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1008{
1009 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1010 struct ceph_msg_data *data;
1011
1012 BUG_ON(!length);
1013 BUG_ON(length > msg->data_length);
1014 BUG_ON(list_empty(&msg->data));
1015
1016 data = list_first_entry(&msg->data, struct ceph_msg_data, links);
1017
1018 cursor->data_head = &msg->data;
1019 cursor->total_resid = length;
1020 data = list_first_entry(&msg->data, struct ceph_msg_data, links);
1021 cursor->data = data;
1022
1023 __ceph_msg_data_cursor_init(cursor);
1024}
1025
1012/* 1026/*
1013 * Return the page containing the next piece to process for a given 1027 * Return the page containing the next piece to process for a given
1014 * data item, and supply the page offset and length of that piece. 1028 * data item, and supply the page offset and length of that piece.
@@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1073 BUG(); 1087 BUG();
1074 break; 1088 break;
1075 } 1089 }
1090 cursor->total_resid -= bytes;
1076 cursor->need_crc = new_piece; 1091 cursor->need_crc = new_piece;
1077 1092
1093 if (!cursor->resid && cursor->total_resid) {
1094 WARN_ON(!cursor->last_piece);
1095 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
1096 cursor->data = list_entry_next(cursor->data, links);
1097 __ceph_msg_data_cursor_init(cursor);
1098 }
1099
1078 return new_piece; 1100 return new_piece;
1079} 1101}
1080 1102
@@ -2990,8 +3012,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
2990 3012
2991 BUG_ON(!pages); 3013 BUG_ON(!pages);
2992 BUG_ON(!length); 3014 BUG_ON(!length);
2993 BUG_ON(msg->data_length);
2994 BUG_ON(!list_empty(&msg->data));
2995 3015
2996 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); 3016 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
2997 BUG_ON(!data); 3017 BUG_ON(!data);
@@ -3012,8 +3032,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
3012 3032
3013 BUG_ON(!pagelist); 3033 BUG_ON(!pagelist);
3014 BUG_ON(!pagelist->length); 3034 BUG_ON(!pagelist->length);
3015 BUG_ON(msg->data_length);
3016 BUG_ON(!list_empty(&msg->data));
3017 3035
3018 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); 3036 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
3019 BUG_ON(!data); 3037 BUG_ON(!data);
@@ -3031,8 +3049,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
3031 struct ceph_msg_data *data; 3049 struct ceph_msg_data *data;
3032 3050
3033 BUG_ON(!bio); 3051 BUG_ON(!bio);
3034 BUG_ON(msg->data_length);
3035 BUG_ON(!list_empty(&msg->data));
3036 3052
3037 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); 3053 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
3038 BUG_ON(!data); 3054 BUG_ON(!data);