aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/ceph/messenger.h8
-rw-r--r--net/ceph/messenger.c117
2 files changed, 99 insertions, 26 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 14862438faff..716c3fdeb257 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -97,8 +97,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
97 97
98struct ceph_msg_data_cursor { 98struct ceph_msg_data_cursor {
99 bool last_piece; /* now at last piece of data item */ 99 bool last_piece; /* now at last piece of data item */
100 struct page *page; /* current page in pagelist */ 100 union {
101 size_t offset; /* pagelist bytes consumed */ 101 struct { /* pagelist */
102 struct page *page; /* page from list */
103 size_t offset; /* bytes from list */
104 };
105 };
102}; 106};
103 107
104struct ceph_msg_data { 108struct ceph_msg_data {
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b978cf8b27ff..4cc27a136e35 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -742,21 +742,16 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
742#endif 742#endif
743 743
744/* 744/*
745 * Message data is handled (sent or received) in pieces, where each 745 * For a pagelist, a piece is whatever remains to be consumed in the
746 * piece resides on a single page. The network layer might not 746 * first page in the list, or the front of the next page.
747 * consume an entire piece at once. A data item's cursor keeps
748 * track of which piece is next to process and how much remains to
749 * be processed in that piece. It also tracks whether the current
750 * piece is the last one in the data item.
751 */ 747 */
752static void ceph_msg_data_cursor_init(struct ceph_msg_data *data) 748static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data)
753{ 749{
754 struct ceph_msg_data_cursor *cursor = &data->cursor; 750 struct ceph_msg_data_cursor *cursor = &data->cursor;
755 struct ceph_pagelist *pagelist; 751 struct ceph_pagelist *pagelist;
756 struct page *page; 752 struct page *page;
757 753
758 if (data->type != CEPH_MSG_DATA_PAGELIST) 754 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
759 return;
760 755
761 pagelist = data->pagelist; 756 pagelist = data->pagelist;
762 BUG_ON(!pagelist); 757 BUG_ON(!pagelist);
@@ -771,15 +766,9 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
771 cursor->last_piece = pagelist->length <= PAGE_SIZE; 766 cursor->last_piece = pagelist->length <= PAGE_SIZE;
772} 767}
773 768
774/* 769static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
775 * Return the page containing the next piece to process for a given
776 * data item, and supply the page offset and length of that piece.
777 * Indicate whether this is the last piece in this data item.
778 */
779static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
780 size_t *page_offset, 770 size_t *page_offset,
781 size_t *length, 771 size_t *length)
782 bool *last_piece)
783{ 772{
784 struct ceph_msg_data_cursor *cursor = &data->cursor; 773 struct ceph_msg_data_cursor *cursor = &data->cursor;
785 struct ceph_pagelist *pagelist; 774 struct ceph_pagelist *pagelist;
@@ -793,8 +782,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
793 BUG_ON(!cursor->page); 782 BUG_ON(!cursor->page);
794 BUG_ON(cursor->offset >= pagelist->length); 783 BUG_ON(cursor->offset >= pagelist->length);
795 784
796 *last_piece = cursor->last_piece; 785 if (cursor->last_piece) {
797 if (*last_piece) {
798 /* pagelist offset is always 0 */ 786 /* pagelist offset is always 0 */
799 piece_end = pagelist->length & ~PAGE_MASK; 787 piece_end = pagelist->length & ~PAGE_MASK;
800 if (!piece_end) 788 if (!piece_end)
@@ -808,11 +796,8 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
808 return data->cursor.page; 796 return data->cursor.page;
809} 797}
810 798
811/* 799static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
812 * Returns true if the result moves the cursor on to the next piece 800 size_t bytes)
813 * (the next page) of the pagelist.
814 */
815static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
816{ 801{
817 struct ceph_msg_data_cursor *cursor = &data->cursor; 802 struct ceph_msg_data_cursor *cursor = &data->cursor;
818 struct ceph_pagelist *pagelist; 803 struct ceph_pagelist *pagelist;
@@ -844,6 +829,90 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
844 return true; 829 return true;
845} 830}
846 831
832/*
833 * Message data is handled (sent or received) in pieces, where each
834 * piece resides on a single page. The network layer might not
835 * consume an entire piece at once. A data item's cursor keeps
836 * track of which piece is next to process and how much remains to
837 * be processed in that piece. It also tracks whether the current
838 * piece is the last one in the data item.
839 */
840static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
841{
842 switch (data->type) {
843 case CEPH_MSG_DATA_PAGELIST:
844 ceph_msg_data_pagelist_cursor_init(data);
845 break;
846 case CEPH_MSG_DATA_NONE:
847 case CEPH_MSG_DATA_PAGES:
848#ifdef CONFIG_BLOCK
849 case CEPH_MSG_DATA_BIO:
850#endif /* CONFIG_BLOCK */
851 default:
852 /* BUG(); */
853 break;
854 }
855}
856
857/*
858 * Return the page containing the next piece to process for a given
859 * data item, and supply the page offset and length of that piece.
860 * Indicate whether this is the last piece in this data item.
861 */
862static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
863 size_t *page_offset,
864 size_t *length,
865 bool *last_piece)
866{
867 struct page *page;
868
869 switch (data->type) {
870 case CEPH_MSG_DATA_PAGELIST:
871 page = ceph_msg_data_pagelist_next(data, page_offset, length);
872 break;
873 case CEPH_MSG_DATA_NONE:
874 case CEPH_MSG_DATA_PAGES:
875#ifdef CONFIG_BLOCK
876 case CEPH_MSG_DATA_BIO:
877#endif /* CONFIG_BLOCK */
878 default:
879 page = NULL;
880 break;
881 }
882 BUG_ON(!page);
883 BUG_ON(*page_offset + *length > PAGE_SIZE);
884 BUG_ON(!*length);
885 if (last_piece)
886 *last_piece = data->cursor.last_piece;
887
888 return page;
889}
890
891/*
892 * Returns true if the result moves the cursor on to the next piece
893 * of the data item.
894 */
895static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
896{
897 bool new_piece;
898
899 switch (data->type) {
900 case CEPH_MSG_DATA_PAGELIST:
901 new_piece = ceph_msg_data_pagelist_advance(data, bytes);
902 break;
903 case CEPH_MSG_DATA_NONE:
904 case CEPH_MSG_DATA_PAGES:
905#ifdef CONFIG_BLOCK
906 case CEPH_MSG_DATA_BIO:
907#endif /* CONFIG_BLOCK */
908 default:
909 BUG();
910 break;
911 }
912
913 return new_piece;
914}
915
847static void prepare_message_data(struct ceph_msg *msg, 916static void prepare_message_data(struct ceph_msg *msg,
848 struct ceph_msg_pos *msg_pos) 917 struct ceph_msg_pos *msg_pos)
849{ 918{