aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
author: Alex Elder <elder@inktank.com> 2013-03-07 00:39:39 -0500
committer: Sage Weil <sage@inktank.com> 2013-05-02 00:16:56 -0400
commit: fe38a2b67bc6b3a60da82a23e9082256a30e39d9 (patch)
tree: 956555bf1b5ea405d77045bd9d7743879d60438e /net
parent: 437945094fed0deb1810e8da95465c8f26bc6f80 (diff)
libceph: start defining message data cursor
This patch lays out the foundation for using generic routines to manage processing items of message data. For simplicity, we'll start with just the trail portion of a message, because it stands alone and is only present for outgoing data. First some basic concepts. We'll use the term "data item" to represent one of the ceph_msg_data structures associated with a message. There are currently four of those, with single-letter field names p, l, b, and t. A data item is further broken into "pieces" which always lie in a single page. A data item will include a "cursor" that will track state as the memory defined by the item is consumed by sending data from or receiving data into it. We define three routines to manipulate a data item's cursor: the "init" routine; the "next" routine; and the "advance" routine. The "init" routine initializes the cursor so it points at the beginning of the first piece in the item. The "next" routine returns the page, page offset, and length (limited by both the page and item size) of the next unconsumed piece in the item. It also indicates to the caller whether the piece being returned is the last one in the data item. The "advance" routine consumes the requested number of bytes in the item (advancing the cursor). This is used to record the number of bytes from the current piece that were actually sent or received by the network code. It returns an indication of whether the result means the current piece has been fully consumed. This is used by the message send code to determine whether it should calculate the CRC for the next piece processed. The trail of a message is implemented as a ceph pagelist. The routines defined for it will be usable for non-trail pagelist data as well. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c138
1 files changed, 128 insertions, 10 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f256b4b174ad..b978cf8b27ff 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -21,6 +21,9 @@
21#include <linux/ceph/pagelist.h> 21#include <linux/ceph/pagelist.h>
22#include <linux/export.h> 22#include <linux/export.h>
23 23
/*
 * Given a pointer to a structure that embeds a list_head named
 * "member", yield a pointer to the structure that follows it on the
 * list.  "pos" is parenthesized so that any pointer expression (for
 * example "&node" or "p + 1"), not just a plain identifier, can be
 * passed safely.  No check is made for wrapping around to the list
 * head; callers must ensure "pos" is not the last entry.
 */
#define list_entry_next(pos, member)					\
	list_entry((pos)->member.next, typeof(*(pos)), member)
26
24/* 27/*
25 * Ceph uses the messenger to exchange ceph_msg messages with other 28 * Ceph uses the messenger to exchange ceph_msg messages with other
26 * hosts in the system. The messenger provides ordered and reliable 29 * hosts in the system. The messenger provides ordered and reliable
@@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
738} 741}
739#endif 742#endif
740 743
744/*
745 * Message data is handled (sent or received) in pieces, where each
746 * piece resides on a single page. The network layer might not
747 * consume an entire piece at once. A data item's cursor keeps
748 * track of which piece is next to process and how much remains to
749 * be processed in that piece. It also tracks whether the current
750 * piece is the last one in the data item.
751 */
752static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
753{
754 struct ceph_msg_data_cursor *cursor = &data->cursor;
755 struct ceph_pagelist *pagelist;
756 struct page *page;
757
758 if (data->type != CEPH_MSG_DATA_PAGELIST)
759 return;
760
761 pagelist = data->pagelist;
762 BUG_ON(!pagelist);
763 if (!pagelist->length)
764 return; /* pagelist can be assigned but empty */
765
766 BUG_ON(list_empty(&pagelist->head));
767 page = list_first_entry(&pagelist->head, struct page, lru);
768
769 cursor->page = page;
770 cursor->offset = 0;
771 cursor->last_piece = pagelist->length <= PAGE_SIZE;
772}
773
774/*
775 * Return the page containing the next piece to process for a given
776 * data item, and supply the page offset and length of that piece.
777 * Indicate whether this is the last piece in this data item.
778 */
779static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
780 size_t *page_offset,
781 size_t *length,
782 bool *last_piece)
783{
784 struct ceph_msg_data_cursor *cursor = &data->cursor;
785 struct ceph_pagelist *pagelist;
786 size_t piece_end;
787
788 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
789
790 pagelist = data->pagelist;
791 BUG_ON(!pagelist);
792
793 BUG_ON(!cursor->page);
794 BUG_ON(cursor->offset >= pagelist->length);
795
796 *last_piece = cursor->last_piece;
797 if (*last_piece) {
798 /* pagelist offset is always 0 */
799 piece_end = pagelist->length & ~PAGE_MASK;
800 if (!piece_end)
801 piece_end = PAGE_SIZE;
802 } else {
803 piece_end = PAGE_SIZE;
804 }
805 *page_offset = cursor->offset & ~PAGE_MASK;
806 *length = piece_end - *page_offset;
807
808 return data->cursor.page;
809}
810
811/*
812 * Returns true if the result moves the cursor on to the next piece
813 * (the next page) of the pagelist.
814 */
815static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
816{
817 struct ceph_msg_data_cursor *cursor = &data->cursor;
818 struct ceph_pagelist *pagelist;
819
820 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
821
822 pagelist = data->pagelist;
823 BUG_ON(!pagelist);
824 BUG_ON(!cursor->page);
825 BUG_ON(cursor->offset + bytes > pagelist->length);
826 BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
827
828 /* Advance the cursor offset */
829
830 cursor->offset += bytes;
831 /* pagelist offset is always 0 */
832 if (!bytes || cursor->offset & ~PAGE_MASK)
833 return false; /* more bytes to process in the current page */
834
835 /* Move on to the next page */
836
837 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
838 cursor->page = list_entry_next(cursor->page, lru);
839
840 /* cursor offset is at page boundary; pagelist offset is always 0 */
841 if (pagelist->length - cursor->offset <= PAGE_SIZE)
842 cursor->last_piece = true;
843
844 return true;
845}
846
741static void prepare_message_data(struct ceph_msg *msg, 847static void prepare_message_data(struct ceph_msg *msg,
742 struct ceph_msg_pos *msg_pos) 848 struct ceph_msg_pos *msg_pos)
743{ 849{
@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
755 init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg); 861 init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
756#endif 862#endif
757 msg_pos->data_pos = 0; 863 msg_pos->data_pos = 0;
864
865 /* If there's a trail, initialize its cursor */
866
867 if (ceph_msg_has_trail(msg))
868 ceph_msg_data_cursor_init(&msg->t);
869
758 msg_pos->did_page_crc = false; 870 msg_pos->did_page_crc = false;
759} 871}
760 872
@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1045 1157
1046 msg_pos->data_pos += sent; 1158 msg_pos->data_pos += sent;
1047 msg_pos->page_pos += sent; 1159 msg_pos->page_pos += sent;
1160 if (in_trail) {
1161 bool need_crc;
1162
1163 need_crc = ceph_msg_data_advance(&msg->t, sent);
1164 BUG_ON(need_crc && sent != len);
1165 }
1048 if (sent < len) 1166 if (sent < len)
1049 return; 1167 return;
1050 1168
@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1052 msg_pos->page_pos = 0; 1170 msg_pos->page_pos = 0;
1053 msg_pos->page++; 1171 msg_pos->page++;
1054 msg_pos->did_page_crc = false; 1172 msg_pos->did_page_crc = false;
1055 if (in_trail) { 1173 if (ceph_msg_has_pagelist(msg)) {
1056 BUG_ON(!ceph_msg_has_trail(msg));
1057 list_rotate_left(&msg->t.pagelist->head);
1058 } else if (ceph_msg_has_pagelist(msg)) {
1059 list_rotate_left(&msg->l.pagelist->head); 1174 list_rotate_left(&msg->l.pagelist->head);
1060#ifdef CONFIG_BLOCK 1175#ifdef CONFIG_BLOCK
1061 } else if (ceph_msg_has_bio(msg)) { 1176 } else if (ceph_msg_has_bio(msg)) {
@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con)
1141 size_t length; 1256 size_t length;
1142 int max_write = PAGE_SIZE; 1257 int max_write = PAGE_SIZE;
1143 int bio_offset = 0; 1258 int bio_offset = 0;
1259 bool use_cursor = false;
1260 bool last_piece = true; /* preserve existing behavior */
1144 1261
1145 in_trail = in_trail || msg_pos->data_pos >= trail_off; 1262 in_trail = in_trail || msg_pos->data_pos >= trail_off;
1146 if (!in_trail) 1263 if (!in_trail)
@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con)
1148 1265
1149 if (in_trail) { 1266 if (in_trail) {
1150 BUG_ON(!ceph_msg_has_trail(msg)); 1267 BUG_ON(!ceph_msg_has_trail(msg));
1151 total_max_write = data_len - msg_pos->data_pos; 1268 use_cursor = true;
1152 page = list_first_entry(&msg->t.pagelist->head, 1269 page = ceph_msg_data_next(&msg->t, &page_offset,
1153 struct page, lru); 1270 &length, &last_piece);
1154 } else if (ceph_msg_has_pages(msg)) { 1271 } else if (ceph_msg_has_pages(msg)) {
1155 page = msg->p.pages[msg_pos->page]; 1272 page = msg->p.pages[msg_pos->page];
1156 } else if (ceph_msg_has_pagelist(msg)) { 1273 } else if (ceph_msg_has_pagelist(msg)) {
@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con)
1168 } else { 1285 } else {
1169 page = zero_page; 1286 page = zero_page;
1170 } 1287 }
1171 length = min_t(int, max_write - msg_pos->page_pos, 1288 if (!use_cursor)
1172 total_max_write); 1289 length = min_t(int, max_write - msg_pos->page_pos,
1290 total_max_write);
1173 1291
1174 page_offset = msg_pos->page_pos + bio_offset; 1292 page_offset = msg_pos->page_pos + bio_offset;
1175 if (do_datacrc && !msg_pos->did_page_crc) { 1293 if (do_datacrc && !msg_pos->did_page_crc) {
@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1180 msg_pos->did_page_crc = true; 1298 msg_pos->did_page_crc = true;
1181 } 1299 }
1182 ret = ceph_tcp_sendpage(con->sock, page, page_offset, 1300 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1183 length, true); 1301 length, last_piece);
1184 if (ret <= 0) 1302 if (ret <= 0)
1185 goto out; 1303 goto out;
1186 1304