aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-12 00:34:24 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:17:37 -0400
commit6644ed7b7e04f8e588aebdaa58cededb9416ab95 (patch)
tree2770308c0a48245b81754238f0c37e87605fde3b /net/ceph/messenger.c
parent8ea299bcbc85aeaf5348d99614b35433287bec6b (diff)
libceph: make message data be a pointer
Begin the transition from a single message data item to a list of them by replacing the "data" structure in a message with a pointer to a ceph_msg_data structure. A null pointer will indicate the message has no data; replace the use of ceph_msg_has_data() with a simple check for a null pointer. Create functions ceph_msg_data_create() and ceph_msg_data_destroy() to dynamically allocate and free a data item structure of a given type. When a message has its data item "set," allocate one of these to hold the data description, and free it when the last reference to the message is dropped. This partially resolves: http://tracker.ceph.com/issues/4429 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c94
1 files changed, 61 insertions, 33 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index dd4b8226a48a..d4e46d8a088c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
1086 1086
1087 /* Initialize data cursor */ 1087 /* Initialize data cursor */
1088 1088
1089 ceph_msg_data_cursor_init(&msg->data, data_len); 1089 ceph_msg_data_cursor_init(msg->data, data_len);
1090} 1090}
1091 1091
1092/* 1092/*
@@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
1406static int write_partial_message_data(struct ceph_connection *con) 1406static int write_partial_message_data(struct ceph_connection *con)
1407{ 1407{
1408 struct ceph_msg *msg = con->out_msg; 1408 struct ceph_msg *msg = con->out_msg;
1409 struct ceph_msg_data_cursor *cursor = &msg->data.cursor; 1409 struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
1410 bool do_datacrc = !con->msgr->nocrc; 1410 bool do_datacrc = !con->msgr->nocrc;
1411 u32 crc; 1411 u32 crc;
1412 1412
1413 dout("%s %p msg %p\n", __func__, con, msg); 1413 dout("%s %p msg %p\n", __func__, con, msg);
1414 1414
1415 if (WARN_ON(!ceph_msg_has_data(msg))) 1415 if (WARN_ON(!msg->data))
1416 return -EINVAL; 1416 return -EINVAL;
1417 1417
1418 /* 1418 /*
@@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1432 bool need_crc; 1432 bool need_crc;
1433 int ret; 1433 int ret;
1434 1434
1435 page = ceph_msg_data_next(&msg->data, &page_offset, &length, 1435 page = ceph_msg_data_next(msg->data, &page_offset, &length,
1436 &last_piece); 1436 &last_piece);
1437 ret = ceph_tcp_sendpage(con->sock, page, page_offset, 1437 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1438 length, last_piece); 1438 length, last_piece);
@@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1444 } 1444 }
1445 if (do_datacrc && cursor->need_crc) 1445 if (do_datacrc && cursor->need_crc)
1446 crc = ceph_crc32c_page(crc, page, page_offset, length); 1446 crc = ceph_crc32c_page(crc, page, page_offset, length);
1447 need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret); 1447 need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
1448 } 1448 }
1449 1449
1450 dout("%s %p msg %p done\n", __func__, con, msg); 1450 dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con,
2104static int read_partial_msg_data(struct ceph_connection *con) 2104static int read_partial_msg_data(struct ceph_connection *con)
2105{ 2105{
2106 struct ceph_msg *msg = con->in_msg; 2106 struct ceph_msg *msg = con->in_msg;
2107 struct ceph_msg_data_cursor *cursor = &msg->data.cursor; 2107 struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
2108 const bool do_datacrc = !con->msgr->nocrc; 2108 const bool do_datacrc = !con->msgr->nocrc;
2109 struct page *page; 2109 struct page *page;
2110 size_t page_offset; 2110 size_t page_offset;
@@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
2113 int ret; 2113 int ret;
2114 2114
2115 BUG_ON(!msg); 2115 BUG_ON(!msg);
2116 if (WARN_ON(!ceph_msg_has_data(msg))) 2116 if (!msg->data)
2117 return -EIO; 2117 return -EIO;
2118 2118
2119 if (do_datacrc) 2119 if (do_datacrc)
2120 crc = con->in_data_crc; 2120 crc = con->in_data_crc;
2121 while (cursor->resid) { 2121 while (cursor->resid) {
2122 page = ceph_msg_data_next(&msg->data, &page_offset, &length, 2122 page = ceph_msg_data_next(msg->data, &page_offset, &length,
2123 NULL); 2123 NULL);
2124 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2124 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2125 if (ret <= 0) { 2125 if (ret <= 0) {
@@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2131 2131
2132 if (do_datacrc) 2132 if (do_datacrc)
2133 crc = ceph_crc32c_page(crc, page, page_offset, ret); 2133 crc = ceph_crc32c_page(crc, page, page_offset, ret);
2134 (void) ceph_msg_data_advance(&msg->data, (size_t) ret); 2134 (void) ceph_msg_data_advance(msg->data, (size_t)ret);
2135 } 2135 }
2136 if (do_datacrc) 2136 if (do_datacrc)
2137 con->in_data_crc = crc; 2137 con->in_data_crc = crc;
@@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
2947} 2947}
2948EXPORT_SYMBOL(ceph_con_keepalive); 2948EXPORT_SYMBOL(ceph_con_keepalive);
2949 2949
2950static void ceph_msg_data_init(struct ceph_msg_data *data) 2950static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
2951{ 2951{
2952 data->type = CEPH_MSG_DATA_NONE; 2952 struct ceph_msg_data *data;
2953
2954 if (WARN_ON(!ceph_msg_data_type_valid(type)))
2955 return NULL;
2956
2957 data = kzalloc(sizeof (*data), GFP_NOFS);
2958 if (data)
2959 data->type = type;
2960
2961 return data;
2962}
2963
2964static void ceph_msg_data_destroy(struct ceph_msg_data *data)
2965{
2966 if (!data)
2967 return;
2968
2969 if (data->type == CEPH_MSG_DATA_PAGELIST) {
2970 ceph_pagelist_release(data->pagelist);
2971 kfree(data->pagelist);
2972 }
2973 kfree(data);
2953} 2974}
2954 2975
2955void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, 2976void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
2956 size_t length, size_t alignment) 2977 size_t length, size_t alignment)
2957{ 2978{
2979 struct ceph_msg_data *data;
2980
2958 BUG_ON(!pages); 2981 BUG_ON(!pages);
2959 BUG_ON(!length); 2982 BUG_ON(!length);
2960 BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); 2983 BUG_ON(msg->data != NULL);
2984
2985 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
2986 BUG_ON(!data);
2987 data->pages = pages;
2988 data->length = length;
2989 data->alignment = alignment & ~PAGE_MASK;
2961 2990
2962 msg->data.type = CEPH_MSG_DATA_PAGES; 2991 msg->data = data;
2963 msg->data.pages = pages;
2964 msg->data.length = length;
2965 msg->data.alignment = alignment & ~PAGE_MASK;
2966} 2992}
2967EXPORT_SYMBOL(ceph_msg_data_set_pages); 2993EXPORT_SYMBOL(ceph_msg_data_set_pages);
2968 2994
2969void ceph_msg_data_set_pagelist(struct ceph_msg *msg, 2995void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
2970 struct ceph_pagelist *pagelist) 2996 struct ceph_pagelist *pagelist)
2971{ 2997{
2998 struct ceph_msg_data *data;
2999
2972 BUG_ON(!pagelist); 3000 BUG_ON(!pagelist);
2973 BUG_ON(!pagelist->length); 3001 BUG_ON(!pagelist->length);
2974 BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); 3002 BUG_ON(msg->data != NULL);
2975 3003
2976 msg->data.type = CEPH_MSG_DATA_PAGELIST; 3004 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
2977 msg->data.pagelist = pagelist; 3005 BUG_ON(!data);
3006 data->pagelist = pagelist;
3007
3008 msg->data = data;
2978} 3009}
2979EXPORT_SYMBOL(ceph_msg_data_set_pagelist); 3010EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
2980 3011
2981void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) 3012void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
2982{ 3013{
3014 struct ceph_msg_data *data;
3015
2983 BUG_ON(!bio); 3016 BUG_ON(!bio);
2984 BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); 3017 BUG_ON(msg->data != NULL);
2985 3018
2986 msg->data.type = CEPH_MSG_DATA_BIO; 3019 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
2987 msg->data.bio = bio; 3020 BUG_ON(!data);
3021 data->bio = bio;
3022
3023 msg->data = data;
2988} 3024}
2989EXPORT_SYMBOL(ceph_msg_data_set_bio); 3025EXPORT_SYMBOL(ceph_msg_data_set_bio);
2990 3026
@@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3008 INIT_LIST_HEAD(&m->list_head); 3044 INIT_LIST_HEAD(&m->list_head);
3009 kref_init(&m->kref); 3045 kref_init(&m->kref);
3010 3046
3011 ceph_msg_data_init(&m->data);
3012
3013 /* front */ 3047 /* front */
3014 m->front_max = front_len; 3048 m->front_max = front_len;
3015 if (front_len) { 3049 if (front_len) {
@@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref)
3163 ceph_buffer_put(m->middle); 3197 ceph_buffer_put(m->middle);
3164 m->middle = NULL; 3198 m->middle = NULL;
3165 } 3199 }
3166 if (ceph_msg_has_data(m)) { 3200 ceph_msg_data_destroy(m->data);
3167 if (m->data.type == CEPH_MSG_DATA_PAGELIST) { 3201 m->data = NULL;
3168 ceph_pagelist_release(m->data.pagelist);
3169 kfree(m->data.pagelist);
3170 }
3171 memset(&m->data, 0, sizeof m->data);
3172 ceph_msg_data_init(&m->data);
3173 }
3174 3202
3175 if (m->pool) 3203 if (m->pool)
3176 ceph_msgpool_put(m->pool, m); 3204 ceph_msgpool_put(m->pool, m);
@@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
3182void ceph_msg_dump(struct ceph_msg *msg) 3210void ceph_msg_dump(struct ceph_msg *msg)
3183{ 3211{
3184 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, 3212 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
3185 msg->front_max, msg->data.length); 3213 msg->front_max, msg->data->length);
3186 print_hex_dump(KERN_DEBUG, "header: ", 3214 print_hex_dump(KERN_DEBUG, "header: ",
3187 DUMP_PREFIX_OFFSET, 16, 1, 3215 DUMP_PREFIX_OFFSET, 16, 1,
3188 &msg->hdr, sizeof(msg->hdr), true); 3216 &msg->hdr, sizeof(msg->hdr), true);