about summary refs log tree commit diff stats
path: root/net
diff options
context:
space:
mode:
author: Alex Elder <elder@inktank.com> 2013-03-12 00:34:22 -0400
committer: Sage Weil <sage@inktank.com> 2013-05-02 00:17:24 -0400
commit: 25aff7c559c8b54a810bc094d59fe037cfed6b18 (patch)
tree: 93db1b7a4941ab629abc36e31f8af51aa134aa3e /net
parent: 28a89ddece39890c255a0c41baf622731a08c288 (diff)
libceph: record residual bytes for all message data types
All of the data types can use this, not just the page array. Until now, only the bio type doesn't have it available, and only the initiator of the request (the rbd client) is able to supply the length of the full request without re-scanning the bio list. Change the cursor init routines so the length is supplied based on the message header "data_len" field, and use that length to initialize the "resid" field of the cursor. In addition, change the way "last_piece" is defined so it is based on the residual number of bytes in the original request. This is necessary (at least for bio messages) because it is possible for a read request to succeed without consuming all of the space available in the data buffer. This resolves: http://tracker.ceph.com/issues/4427 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c111
1 files changed, 62 insertions, 49 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 95f90b01f753..0ac4f6cb7339 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -745,7 +745,8 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
745 * entry in the current bio iovec, or the first entry in the next 745 * entry in the current bio iovec, or the first entry in the next
746 * bio in the list. 746 * bio in the list.
747 */ 747 */
748static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data) 748static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
749 size_t length)
749{ 750{
750 struct ceph_msg_data_cursor *cursor = &data->cursor; 751 struct ceph_msg_data_cursor *cursor = &data->cursor;
751 struct bio *bio; 752 struct bio *bio;
@@ -755,12 +756,12 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
755 bio = data->bio; 756 bio = data->bio;
756 BUG_ON(!bio); 757 BUG_ON(!bio);
757 BUG_ON(!bio->bi_vcnt); 758 BUG_ON(!bio->bi_vcnt);
758 /* resid = bio->bi_size */
759 759
760 cursor->resid = length;
760 cursor->bio = bio; 761 cursor->bio = bio;
761 cursor->vector_index = 0; 762 cursor->vector_index = 0;
762 cursor->vector_offset = 0; 763 cursor->vector_offset = 0;
763 cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1; 764 cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
764} 765}
765 766
766static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data, 767static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
@@ -784,8 +785,12 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
784 BUG_ON(cursor->vector_offset >= bio_vec->bv_len); 785 BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
785 *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset); 786 *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
786 BUG_ON(*page_offset >= PAGE_SIZE); 787 BUG_ON(*page_offset >= PAGE_SIZE);
787 *length = (size_t) (bio_vec->bv_len - cursor->vector_offset); 788 if (cursor->last_piece) /* pagelist offset is always 0 */
789 *length = cursor->resid;
790 else
791 *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
788 BUG_ON(*length > PAGE_SIZE); 792 BUG_ON(*length > PAGE_SIZE);
793 BUG_ON(*length > cursor->resid);
789 794
790 return bio_vec->bv_page; 795 return bio_vec->bv_page;
791} 796}
@@ -805,26 +810,33 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
805 index = cursor->vector_index; 810 index = cursor->vector_index;
806 BUG_ON(index >= (unsigned int) bio->bi_vcnt); 811 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
807 bio_vec = &bio->bi_io_vec[index]; 812 bio_vec = &bio->bi_io_vec[index];
808 BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
809 813
810 /* Advance the cursor offset */ 814 /* Advance the cursor offset */
811 815
816 BUG_ON(cursor->resid < bytes);
817 cursor->resid -= bytes;
812 cursor->vector_offset += bytes; 818 cursor->vector_offset += bytes;
813 if (cursor->vector_offset < bio_vec->bv_len) 819 if (cursor->vector_offset < bio_vec->bv_len)
814 return false; /* more bytes to process in this segment */ 820 return false; /* more bytes to process in this segment */
821 BUG_ON(cursor->vector_offset != bio_vec->bv_len);
815 822
816 /* Move on to the next segment, and possibly the next bio */ 823 /* Move on to the next segment, and possibly the next bio */
817 824
818 if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) { 825 if (++index == (unsigned int) bio->bi_vcnt) {
819 bio = bio->bi_next; 826 bio = bio->bi_next;
820 cursor->bio = bio; 827 index = 0;
821 cursor->vector_index = 0;
822 } 828 }
829 cursor->bio = bio;
830 cursor->vector_index = index;
823 cursor->vector_offset = 0; 831 cursor->vector_offset = 0;
824 832
825 if (!cursor->last_piece && bio && !bio->bi_next) 833 if (!cursor->last_piece) {
826 if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1) 834 BUG_ON(!cursor->resid);
835 BUG_ON(!bio);
836 /* A short read is OK, so use <= rather than == */
837 if (cursor->resid <= bio->bi_io_vec[index].bv_len)
827 cursor->last_piece = true; 838 cursor->last_piece = true;
839 }
828 840
829 return true; 841 return true;
830} 842}
@@ -834,7 +846,8 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
834 * For a page array, a piece comes from the first page in the array 846 * For a page array, a piece comes from the first page in the array
835 * that has not already been fully consumed. 847 * that has not already been fully consumed.
836 */ 848 */
837static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data) 849static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
850 size_t length)
838{ 851{
839 struct ceph_msg_data_cursor *cursor = &data->cursor; 852 struct ceph_msg_data_cursor *cursor = &data->cursor;
840 int page_count; 853 int page_count;
@@ -843,14 +856,15 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
843 856
844 BUG_ON(!data->pages); 857 BUG_ON(!data->pages);
845 BUG_ON(!data->length); 858 BUG_ON(!data->length);
859 BUG_ON(length != data->length);
846 860
861 cursor->resid = length;
847 page_count = calc_pages_for(data->alignment, (u64)data->length); 862 page_count = calc_pages_for(data->alignment, (u64)data->length);
848 BUG_ON(page_count > (int) USHRT_MAX);
849 cursor->resid = data->length;
850 cursor->page_offset = data->alignment & ~PAGE_MASK; 863 cursor->page_offset = data->alignment & ~PAGE_MASK;
851 cursor->page_index = 0; 864 cursor->page_index = 0;
865 BUG_ON(page_count > (int) USHRT_MAX);
852 cursor->page_count = (unsigned short) page_count; 866 cursor->page_count = (unsigned short) page_count;
853 cursor->last_piece = cursor->page_count == 1; 867 cursor->last_piece = length <= PAGE_SIZE;
854} 868}
855 869
856static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data, 870static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
@@ -863,15 +877,12 @@ static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
863 877
864 BUG_ON(cursor->page_index >= cursor->page_count); 878 BUG_ON(cursor->page_index >= cursor->page_count);
865 BUG_ON(cursor->page_offset >= PAGE_SIZE); 879 BUG_ON(cursor->page_offset >= PAGE_SIZE);
866 BUG_ON(!cursor->resid);
867 880
868 *page_offset = cursor->page_offset; 881 *page_offset = cursor->page_offset;
869 if (cursor->last_piece) { 882 if (cursor->last_piece)
870 BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
871 *length = cursor->resid; 883 *length = cursor->resid;
872 } else { 884 else
873 *length = PAGE_SIZE - *page_offset; 885 *length = PAGE_SIZE - *page_offset;
874 }
875 886
876 return data->pages[cursor->page_index]; 887 return data->pages[cursor->page_index];
877} 888}
@@ -884,7 +895,6 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
884 BUG_ON(data->type != CEPH_MSG_DATA_PAGES); 895 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
885 896
886 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE); 897 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
887 BUG_ON(bytes > cursor->resid);
888 898
889 /* Advance the cursor page offset */ 899 /* Advance the cursor page offset */
890 900
@@ -898,7 +908,7 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
898 BUG_ON(cursor->page_index >= cursor->page_count); 908 BUG_ON(cursor->page_index >= cursor->page_count);
899 cursor->page_offset = 0; 909 cursor->page_offset = 0;
900 cursor->page_index++; 910 cursor->page_index++;
901 cursor->last_piece = cursor->page_index == cursor->page_count - 1; 911 cursor->last_piece = cursor->resid <= PAGE_SIZE;
902 912
903 return true; 913 return true;
904} 914}
@@ -907,7 +917,8 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
907 * For a pagelist, a piece is whatever remains to be consumed in the 917 * For a pagelist, a piece is whatever remains to be consumed in the
908 * first page in the list, or the front of the next page. 918 * first page in the list, or the front of the next page.
909 */ 919 */
910static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data) 920static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
921 size_t length)
911{ 922{
912 struct ceph_msg_data_cursor *cursor = &data->cursor; 923 struct ceph_msg_data_cursor *cursor = &data->cursor;
913 struct ceph_pagelist *pagelist; 924 struct ceph_pagelist *pagelist;
@@ -917,15 +928,18 @@ static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data)
917 928
918 pagelist = data->pagelist; 929 pagelist = data->pagelist;
919 BUG_ON(!pagelist); 930 BUG_ON(!pagelist);
920 if (!pagelist->length) 931 BUG_ON(length != pagelist->length);
932
933 if (!length)
921 return; /* pagelist can be assigned but empty */ 934 return; /* pagelist can be assigned but empty */
922 935
923 BUG_ON(list_empty(&pagelist->head)); 936 BUG_ON(list_empty(&pagelist->head));
924 page = list_first_entry(&pagelist->head, struct page, lru); 937 page = list_first_entry(&pagelist->head, struct page, lru);
925 938
939 cursor->resid = length;
926 cursor->page = page; 940 cursor->page = page;
927 cursor->offset = 0; 941 cursor->offset = 0;
928 cursor->last_piece = pagelist->length <= PAGE_SIZE; 942 cursor->last_piece = length <= PAGE_SIZE;
929} 943}
930 944
931static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data, 945static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
@@ -934,7 +948,6 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
934{ 948{
935 struct ceph_msg_data_cursor *cursor = &data->cursor; 949 struct ceph_msg_data_cursor *cursor = &data->cursor;
936 struct ceph_pagelist *pagelist; 950 struct ceph_pagelist *pagelist;
937 size_t piece_end;
938 951
939 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); 952 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
940 953
@@ -942,18 +955,13 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
942 BUG_ON(!pagelist); 955 BUG_ON(!pagelist);
943 956
944 BUG_ON(!cursor->page); 957 BUG_ON(!cursor->page);
945 BUG_ON(cursor->offset >= pagelist->length); 958 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
946 959
947 if (cursor->last_piece) {
948 /* pagelist offset is always 0 */
949 piece_end = pagelist->length & ~PAGE_MASK;
950 if (!piece_end)
951 piece_end = PAGE_SIZE;
952 } else {
953 piece_end = PAGE_SIZE;
954 }
955 *page_offset = cursor->offset & ~PAGE_MASK; 960 *page_offset = cursor->offset & ~PAGE_MASK;
956 *length = piece_end - *page_offset; 961 if (cursor->last_piece) /* pagelist offset is always 0 */
962 *length = cursor->resid;
963 else
964 *length = PAGE_SIZE - *page_offset;
957 965
958 return data->cursor.page; 966 return data->cursor.page;
959} 967}
@@ -968,12 +976,13 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
968 976
969 pagelist = data->pagelist; 977 pagelist = data->pagelist;
970 BUG_ON(!pagelist); 978 BUG_ON(!pagelist);
971 BUG_ON(!cursor->page); 979
972 BUG_ON(cursor->offset + bytes > pagelist->length); 980 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
973 BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE); 981 BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
974 982
975 /* Advance the cursor offset */ 983 /* Advance the cursor offset */
976 984
985 cursor->resid -= bytes;
977 cursor->offset += bytes; 986 cursor->offset += bytes;
978 /* pagelist offset is always 0 */ 987 /* pagelist offset is always 0 */
979 if (!bytes || cursor->offset & ~PAGE_MASK) 988 if (!bytes || cursor->offset & ~PAGE_MASK)
@@ -983,10 +992,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
983 992
984 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); 993 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
985 cursor->page = list_entry_next(cursor->page, lru); 994 cursor->page = list_entry_next(cursor->page, lru);
986 995 cursor->last_piece = cursor->resid <= PAGE_SIZE;
987 /* cursor offset is at page boundary; pagelist offset is always 0 */
988 if (pagelist->length - cursor->offset <= PAGE_SIZE)
989 cursor->last_piece = true;
990 996
991 return true; 997 return true;
992} 998}
@@ -999,18 +1005,19 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
999 * be processed in that piece. It also tracks whether the current 1005 * be processed in that piece. It also tracks whether the current
1000 * piece is the last one in the data item. 1006 * piece is the last one in the data item.
1001 */ 1007 */
1002static void ceph_msg_data_cursor_init(struct ceph_msg_data *data) 1008static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
1009 size_t length)
1003{ 1010{
1004 switch (data->type) { 1011 switch (data->type) {
1005 case CEPH_MSG_DATA_PAGELIST: 1012 case CEPH_MSG_DATA_PAGELIST:
1006 ceph_msg_data_pagelist_cursor_init(data); 1013 ceph_msg_data_pagelist_cursor_init(data, length);
1007 break; 1014 break;
1008 case CEPH_MSG_DATA_PAGES: 1015 case CEPH_MSG_DATA_PAGES:
1009 ceph_msg_data_pages_cursor_init(data); 1016 ceph_msg_data_pages_cursor_init(data, length);
1010 break; 1017 break;
1011#ifdef CONFIG_BLOCK 1018#ifdef CONFIG_BLOCK
1012 case CEPH_MSG_DATA_BIO: 1019 case CEPH_MSG_DATA_BIO:
1013 ceph_msg_data_bio_cursor_init(data); 1020 ceph_msg_data_bio_cursor_init(data, length);
1014 break; 1021 break;
1015#endif /* CONFIG_BLOCK */ 1022#endif /* CONFIG_BLOCK */
1016 case CEPH_MSG_DATA_NONE: 1023 case CEPH_MSG_DATA_NONE:
@@ -1064,8 +1071,10 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
1064 */ 1071 */
1065static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) 1072static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
1066{ 1073{
1074 struct ceph_msg_data_cursor *cursor = &data->cursor;
1067 bool new_piece; 1075 bool new_piece;
1068 1076
1077 BUG_ON(bytes > cursor->resid);
1069 switch (data->type) { 1078 switch (data->type) {
1070 case CEPH_MSG_DATA_PAGELIST: 1079 case CEPH_MSG_DATA_PAGELIST:
1071 new_piece = ceph_msg_data_pagelist_advance(data, bytes); 1080 new_piece = ceph_msg_data_pagelist_advance(data, bytes);
@@ -1090,8 +1099,12 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
1090static void prepare_message_data(struct ceph_msg *msg, 1099static void prepare_message_data(struct ceph_msg *msg,
1091 struct ceph_msg_pos *msg_pos) 1100 struct ceph_msg_pos *msg_pos)
1092{ 1101{
1102 size_t data_len;
1103
1093 BUG_ON(!msg); 1104 BUG_ON(!msg);
1094 BUG_ON(!msg->hdr.data_len); 1105
1106 data_len = le32_to_cpu(msg->hdr.data_len);
1107 BUG_ON(!data_len);
1095 1108
1096 /* initialize page iterator */ 1109 /* initialize page iterator */
1097 msg_pos->page = 0; 1110 msg_pos->page = 0;
@@ -1109,12 +1122,12 @@ static void prepare_message_data(struct ceph_msg *msg,
1109 1122
1110#ifdef CONFIG_BLOCK 1123#ifdef CONFIG_BLOCK
1111 if (ceph_msg_has_bio(msg)) 1124 if (ceph_msg_has_bio(msg))
1112 ceph_msg_data_cursor_init(&msg->b); 1125 ceph_msg_data_cursor_init(&msg->b, data_len);
1113#endif /* CONFIG_BLOCK */ 1126#endif /* CONFIG_BLOCK */
1114 if (ceph_msg_has_pages(msg)) 1127 if (ceph_msg_has_pages(msg))
1115 ceph_msg_data_cursor_init(&msg->p); 1128 ceph_msg_data_cursor_init(&msg->p, data_len);
1116 if (ceph_msg_has_pagelist(msg)) 1129 if (ceph_msg_has_pagelist(msg))
1117 ceph_msg_data_cursor_init(&msg->l); 1130 ceph_msg_data_cursor_init(&msg->l, data_len);
1118 1131
1119 msg_pos->did_page_crc = false; 1132 msg_pos->did_page_crc = false;
1120} 1133}