aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c1019
1 files changed, 706 insertions, 313 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 2c0669fb54e3..eb0a46a49bd4 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -21,6 +21,9 @@
21#include <linux/ceph/pagelist.h> 21#include <linux/ceph/pagelist.h>
22#include <linux/export.h> 22#include <linux/export.h>
23 23
24#define list_entry_next(pos, member) \
25 list_entry(pos->member.next, typeof(*pos), member)
26
24/* 27/*
25 * Ceph uses the messenger to exchange ceph_msg messages with other 28 * Ceph uses the messenger to exchange ceph_msg messages with other
26 * hosts in the system. The messenger provides ordered and reliable 29 * hosts in the system. The messenger provides ordered and reliable
@@ -149,6 +152,11 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
149 return test_and_set_bit(con_flag, &con->flags); 152 return test_and_set_bit(con_flag, &con->flags);
150} 153}
151 154
155/* Slab caches for frequently-allocated structures */
156
157static struct kmem_cache *ceph_msg_cache;
158static struct kmem_cache *ceph_msg_data_cache;
159
152/* static tag bytes (protocol control messages) */ 160/* static tag bytes (protocol control messages) */
153static char tag_msg = CEPH_MSGR_TAG_MSG; 161static char tag_msg = CEPH_MSGR_TAG_MSG;
154static char tag_ack = CEPH_MSGR_TAG_ACK; 162static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -223,6 +231,41 @@ static void encode_my_addr(struct ceph_messenger *msgr)
223 */ 231 */
224static struct workqueue_struct *ceph_msgr_wq; 232static struct workqueue_struct *ceph_msgr_wq;
225 233
234static int ceph_msgr_slab_init(void)
235{
236 BUG_ON(ceph_msg_cache);
237 ceph_msg_cache = kmem_cache_create("ceph_msg",
238 sizeof (struct ceph_msg),
239 __alignof__(struct ceph_msg), 0, NULL);
240
241 if (!ceph_msg_cache)
242 return -ENOMEM;
243
244 BUG_ON(ceph_msg_data_cache);
245 ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
246 sizeof (struct ceph_msg_data),
247 __alignof__(struct ceph_msg_data),
248 0, NULL);
249 if (ceph_msg_data_cache)
250 return 0;
251
252 kmem_cache_destroy(ceph_msg_cache);
253 ceph_msg_cache = NULL;
254
255 return -ENOMEM;
256}
257
258static void ceph_msgr_slab_exit(void)
259{
260 BUG_ON(!ceph_msg_data_cache);
261 kmem_cache_destroy(ceph_msg_data_cache);
262 ceph_msg_data_cache = NULL;
263
264 BUG_ON(!ceph_msg_cache);
265 kmem_cache_destroy(ceph_msg_cache);
266 ceph_msg_cache = NULL;
267}
268
226static void _ceph_msgr_exit(void) 269static void _ceph_msgr_exit(void)
227{ 270{
228 if (ceph_msgr_wq) { 271 if (ceph_msgr_wq) {
@@ -230,6 +273,8 @@ static void _ceph_msgr_exit(void)
230 ceph_msgr_wq = NULL; 273 ceph_msgr_wq = NULL;
231 } 274 }
232 275
276 ceph_msgr_slab_exit();
277
233 BUG_ON(zero_page == NULL); 278 BUG_ON(zero_page == NULL);
234 kunmap(zero_page); 279 kunmap(zero_page);
235 page_cache_release(zero_page); 280 page_cache_release(zero_page);
@@ -242,6 +287,9 @@ int ceph_msgr_init(void)
242 zero_page = ZERO_PAGE(0); 287 zero_page = ZERO_PAGE(0);
243 page_cache_get(zero_page); 288 page_cache_get(zero_page);
244 289
290 if (ceph_msgr_slab_init())
291 return -ENOMEM;
292
245 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 293 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
246 if (ceph_msgr_wq) 294 if (ceph_msgr_wq)
247 return 0; 295 return 0;
@@ -471,6 +519,22 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
471 return r; 519 return r;
472} 520}
473 521
522static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
523 int page_offset, size_t length)
524{
525 void *kaddr;
526 int ret;
527
528 BUG_ON(page_offset + length > PAGE_SIZE);
529
530 kaddr = kmap(page);
531 BUG_ON(!kaddr);
532 ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
533 kunmap(page);
534
535 return ret;
536}
537
474/* 538/*
475 * write something. @more is true if caller will be sending more data 539 * write something. @more is true if caller will be sending more data
476 * shortly. 540 * shortly.
@@ -493,7 +557,7 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
493} 557}
494 558
495static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 559static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
496 int offset, size_t size, int more) 560 int offset, size_t size, bool more)
497{ 561{
498 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); 562 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
499 int ret; 563 int ret;
@@ -697,50 +761,397 @@ static void con_out_kvec_add(struct ceph_connection *con,
697} 761}
698 762
699#ifdef CONFIG_BLOCK 763#ifdef CONFIG_BLOCK
700static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 764
765/*
766 * For a bio data item, a piece is whatever remains of the next
767 * entry in the current bio iovec, or the first entry in the next
768 * bio in the list.
769 */
770static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
771 size_t length)
701{ 772{
702 if (!bio) { 773 struct ceph_msg_data *data = cursor->data;
703 *iter = NULL; 774 struct bio *bio;
704 *seg = 0; 775
705 return; 776 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
777
778 bio = data->bio;
779 BUG_ON(!bio);
780 BUG_ON(!bio->bi_vcnt);
781
782 cursor->resid = min(length, data->bio_length);
783 cursor->bio = bio;
784 cursor->vector_index = 0;
785 cursor->vector_offset = 0;
786 cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
787}
788
789static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
790 size_t *page_offset,
791 size_t *length)
792{
793 struct ceph_msg_data *data = cursor->data;
794 struct bio *bio;
795 struct bio_vec *bio_vec;
796 unsigned int index;
797
798 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
799
800 bio = cursor->bio;
801 BUG_ON(!bio);
802
803 index = cursor->vector_index;
804 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
805
806 bio_vec = &bio->bi_io_vec[index];
807 BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
808 *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
809 BUG_ON(*page_offset >= PAGE_SIZE);
810 if (cursor->last_piece) /* pagelist offset is always 0 */
811 *length = cursor->resid;
812 else
813 *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
814 BUG_ON(*length > cursor->resid);
815 BUG_ON(*page_offset + *length > PAGE_SIZE);
816
817 return bio_vec->bv_page;
818}
819
820static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
821 size_t bytes)
822{
823 struct bio *bio;
824 struct bio_vec *bio_vec;
825 unsigned int index;
826
827 BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
828
829 bio = cursor->bio;
830 BUG_ON(!bio);
831
832 index = cursor->vector_index;
833 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
834 bio_vec = &bio->bi_io_vec[index];
835
836 /* Advance the cursor offset */
837
838 BUG_ON(cursor->resid < bytes);
839 cursor->resid -= bytes;
840 cursor->vector_offset += bytes;
841 if (cursor->vector_offset < bio_vec->bv_len)
842 return false; /* more bytes to process in this segment */
843 BUG_ON(cursor->vector_offset != bio_vec->bv_len);
844
845 /* Move on to the next segment, and possibly the next bio */
846
847 if (++index == (unsigned int) bio->bi_vcnt) {
848 bio = bio->bi_next;
849 index = 0;
706 } 850 }
707 *iter = bio; 851 cursor->bio = bio;
708 *seg = bio->bi_idx; 852 cursor->vector_index = index;
853 cursor->vector_offset = 0;
854
855 if (!cursor->last_piece) {
856 BUG_ON(!cursor->resid);
857 BUG_ON(!bio);
858 /* A short read is OK, so use <= rather than == */
859 if (cursor->resid <= bio->bi_io_vec[index].bv_len)
860 cursor->last_piece = true;
861 }
862
863 return true;
709} 864}
865#endif /* CONFIG_BLOCK */
710 866
711static void iter_bio_next(struct bio **bio_iter, int *seg) 867/*
868 * For a page array, a piece comes from the first page in the array
869 * that has not already been fully consumed.
870 */
871static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
872 size_t length)
712{ 873{
713 if (*bio_iter == NULL) 874 struct ceph_msg_data *data = cursor->data;
714 return; 875 int page_count;
876
877 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
715 878
716 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 879 BUG_ON(!data->pages);
880 BUG_ON(!data->length);
717 881
718 (*seg)++; 882 cursor->resid = min(length, data->length);
719 if (*seg == (*bio_iter)->bi_vcnt) 883 page_count = calc_pages_for(data->alignment, (u64)data->length);
720 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 884 cursor->page_offset = data->alignment & ~PAGE_MASK;
885 cursor->page_index = 0;
886 BUG_ON(page_count > (int)USHRT_MAX);
887 cursor->page_count = (unsigned short)page_count;
888 BUG_ON(length > SIZE_MAX - cursor->page_offset);
889 cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
721} 890}
722#endif
723 891
724static void prepare_write_message_data(struct ceph_connection *con) 892static struct page *
893ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
894 size_t *page_offset, size_t *length)
725{ 895{
726 struct ceph_msg *msg = con->out_msg; 896 struct ceph_msg_data *data = cursor->data;
727 897
728 BUG_ON(!msg); 898 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
729 BUG_ON(!msg->hdr.data_len); 899
900 BUG_ON(cursor->page_index >= cursor->page_count);
901 BUG_ON(cursor->page_offset >= PAGE_SIZE);
902
903 *page_offset = cursor->page_offset;
904 if (cursor->last_piece)
905 *length = cursor->resid;
906 else
907 *length = PAGE_SIZE - *page_offset;
908
909 return data->pages[cursor->page_index];
910}
911
912static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
913 size_t bytes)
914{
915 BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
916
917 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
918
919 /* Advance the cursor page offset */
920
921 cursor->resid -= bytes;
922 cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
923 if (!bytes || cursor->page_offset)
924 return false; /* more bytes to process in the current page */
925
926 /* Move on to the next page; offset is already at 0 */
927
928 BUG_ON(cursor->page_index >= cursor->page_count);
929 cursor->page_index++;
930 cursor->last_piece = cursor->resid <= PAGE_SIZE;
931
932 return true;
933}
934
935/*
936 * For a pagelist, a piece is whatever remains to be consumed in the
937 * first page in the list, or the front of the next page.
938 */
939static void
940ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
941 size_t length)
942{
943 struct ceph_msg_data *data = cursor->data;
944 struct ceph_pagelist *pagelist;
945 struct page *page;
946
947 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
948
949 pagelist = data->pagelist;
950 BUG_ON(!pagelist);
951
952 if (!length)
953 return; /* pagelist can be assigned but empty */
954
955 BUG_ON(list_empty(&pagelist->head));
956 page = list_first_entry(&pagelist->head, struct page, lru);
957
958 cursor->resid = min(length, pagelist->length);
959 cursor->page = page;
960 cursor->offset = 0;
961 cursor->last_piece = cursor->resid <= PAGE_SIZE;
962}
963
964static struct page *
965ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
966 size_t *page_offset, size_t *length)
967{
968 struct ceph_msg_data *data = cursor->data;
969 struct ceph_pagelist *pagelist;
970
971 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
730 972
731 /* initialize page iterator */ 973 pagelist = data->pagelist;
732 con->out_msg_pos.page = 0; 974 BUG_ON(!pagelist);
733 if (msg->pages) 975
734 con->out_msg_pos.page_pos = msg->page_alignment; 976 BUG_ON(!cursor->page);
977 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
978
979 /* offset of first page in pagelist is always 0 */
980 *page_offset = cursor->offset & ~PAGE_MASK;
981 if (cursor->last_piece)
982 *length = cursor->resid;
735 else 983 else
736 con->out_msg_pos.page_pos = 0; 984 *length = PAGE_SIZE - *page_offset;
985
986 return cursor->page;
987}
988
989static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
990 size_t bytes)
991{
992 struct ceph_msg_data *data = cursor->data;
993 struct ceph_pagelist *pagelist;
994
995 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
996
997 pagelist = data->pagelist;
998 BUG_ON(!pagelist);
999
1000 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
1001 BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
1002
1003 /* Advance the cursor offset */
1004
1005 cursor->resid -= bytes;
1006 cursor->offset += bytes;
1007 /* offset of first page in pagelist is always 0 */
1008 if (!bytes || cursor->offset & ~PAGE_MASK)
1009 return false; /* more bytes to process in the current page */
1010
1011 /* Move on to the next page */
1012
1013 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
1014 cursor->page = list_entry_next(cursor->page, lru);
1015 cursor->last_piece = cursor->resid <= PAGE_SIZE;
1016
1017 return true;
1018}
1019
1020/*
1021 * Message data is handled (sent or received) in pieces, where each
1022 * piece resides on a single page. The network layer might not
1023 * consume an entire piece at once. A data item's cursor keeps
1024 * track of which piece is next to process and how much remains to
1025 * be processed in that piece. It also tracks whether the current
1026 * piece is the last one in the data item.
1027 */
1028static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1029{
1030 size_t length = cursor->total_resid;
1031
1032 switch (cursor->data->type) {
1033 case CEPH_MSG_DATA_PAGELIST:
1034 ceph_msg_data_pagelist_cursor_init(cursor, length);
1035 break;
1036 case CEPH_MSG_DATA_PAGES:
1037 ceph_msg_data_pages_cursor_init(cursor, length);
1038 break;
737#ifdef CONFIG_BLOCK 1039#ifdef CONFIG_BLOCK
738 if (msg->bio) 1040 case CEPH_MSG_DATA_BIO:
739 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 1041 ceph_msg_data_bio_cursor_init(cursor, length);
740#endif 1042 break;
741 con->out_msg_pos.data_pos = 0; 1043#endif /* CONFIG_BLOCK */
742 con->out_msg_pos.did_page_crc = false; 1044 case CEPH_MSG_DATA_NONE:
743 con->out_more = 1; /* data + footer will follow */ 1045 default:
1046 /* BUG(); */
1047 break;
1048 }
1049 cursor->need_crc = true;
1050}
1051
1052static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1053{
1054 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1055 struct ceph_msg_data *data;
1056
1057 BUG_ON(!length);
1058 BUG_ON(length > msg->data_length);
1059 BUG_ON(list_empty(&msg->data));
1060
1061 cursor->data_head = &msg->data;
1062 cursor->total_resid = length;
1063 data = list_first_entry(&msg->data, struct ceph_msg_data, links);
1064 cursor->data = data;
1065
1066 __ceph_msg_data_cursor_init(cursor);
1067}
1068
1069/*
1070 * Return the page containing the next piece to process for a given
1071 * data item, and supply the page offset and length of that piece.
1072 * Indicate whether this is the last piece in this data item.
1073 */
1074static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
1075 size_t *page_offset, size_t *length,
1076 bool *last_piece)
1077{
1078 struct page *page;
1079
1080 switch (cursor->data->type) {
1081 case CEPH_MSG_DATA_PAGELIST:
1082 page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
1083 break;
1084 case CEPH_MSG_DATA_PAGES:
1085 page = ceph_msg_data_pages_next(cursor, page_offset, length);
1086 break;
1087#ifdef CONFIG_BLOCK
1088 case CEPH_MSG_DATA_BIO:
1089 page = ceph_msg_data_bio_next(cursor, page_offset, length);
1090 break;
1091#endif /* CONFIG_BLOCK */
1092 case CEPH_MSG_DATA_NONE:
1093 default:
1094 page = NULL;
1095 break;
1096 }
1097 BUG_ON(!page);
1098 BUG_ON(*page_offset + *length > PAGE_SIZE);
1099 BUG_ON(!*length);
1100 if (last_piece)
1101 *last_piece = cursor->last_piece;
1102
1103 return page;
1104}
1105
1106/*
1107 * Returns true if the result moves the cursor on to the next piece
1108 * of the data item.
1109 */
1110static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1111 size_t bytes)
1112{
1113 bool new_piece;
1114
1115 BUG_ON(bytes > cursor->resid);
1116 switch (cursor->data->type) {
1117 case CEPH_MSG_DATA_PAGELIST:
1118 new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
1119 break;
1120 case CEPH_MSG_DATA_PAGES:
1121 new_piece = ceph_msg_data_pages_advance(cursor, bytes);
1122 break;
1123#ifdef CONFIG_BLOCK
1124 case CEPH_MSG_DATA_BIO:
1125 new_piece = ceph_msg_data_bio_advance(cursor, bytes);
1126 break;
1127#endif /* CONFIG_BLOCK */
1128 case CEPH_MSG_DATA_NONE:
1129 default:
1130 BUG();
1131 break;
1132 }
1133 cursor->total_resid -= bytes;
1134
1135 if (!cursor->resid && cursor->total_resid) {
1136 WARN_ON(!cursor->last_piece);
1137 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
1138 cursor->data = list_entry_next(cursor->data, links);
1139 __ceph_msg_data_cursor_init(cursor);
1140 new_piece = true;
1141 }
1142 cursor->need_crc = new_piece;
1143
1144 return new_piece;
1145}
1146
1147static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1148{
1149 BUG_ON(!msg);
1150 BUG_ON(!data_len);
1151
1152 /* Initialize data cursor */
1153
1154 ceph_msg_data_cursor_init(msg, (size_t)data_len);
744} 1155}
745 1156
746/* 1157/*
@@ -803,16 +1214,12 @@ static void prepare_write_message(struct ceph_connection *con)
803 m->hdr.seq = cpu_to_le64(++con->out_seq); 1214 m->hdr.seq = cpu_to_le64(++con->out_seq);
804 m->needs_out_seq = false; 1215 m->needs_out_seq = false;
805 } 1216 }
806#ifdef CONFIG_BLOCK 1217 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
807 else
808 m->bio_iter = NULL;
809#endif
810 1218
811 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 1219 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
812 m, con->out_seq, le16_to_cpu(m->hdr.type), 1220 m, con->out_seq, le16_to_cpu(m->hdr.type),
813 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 1221 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
814 le32_to_cpu(m->hdr.data_len), 1222 m->data_length);
815 m->nr_pages);
816 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 1223 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
817 1224
818 /* tag + hdr + front + middle */ 1225 /* tag + hdr + front + middle */
@@ -843,11 +1250,13 @@ static void prepare_write_message(struct ceph_connection *con)
843 1250
844 /* is there a data payload? */ 1251 /* is there a data payload? */
845 con->out_msg->footer.data_crc = 0; 1252 con->out_msg->footer.data_crc = 0;
846 if (m->hdr.data_len) 1253 if (m->data_length) {
847 prepare_write_message_data(con); 1254 prepare_message_data(con->out_msg, m->data_length);
848 else 1255 con->out_more = 1; /* data + footer will follow */
1256 } else {
849 /* no, queue up footer too and be done */ 1257 /* no, queue up footer too and be done */
850 prepare_write_message_footer(con); 1258 prepare_write_message_footer(con);
1259 }
851 1260
852 con_flag_set(con, CON_FLAG_WRITE_PENDING); 1261 con_flag_set(con, CON_FLAG_WRITE_PENDING);
853} 1262}
@@ -874,6 +1283,24 @@ static void prepare_write_ack(struct ceph_connection *con)
874} 1283}
875 1284
876/* 1285/*
1286 * Prepare to share the seq during handshake
1287 */
1288static void prepare_write_seq(struct ceph_connection *con)
1289{
1290 dout("prepare_write_seq %p %llu -> %llu\n", con,
1291 con->in_seq_acked, con->in_seq);
1292 con->in_seq_acked = con->in_seq;
1293
1294 con_out_kvec_reset(con);
1295
1296 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1297 con_out_kvec_add(con, sizeof (con->out_temp_ack),
1298 &con->out_temp_ack);
1299
1300 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1301}
1302
1303/*
877 * Prepare to write keepalive byte. 1304 * Prepare to write keepalive byte.
878 */ 1305 */
879static void prepare_write_keepalive(struct ceph_connection *con) 1306static void prepare_write_keepalive(struct ceph_connection *con)
@@ -1022,35 +1449,19 @@ out:
1022 return ret; /* done! */ 1449 return ret; /* done! */
1023} 1450}
1024 1451
1025static void out_msg_pos_next(struct ceph_connection *con, struct page *page, 1452static u32 ceph_crc32c_page(u32 crc, struct page *page,
1026 size_t len, size_t sent, bool in_trail) 1453 unsigned int page_offset,
1454 unsigned int length)
1027{ 1455{
1028 struct ceph_msg *msg = con->out_msg; 1456 char *kaddr;
1029 1457
1030 BUG_ON(!msg); 1458 kaddr = kmap(page);
1031 BUG_ON(!sent); 1459 BUG_ON(kaddr == NULL);
1032 1460 crc = crc32c(crc, kaddr + page_offset, length);
1033 con->out_msg_pos.data_pos += sent; 1461 kunmap(page);
1034 con->out_msg_pos.page_pos += sent;
1035 if (sent < len)
1036 return;
1037 1462
1038 BUG_ON(sent != len); 1463 return crc;
1039 con->out_msg_pos.page_pos = 0;
1040 con->out_msg_pos.page++;
1041 con->out_msg_pos.did_page_crc = false;
1042 if (in_trail)
1043 list_move_tail(&page->lru,
1044 &msg->trail->head);
1045 else if (msg->pagelist)
1046 list_move_tail(&page->lru,
1047 &msg->pagelist->head);
1048#ifdef CONFIG_BLOCK
1049 else if (msg->bio)
1050 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
1051#endif
1052} 1464}
1053
1054/* 1465/*
1055 * Write as much message data payload as we can. If we finish, queue 1466 * Write as much message data payload as we can. If we finish, queue
1056 * up the footer. 1467 * up the footer.
@@ -1058,21 +1469,17 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1058 * 0 -> socket full, but more to do 1469 * 0 -> socket full, but more to do
1059 * <0 -> error 1470 * <0 -> error
1060 */ 1471 */
1061static int write_partial_msg_pages(struct ceph_connection *con) 1472static int write_partial_message_data(struct ceph_connection *con)
1062{ 1473{
1063 struct ceph_msg *msg = con->out_msg; 1474 struct ceph_msg *msg = con->out_msg;
1064 unsigned int data_len = le32_to_cpu(msg->hdr.data_len); 1475 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1065 size_t len;
1066 bool do_datacrc = !con->msgr->nocrc; 1476 bool do_datacrc = !con->msgr->nocrc;
1067 int ret; 1477 u32 crc;
1068 int total_max_write;
1069 bool in_trail = false;
1070 const size_t trail_len = (msg->trail ? msg->trail->length : 0);
1071 const size_t trail_off = data_len - trail_len;
1072 1478
1073 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 1479 dout("%s %p msg %p\n", __func__, con, msg);
1074 con, msg, con->out_msg_pos.page, msg->nr_pages, 1480
1075 con->out_msg_pos.page_pos); 1481 if (list_empty(&msg->data))
1482 return -EINVAL;
1076 1483
1077 /* 1484 /*
1078 * Iterate through each page that contains data to be 1485 * Iterate through each page that contains data to be
@@ -1082,72 +1489,41 @@ static int write_partial_msg_pages(struct ceph_connection *con)
1082 * need to map the page. If we have no pages, they have 1489 * need to map the page. If we have no pages, they have
1083 * been revoked, so use the zero page. 1490 * been revoked, so use the zero page.
1084 */ 1491 */
1085 while (data_len > con->out_msg_pos.data_pos) { 1492 crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
1086 struct page *page = NULL; 1493 while (cursor->resid) {
1087 int max_write = PAGE_SIZE; 1494 struct page *page;
1088 int bio_offset = 0; 1495 size_t page_offset;
1089 1496 size_t length;
1090 in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off; 1497 bool last_piece;
1091 if (!in_trail) 1498 bool need_crc;
1092 total_max_write = trail_off - con->out_msg_pos.data_pos; 1499 int ret;
1093
1094 if (in_trail) {
1095 total_max_write = data_len - con->out_msg_pos.data_pos;
1096
1097 page = list_first_entry(&msg->trail->head,
1098 struct page, lru);
1099 } else if (msg->pages) {
1100 page = msg->pages[con->out_msg_pos.page];
1101 } else if (msg->pagelist) {
1102 page = list_first_entry(&msg->pagelist->head,
1103 struct page, lru);
1104#ifdef CONFIG_BLOCK
1105 } else if (msg->bio) {
1106 struct bio_vec *bv;
1107 1500
1108 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 1501 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
1109 page = bv->bv_page; 1502 &last_piece);
1110 bio_offset = bv->bv_offset; 1503 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1111 max_write = bv->bv_len; 1504 length, last_piece);
1112#endif 1505 if (ret <= 0) {
1113 } else { 1506 if (do_datacrc)
1114 page = zero_page; 1507 msg->footer.data_crc = cpu_to_le32(crc);
1115 }
1116 len = min_t(int, max_write - con->out_msg_pos.page_pos,
1117 total_max_write);
1118
1119 if (do_datacrc && !con->out_msg_pos.did_page_crc) {
1120 void *base;
1121 u32 crc = le32_to_cpu(msg->footer.data_crc);
1122 char *kaddr;
1123
1124 kaddr = kmap(page);
1125 BUG_ON(kaddr == NULL);
1126 base = kaddr + con->out_msg_pos.page_pos + bio_offset;
1127 crc = crc32c(crc, base, len);
1128 kunmap(page);
1129 msg->footer.data_crc = cpu_to_le32(crc);
1130 con->out_msg_pos.did_page_crc = true;
1131 }
1132 ret = ceph_tcp_sendpage(con->sock, page,
1133 con->out_msg_pos.page_pos + bio_offset,
1134 len, 1);
1135 if (ret <= 0)
1136 goto out;
1137 1508
1138 out_msg_pos_next(con, page, len, (size_t) ret, in_trail); 1509 return ret;
1510 }
1511 if (do_datacrc && cursor->need_crc)
1512 crc = ceph_crc32c_page(crc, page, page_offset, length);
1513 need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
1139 } 1514 }
1140 1515
1141 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 1516 dout("%s %p msg %p done\n", __func__, con, msg);
1142 1517
1143 /* prepare and queue up footer, too */ 1518 /* prepare and queue up footer, too */
1144 if (!do_datacrc) 1519 if (do_datacrc)
1520 msg->footer.data_crc = cpu_to_le32(crc);
1521 else
1145 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 1522 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
1146 con_out_kvec_reset(con); 1523 con_out_kvec_reset(con);
1147 prepare_write_message_footer(con); 1524 prepare_write_message_footer(con);
1148 ret = 1; 1525
1149out: 1526 return 1; /* must return > 0 to indicate success */
1150 return ret;
1151} 1527}
1152 1528
1153/* 1529/*
@@ -1160,7 +1536,7 @@ static int write_partial_skip(struct ceph_connection *con)
1160 while (con->out_skip > 0) { 1536 while (con->out_skip > 0) {
1161 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); 1537 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
1162 1538
1163 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1); 1539 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
1164 if (ret <= 0) 1540 if (ret <= 0)
1165 goto out; 1541 goto out;
1166 con->out_skip -= ret; 1542 con->out_skip -= ret;
@@ -1191,6 +1567,13 @@ static void prepare_read_ack(struct ceph_connection *con)
1191 con->in_base_pos = 0; 1567 con->in_base_pos = 0;
1192} 1568}
1193 1569
1570static void prepare_read_seq(struct ceph_connection *con)
1571{
1572 dout("prepare_read_seq %p\n", con);
1573 con->in_base_pos = 0;
1574 con->in_tag = CEPH_MSGR_TAG_SEQ;
1575}
1576
1194static void prepare_read_tag(struct ceph_connection *con) 1577static void prepare_read_tag(struct ceph_connection *con)
1195{ 1578{
1196 dout("prepare_read_tag %p\n", con); 1579 dout("prepare_read_tag %p\n", con);
@@ -1597,7 +1980,6 @@ static int process_connect(struct ceph_connection *con)
1597 con->error_msg = "connect authorization failure"; 1980 con->error_msg = "connect authorization failure";
1598 return -1; 1981 return -1;
1599 } 1982 }
1600 con->auth_retry = 1;
1601 con_out_kvec_reset(con); 1983 con_out_kvec_reset(con);
1602 ret = prepare_write_connect(con); 1984 ret = prepare_write_connect(con);
1603 if (ret < 0) 1985 if (ret < 0)
@@ -1668,6 +2050,7 @@ static int process_connect(struct ceph_connection *con)
1668 prepare_read_connect(con); 2050 prepare_read_connect(con);
1669 break; 2051 break;
1670 2052
2053 case CEPH_MSGR_TAG_SEQ:
1671 case CEPH_MSGR_TAG_READY: 2054 case CEPH_MSGR_TAG_READY:
1672 if (req_feat & ~server_feat) { 2055 if (req_feat & ~server_feat) {
1673 pr_err("%s%lld %s protocol feature mismatch," 2056 pr_err("%s%lld %s protocol feature mismatch,"
@@ -1682,7 +2065,7 @@ static int process_connect(struct ceph_connection *con)
1682 2065
1683 WARN_ON(con->state != CON_STATE_NEGOTIATING); 2066 WARN_ON(con->state != CON_STATE_NEGOTIATING);
1684 con->state = CON_STATE_OPEN; 2067 con->state = CON_STATE_OPEN;
1685 2068 con->auth_retry = 0; /* we authenticated; clear flag */
1686 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 2069 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1687 con->connect_seq++; 2070 con->connect_seq++;
1688 con->peer_features = server_feat; 2071 con->peer_features = server_feat;
@@ -1698,7 +2081,12 @@ static int process_connect(struct ceph_connection *con)
1698 2081
1699 con->delay = 0; /* reset backoff memory */ 2082 con->delay = 0; /* reset backoff memory */
1700 2083
1701 prepare_read_tag(con); 2084 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
2085 prepare_write_seq(con);
2086 prepare_read_seq(con);
2087 } else {
2088 prepare_read_tag(con);
2089 }
1702 break; 2090 break;
1703 2091
1704 case CEPH_MSGR_TAG_WAIT: 2092 case CEPH_MSGR_TAG_WAIT:
@@ -1732,7 +2120,6 @@ static int read_partial_ack(struct ceph_connection *con)
1732 return read_partial(con, end, size, &con->in_temp_ack); 2120 return read_partial(con, end, size, &con->in_temp_ack);
1733} 2121}
1734 2122
1735
1736/* 2123/*
1737 * We can finally discard anything that's been acked. 2124 * We can finally discard anything that's been acked.
1738 */ 2125 */
@@ -1757,8 +2144,6 @@ static void process_ack(struct ceph_connection *con)
1757} 2144}
1758 2145
1759 2146
1760
1761
1762static int read_partial_message_section(struct ceph_connection *con, 2147static int read_partial_message_section(struct ceph_connection *con,
1763 struct kvec *section, 2148 struct kvec *section,
1764 unsigned int sec_len, u32 *crc) 2149 unsigned int sec_len, u32 *crc)
@@ -1782,77 +2167,49 @@ static int read_partial_message_section(struct ceph_connection *con,
1782 return 1; 2167 return 1;
1783} 2168}
1784 2169
1785static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip); 2170static int read_partial_msg_data(struct ceph_connection *con)
1786
1787static int read_partial_message_pages(struct ceph_connection *con,
1788 struct page **pages,
1789 unsigned int data_len, bool do_datacrc)
1790{ 2171{
1791 void *p; 2172 struct ceph_msg *msg = con->in_msg;
2173 struct ceph_msg_data_cursor *cursor = &msg->cursor;
2174 const bool do_datacrc = !con->msgr->nocrc;
2175 struct page *page;
2176 size_t page_offset;
2177 size_t length;
2178 u32 crc = 0;
1792 int ret; 2179 int ret;
1793 int left;
1794 2180
1795 left = min((int)(data_len - con->in_msg_pos.data_pos), 2181 BUG_ON(!msg);
1796 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 2182 if (list_empty(&msg->data))
1797 /* (page) data */ 2183 return -EIO;
1798 BUG_ON(pages == NULL);
1799 p = kmap(pages[con->in_msg_pos.page]);
1800 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1801 left);
1802 if (ret > 0 && do_datacrc)
1803 con->in_data_crc =
1804 crc32c(con->in_data_crc,
1805 p + con->in_msg_pos.page_pos, ret);
1806 kunmap(pages[con->in_msg_pos.page]);
1807 if (ret <= 0)
1808 return ret;
1809 con->in_msg_pos.data_pos += ret;
1810 con->in_msg_pos.page_pos += ret;
1811 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1812 con->in_msg_pos.page_pos = 0;
1813 con->in_msg_pos.page++;
1814 }
1815
1816 return ret;
1817}
1818
1819#ifdef CONFIG_BLOCK
1820static int read_partial_message_bio(struct ceph_connection *con,
1821 struct bio **bio_iter, int *bio_seg,
1822 unsigned int data_len, bool do_datacrc)
1823{
1824 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1825 void *p;
1826 int ret, left;
1827 2184
1828 left = min((int)(data_len - con->in_msg_pos.data_pos), 2185 if (do_datacrc)
1829 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 2186 crc = con->in_data_crc;
2187 while (cursor->resid) {
2188 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
2189 NULL);
2190 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2191 if (ret <= 0) {
2192 if (do_datacrc)
2193 con->in_data_crc = crc;
1830 2194
1831 p = kmap(bv->bv_page) + bv->bv_offset; 2195 return ret;
2196 }
1832 2197
1833 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 2198 if (do_datacrc)
1834 left); 2199 crc = ceph_crc32c_page(crc, page, page_offset, ret);
1835 if (ret > 0 && do_datacrc) 2200 (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
1836 con->in_data_crc =
1837 crc32c(con->in_data_crc,
1838 p + con->in_msg_pos.page_pos, ret);
1839 kunmap(bv->bv_page);
1840 if (ret <= 0)
1841 return ret;
1842 con->in_msg_pos.data_pos += ret;
1843 con->in_msg_pos.page_pos += ret;
1844 if (con->in_msg_pos.page_pos == bv->bv_len) {
1845 con->in_msg_pos.page_pos = 0;
1846 iter_bio_next(bio_iter, bio_seg);
1847 } 2201 }
2202 if (do_datacrc)
2203 con->in_data_crc = crc;
1848 2204
1849 return ret; 2205 return 1; /* must return > 0 to indicate success */
1850} 2206}
1851#endif
1852 2207
1853/* 2208/*
1854 * read (part of) a message. 2209 * read (part of) a message.
1855 */ 2210 */
2211static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2212
1856static int read_partial_message(struct ceph_connection *con) 2213static int read_partial_message(struct ceph_connection *con)
1857{ 2214{
1858 struct ceph_msg *m = con->in_msg; 2215 struct ceph_msg *m = con->in_msg;
@@ -1885,7 +2242,7 @@ static int read_partial_message(struct ceph_connection *con)
1885 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 2242 if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1886 return -EIO; 2243 return -EIO;
1887 middle_len = le32_to_cpu(con->in_hdr.middle_len); 2244 middle_len = le32_to_cpu(con->in_hdr.middle_len);
1888 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 2245 if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1889 return -EIO; 2246 return -EIO;
1890 data_len = le32_to_cpu(con->in_hdr.data_len); 2247 data_len = le32_to_cpu(con->in_hdr.data_len);
1891 if (data_len > CEPH_MSG_MAX_DATA_LEN) 2248 if (data_len > CEPH_MSG_MAX_DATA_LEN)
@@ -1914,14 +2271,22 @@ static int read_partial_message(struct ceph_connection *con)
1914 int skip = 0; 2271 int skip = 0;
1915 2272
1916 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 2273 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1917 con->in_hdr.front_len, con->in_hdr.data_len); 2274 front_len, data_len);
1918 ret = ceph_con_in_msg_alloc(con, &skip); 2275 ret = ceph_con_in_msg_alloc(con, &skip);
1919 if (ret < 0) 2276 if (ret < 0)
1920 return ret; 2277 return ret;
2278
2279 BUG_ON(!con->in_msg ^ skip);
2280 if (con->in_msg && data_len > con->in_msg->data_length) {
2281 pr_warning("%s skipping long message (%u > %zd)\n",
2282 __func__, data_len, con->in_msg->data_length);
2283 ceph_msg_put(con->in_msg);
2284 con->in_msg = NULL;
2285 skip = 1;
2286 }
1921 if (skip) { 2287 if (skip) {
1922 /* skip this message */ 2288 /* skip this message */
1923 dout("alloc_msg said skip message\n"); 2289 dout("alloc_msg said skip message\n");
1924 BUG_ON(con->in_msg);
1925 con->in_base_pos = -front_len - middle_len - data_len - 2290 con->in_base_pos = -front_len - middle_len - data_len -
1926 sizeof(m->footer); 2291 sizeof(m->footer);
1927 con->in_tag = CEPH_MSGR_TAG_READY; 2292 con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1936,17 +2301,10 @@ static int read_partial_message(struct ceph_connection *con)
1936 if (m->middle) 2301 if (m->middle)
1937 m->middle->vec.iov_len = 0; 2302 m->middle->vec.iov_len = 0;
1938 2303
1939 con->in_msg_pos.page = 0; 2304 /* prepare for data payload, if any */
1940 if (m->pages)
1941 con->in_msg_pos.page_pos = m->page_alignment;
1942 else
1943 con->in_msg_pos.page_pos = 0;
1944 con->in_msg_pos.data_pos = 0;
1945 2305
1946#ifdef CONFIG_BLOCK 2306 if (data_len)
1947 if (m->bio) 2307 prepare_message_data(con->in_msg, data_len);
1948 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1949#endif
1950 } 2308 }
1951 2309
1952 /* front */ 2310 /* front */
@@ -1965,24 +2323,10 @@ static int read_partial_message(struct ceph_connection *con)
1965 } 2323 }
1966 2324
1967 /* (page) data */ 2325 /* (page) data */
1968 while (con->in_msg_pos.data_pos < data_len) { 2326 if (data_len) {
1969 if (m->pages) { 2327 ret = read_partial_msg_data(con);
1970 ret = read_partial_message_pages(con, m->pages, 2328 if (ret <= 0)
1971 data_len, do_datacrc); 2329 return ret;
1972 if (ret <= 0)
1973 return ret;
1974#ifdef CONFIG_BLOCK
1975 } else if (m->bio) {
1976 BUG_ON(!m->bio_iter);
1977 ret = read_partial_message_bio(con,
1978 &m->bio_iter, &m->bio_seg,
1979 data_len, do_datacrc);
1980 if (ret <= 0)
1981 return ret;
1982#endif
1983 } else {
1984 BUG_ON(1);
1985 }
1986 } 2330 }
1987 2331
1988 /* footer */ 2332 /* footer */
@@ -2108,13 +2452,13 @@ more_kvec:
2108 goto do_next; 2452 goto do_next;
2109 } 2453 }
2110 2454
2111 ret = write_partial_msg_pages(con); 2455 ret = write_partial_message_data(con);
2112 if (ret == 1) 2456 if (ret == 1)
2113 goto more_kvec; /* we need to send the footer, too! */ 2457 goto more_kvec; /* we need to send the footer, too! */
2114 if (ret == 0) 2458 if (ret == 0)
2115 goto out; 2459 goto out;
2116 if (ret < 0) { 2460 if (ret < 0) {
2117 dout("try_write write_partial_msg_pages err %d\n", 2461 dout("try_write write_partial_message_data err %d\n",
2118 ret); 2462 ret);
2119 goto out; 2463 goto out;
2120 } 2464 }
@@ -2266,7 +2610,12 @@ more:
2266 prepare_read_tag(con); 2610 prepare_read_tag(con);
2267 goto more; 2611 goto more;
2268 } 2612 }
2269 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 2613 if (con->in_tag == CEPH_MSGR_TAG_ACK ||
2614 con->in_tag == CEPH_MSGR_TAG_SEQ) {
2615 /*
2616 * the final handshake seq exchange is semantically
2617 * equivalent to an ACK
2618 */
2270 ret = read_partial_ack(con); 2619 ret = read_partial_ack(con);
2271 if (ret <= 0) 2620 if (ret <= 0)
2272 goto out; 2621 goto out;
@@ -2672,6 +3021,88 @@ void ceph_con_keepalive(struct ceph_connection *con)
2672} 3021}
2673EXPORT_SYMBOL(ceph_con_keepalive); 3022EXPORT_SYMBOL(ceph_con_keepalive);
2674 3023
3024static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
3025{
3026 struct ceph_msg_data *data;
3027
3028 if (WARN_ON(!ceph_msg_data_type_valid(type)))
3029 return NULL;
3030
3031 data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
3032 if (data)
3033 data->type = type;
3034 INIT_LIST_HEAD(&data->links);
3035
3036 return data;
3037}
3038
3039static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3040{
3041 if (!data)
3042 return;
3043
3044 WARN_ON(!list_empty(&data->links));
3045 if (data->type == CEPH_MSG_DATA_PAGELIST) {
3046 ceph_pagelist_release(data->pagelist);
3047 kfree(data->pagelist);
3048 }
3049 kmem_cache_free(ceph_msg_data_cache, data);
3050}
3051
3052void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
3053 size_t length, size_t alignment)
3054{
3055 struct ceph_msg_data *data;
3056
3057 BUG_ON(!pages);
3058 BUG_ON(!length);
3059
3060 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
3061 BUG_ON(!data);
3062 data->pages = pages;
3063 data->length = length;
3064 data->alignment = alignment & ~PAGE_MASK;
3065
3066 list_add_tail(&data->links, &msg->data);
3067 msg->data_length += length;
3068}
3069EXPORT_SYMBOL(ceph_msg_data_add_pages);
3070
3071void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
3072 struct ceph_pagelist *pagelist)
3073{
3074 struct ceph_msg_data *data;
3075
3076 BUG_ON(!pagelist);
3077 BUG_ON(!pagelist->length);
3078
3079 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
3080 BUG_ON(!data);
3081 data->pagelist = pagelist;
3082
3083 list_add_tail(&data->links, &msg->data);
3084 msg->data_length += pagelist->length;
3085}
3086EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
3087
#ifdef CONFIG_BLOCK
/*
 * Append a bio data item to a message.
 *
 * @msg:    message receiving the data item
 * @bio:    bio to attach (must not be NULL)
 * @length: number of data bytes the bio contributes to the message
 *
 * The new item is linked at the tail of msg->data and the message's
 * data_length grows by @length.  A NULL bio or a failure to create the
 * item triggers BUG_ON().
 */
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
		size_t length)
{
	struct ceph_msg_data *item;

	BUG_ON(!bio);

	item = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
	BUG_ON(!item);

	item->bio_length = length;
	item->bio = bio;

	msg->data_length += length;
	list_add_tail(&item->links, &msg->data);
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif	/* CONFIG_BLOCK */
2675 3106
2676/* 3107/*
2677 * construct a new message with given type, size 3108 * construct a new message with given type, size
@@ -2682,49 +3113,20 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2682{ 3113{
2683 struct ceph_msg *m; 3114 struct ceph_msg *m;
2684 3115
2685 m = kmalloc(sizeof(*m), flags); 3116 m = kmem_cache_zalloc(ceph_msg_cache, flags);
2686 if (m == NULL) 3117 if (m == NULL)
2687 goto out; 3118 goto out;
2688 kref_init(&m->kref);
2689 3119
2690 m->con = NULL;
2691 INIT_LIST_HEAD(&m->list_head);
2692
2693 m->hdr.tid = 0;
2694 m->hdr.type = cpu_to_le16(type); 3120 m->hdr.type = cpu_to_le16(type);
2695 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 3121 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
2696 m->hdr.version = 0;
2697 m->hdr.front_len = cpu_to_le32(front_len); 3122 m->hdr.front_len = cpu_to_le32(front_len);
2698 m->hdr.middle_len = 0;
2699 m->hdr.data_len = 0;
2700 m->hdr.data_off = 0;
2701 m->hdr.reserved = 0;
2702 m->footer.front_crc = 0;
2703 m->footer.middle_crc = 0;
2704 m->footer.data_crc = 0;
2705 m->footer.flags = 0;
2706 m->front_max = front_len;
2707 m->front_is_vmalloc = false;
2708 m->more_to_follow = false;
2709 m->ack_stamp = 0;
2710 m->pool = NULL;
2711
2712 /* middle */
2713 m->middle = NULL;
2714 3123
2715 /* data */ 3124 INIT_LIST_HEAD(&m->list_head);
2716 m->nr_pages = 0; 3125 kref_init(&m->kref);
2717 m->page_alignment = 0; 3126 INIT_LIST_HEAD(&m->data);
2718 m->pages = NULL;
2719 m->pagelist = NULL;
2720#ifdef CONFIG_BLOCK
2721 m->bio = NULL;
2722 m->bio_iter = NULL;
2723 m->bio_seg = 0;
2724#endif /* CONFIG_BLOCK */
2725 m->trail = NULL;
2726 3127
2727 /* front */ 3128 /* front */
3129 m->front_max = front_len;
2728 if (front_len) { 3130 if (front_len) {
2729 if (front_len > PAGE_CACHE_SIZE) { 3131 if (front_len > PAGE_CACHE_SIZE) {
2730 m->front.iov_base = __vmalloc(front_len, flags, 3132 m->front.iov_base = __vmalloc(front_len, flags,
@@ -2802,49 +3204,37 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2802static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) 3204static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
2803{ 3205{
2804 struct ceph_msg_header *hdr = &con->in_hdr; 3206 struct ceph_msg_header *hdr = &con->in_hdr;
2805 int type = le16_to_cpu(hdr->type);
2806 int front_len = le32_to_cpu(hdr->front_len);
2807 int middle_len = le32_to_cpu(hdr->middle_len); 3207 int middle_len = le32_to_cpu(hdr->middle_len);
3208 struct ceph_msg *msg;
2808 int ret = 0; 3209 int ret = 0;
2809 3210
2810 BUG_ON(con->in_msg != NULL); 3211 BUG_ON(con->in_msg != NULL);
3212 BUG_ON(!con->ops->alloc_msg);
2811 3213
2812 if (con->ops->alloc_msg) { 3214 mutex_unlock(&con->mutex);
2813 struct ceph_msg *msg; 3215 msg = con->ops->alloc_msg(con, hdr, skip);
2814 3216 mutex_lock(&con->mutex);
2815 mutex_unlock(&con->mutex); 3217 if (con->state != CON_STATE_OPEN) {
2816 msg = con->ops->alloc_msg(con, hdr, skip); 3218 if (msg)
2817 mutex_lock(&con->mutex); 3219 ceph_msg_put(msg);
2818 if (con->state != CON_STATE_OPEN) { 3220 return -EAGAIN;
2819 if (msg)
2820 ceph_msg_put(msg);
2821 return -EAGAIN;
2822 }
2823 con->in_msg = msg;
2824 if (con->in_msg) {
2825 con->in_msg->con = con->ops->get(con);
2826 BUG_ON(con->in_msg->con == NULL);
2827 }
2828 if (*skip) {
2829 con->in_msg = NULL;
2830 return 0;
2831 }
2832 if (!con->in_msg) {
2833 con->error_msg =
2834 "error allocating memory for incoming message";
2835 return -ENOMEM;
2836 }
2837 } 3221 }
2838 if (!con->in_msg) { 3222 if (msg) {
2839 con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 3223 BUG_ON(*skip);
2840 if (!con->in_msg) { 3224 con->in_msg = msg;
2841 pr_err("unable to allocate msg type %d len %d\n",
2842 type, front_len);
2843 return -ENOMEM;
2844 }
2845 con->in_msg->con = con->ops->get(con); 3225 con->in_msg->con = con->ops->get(con);
2846 BUG_ON(con->in_msg->con == NULL); 3226 BUG_ON(con->in_msg->con == NULL);
2847 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); 3227 } else {
3228 /*
3229 * Null message pointer means either we should skip
3230 * this message or we couldn't allocate memory. The
3231 * former is not an error.
3232 */
3233 if (*skip)
3234 return 0;
3235 con->error_msg = "error allocating memory for incoming message";
3236
3237 return -ENOMEM;
2848 } 3238 }
2849 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 3239 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2850 3240
@@ -2870,7 +3260,7 @@ void ceph_msg_kfree(struct ceph_msg *m)
2870 vfree(m->front.iov_base); 3260 vfree(m->front.iov_base);
2871 else 3261 else
2872 kfree(m->front.iov_base); 3262 kfree(m->front.iov_base);
2873 kfree(m); 3263 kmem_cache_free(ceph_msg_cache, m);
2874} 3264}
2875 3265
2876/* 3266/*
@@ -2879,6 +3269,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
2879void ceph_msg_last_put(struct kref *kref) 3269void ceph_msg_last_put(struct kref *kref)
2880{ 3270{
2881 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 3271 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3272 LIST_HEAD(data);
3273 struct list_head *links;
3274 struct list_head *next;
2882 3275
2883 dout("ceph_msg_put last one on %p\n", m); 3276 dout("ceph_msg_put last one on %p\n", m);
2884 WARN_ON(!list_empty(&m->list_head)); 3277 WARN_ON(!list_empty(&m->list_head));
@@ -2888,16 +3281,16 @@ void ceph_msg_last_put(struct kref *kref)
2888 ceph_buffer_put(m->middle); 3281 ceph_buffer_put(m->middle);
2889 m->middle = NULL; 3282 m->middle = NULL;
2890 } 3283 }
2891 m->nr_pages = 0;
2892 m->pages = NULL;
2893 3284
2894 if (m->pagelist) { 3285 list_splice_init(&m->data, &data);
2895 ceph_pagelist_release(m->pagelist); 3286 list_for_each_safe(links, next, &data) {
2896 kfree(m->pagelist); 3287 struct ceph_msg_data *data;
2897 m->pagelist = NULL;
2898 }
2899 3288
2900 m->trail = NULL; 3289 data = list_entry(links, struct ceph_msg_data, links);
3290 list_del_init(links);
3291 ceph_msg_data_destroy(data);
3292 }
3293 m->data_length = 0;
2901 3294
2902 if (m->pool) 3295 if (m->pool)
2903 ceph_msgpool_put(m->pool, m); 3296 ceph_msgpool_put(m->pool, m);
@@ -2908,8 +3301,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
2908 3301
2909void ceph_msg_dump(struct ceph_msg *msg) 3302void ceph_msg_dump(struct ceph_msg *msg)
2910{ 3303{
2911 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 3304 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
2912 msg->front_max, msg->nr_pages); 3305 msg->front_max, msg->data_length);
2913 print_hex_dump(KERN_DEBUG, "header: ", 3306 print_hex_dump(KERN_DEBUG, "header: ",
2914 DUMP_PREFIX_OFFSET, 16, 1, 3307 DUMP_PREFIX_OFFSET, 16, 1,
2915 &msg->hdr, sizeof(msg->hdr), true); 3308 &msg->hdr, sizeof(msg->hdr), true);