diff options
author | Alex Elder <elder@inktank.com> | 2013-03-12 00:34:24 -0400 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-02 00:17:37 -0400 |
commit | 6644ed7b7e04f8e588aebdaa58cededb9416ab95 (patch) | |
tree | 2770308c0a48245b81754238f0c37e87605fde3b /net/ceph/messenger.c | |
parent | 8ea299bcbc85aeaf5348d99614b35433287bec6b (diff) |
libceph: make message data be a pointer
Begin the transition from a single message data item to a list of
them by replacing the "data" structure in a message with a pointer
to a ceph_msg_data structure.
A null pointer will indicate the message has no data; replace the
use of ceph_msg_has_data() with a simple check for a null pointer.
Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
to dynamically allocate and free a data item structure of a given type.
When a message has its data item "set," allocate one of these to
hold the data description, and free it when the last reference to
the message is dropped.
This partially resolves:
http://tracker.ceph.com/issues/4429
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 94 |
1 files changed, 61 insertions, 33 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index dd4b8226a48a..d4e46d8a088c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg) | |||
1086 | 1086 | ||
1087 | /* Initialize data cursor */ | 1087 | /* Initialize data cursor */ |
1088 | 1088 | ||
1089 | ceph_msg_data_cursor_init(&msg->data, data_len); | 1089 | ceph_msg_data_cursor_init(msg->data, data_len); |
1090 | } | 1090 | } |
1091 | 1091 | ||
1092 | /* | 1092 | /* |
@@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page, | |||
1406 | static int write_partial_message_data(struct ceph_connection *con) | 1406 | static int write_partial_message_data(struct ceph_connection *con) |
1407 | { | 1407 | { |
1408 | struct ceph_msg *msg = con->out_msg; | 1408 | struct ceph_msg *msg = con->out_msg; |
1409 | struct ceph_msg_data_cursor *cursor = &msg->data.cursor; | 1409 | struct ceph_msg_data_cursor *cursor = &msg->data->cursor; |
1410 | bool do_datacrc = !con->msgr->nocrc; | 1410 | bool do_datacrc = !con->msgr->nocrc; |
1411 | u32 crc; | 1411 | u32 crc; |
1412 | 1412 | ||
1413 | dout("%s %p msg %p\n", __func__, con, msg); | 1413 | dout("%s %p msg %p\n", __func__, con, msg); |
1414 | 1414 | ||
1415 | if (WARN_ON(!ceph_msg_has_data(msg))) | 1415 | if (WARN_ON(!msg->data)) |
1416 | return -EINVAL; | 1416 | return -EINVAL; |
1417 | 1417 | ||
1418 | /* | 1418 | /* |
@@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con) | |||
1432 | bool need_crc; | 1432 | bool need_crc; |
1433 | int ret; | 1433 | int ret; |
1434 | 1434 | ||
1435 | page = ceph_msg_data_next(&msg->data, &page_offset, &length, | 1435 | page = ceph_msg_data_next(msg->data, &page_offset, &length, |
1436 | &last_piece); | 1436 | &last_piece); |
1437 | ret = ceph_tcp_sendpage(con->sock, page, page_offset, | 1437 | ret = ceph_tcp_sendpage(con->sock, page, page_offset, |
1438 | length, last_piece); | 1438 | length, last_piece); |
@@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con) | |||
1444 | } | 1444 | } |
1445 | if (do_datacrc && cursor->need_crc) | 1445 | if (do_datacrc && cursor->need_crc) |
1446 | crc = ceph_crc32c_page(crc, page, page_offset, length); | 1446 | crc = ceph_crc32c_page(crc, page, page_offset, length); |
1447 | need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret); | 1447 | need_crc = ceph_msg_data_advance(msg->data, (size_t)ret); |
1448 | } | 1448 | } |
1449 | 1449 | ||
1450 | dout("%s %p msg %p done\n", __func__, con, msg); | 1450 | dout("%s %p msg %p done\n", __func__, con, msg); |
@@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con, | |||
2104 | static int read_partial_msg_data(struct ceph_connection *con) | 2104 | static int read_partial_msg_data(struct ceph_connection *con) |
2105 | { | 2105 | { |
2106 | struct ceph_msg *msg = con->in_msg; | 2106 | struct ceph_msg *msg = con->in_msg; |
2107 | struct ceph_msg_data_cursor *cursor = &msg->data.cursor; | 2107 | struct ceph_msg_data_cursor *cursor = &msg->data->cursor; |
2108 | const bool do_datacrc = !con->msgr->nocrc; | 2108 | const bool do_datacrc = !con->msgr->nocrc; |
2109 | struct page *page; | 2109 | struct page *page; |
2110 | size_t page_offset; | 2110 | size_t page_offset; |
@@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con) | |||
2113 | int ret; | 2113 | int ret; |
2114 | 2114 | ||
2115 | BUG_ON(!msg); | 2115 | BUG_ON(!msg); |
2116 | if (WARN_ON(!ceph_msg_has_data(msg))) | 2116 | if (!msg->data) |
2117 | return -EIO; | 2117 | return -EIO; |
2118 | 2118 | ||
2119 | if (do_datacrc) | 2119 | if (do_datacrc) |
2120 | crc = con->in_data_crc; | 2120 | crc = con->in_data_crc; |
2121 | while (cursor->resid) { | 2121 | while (cursor->resid) { |
2122 | page = ceph_msg_data_next(&msg->data, &page_offset, &length, | 2122 | page = ceph_msg_data_next(msg->data, &page_offset, &length, |
2123 | NULL); | 2123 | NULL); |
2124 | ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); | 2124 | ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); |
2125 | if (ret <= 0) { | 2125 | if (ret <= 0) { |
@@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con) | |||
2131 | 2131 | ||
2132 | if (do_datacrc) | 2132 | if (do_datacrc) |
2133 | crc = ceph_crc32c_page(crc, page, page_offset, ret); | 2133 | crc = ceph_crc32c_page(crc, page, page_offset, ret); |
2134 | (void) ceph_msg_data_advance(&msg->data, (size_t) ret); | 2134 | (void) ceph_msg_data_advance(msg->data, (size_t)ret); |
2135 | } | 2135 | } |
2136 | if (do_datacrc) | 2136 | if (do_datacrc) |
2137 | con->in_data_crc = crc; | 2137 | con->in_data_crc = crc; |
@@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con) | |||
2947 | } | 2947 | } |
2948 | EXPORT_SYMBOL(ceph_con_keepalive); | 2948 | EXPORT_SYMBOL(ceph_con_keepalive); |
2949 | 2949 | ||
2950 | static void ceph_msg_data_init(struct ceph_msg_data *data) | 2950 | static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) |
2951 | { | 2951 | { |
2952 | data->type = CEPH_MSG_DATA_NONE; | 2952 | struct ceph_msg_data *data; |
2953 | |||
2954 | if (WARN_ON(!ceph_msg_data_type_valid(type))) | ||
2955 | return NULL; | ||
2956 | |||
2957 | data = kzalloc(sizeof (*data), GFP_NOFS); | ||
2958 | if (data) | ||
2959 | data->type = type; | ||
2960 | |||
2961 | return data; | ||
2962 | } | ||
2963 | |||
2964 | static void ceph_msg_data_destroy(struct ceph_msg_data *data) | ||
2965 | { | ||
2966 | if (!data) | ||
2967 | return; | ||
2968 | |||
2969 | if (data->type == CEPH_MSG_DATA_PAGELIST) { | ||
2970 | ceph_pagelist_release(data->pagelist); | ||
2971 | kfree(data->pagelist); | ||
2972 | } | ||
2973 | kfree(data); | ||
2953 | } | 2974 | } |
2954 | 2975 | ||
2955 | void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, | 2976 | void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, |
2956 | size_t length, size_t alignment) | 2977 | size_t length, size_t alignment) |
2957 | { | 2978 | { |
2979 | struct ceph_msg_data *data; | ||
2980 | |||
2958 | BUG_ON(!pages); | 2981 | BUG_ON(!pages); |
2959 | BUG_ON(!length); | 2982 | BUG_ON(!length); |
2960 | BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); | 2983 | BUG_ON(msg->data != NULL); |
2984 | |||
2985 | data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); | ||
2986 | BUG_ON(!data); | ||
2987 | data->pages = pages; | ||
2988 | data->length = length; | ||
2989 | data->alignment = alignment & ~PAGE_MASK; | ||
2961 | 2990 | ||
2962 | msg->data.type = CEPH_MSG_DATA_PAGES; | 2991 | msg->data = data; |
2963 | msg->data.pages = pages; | ||
2964 | msg->data.length = length; | ||
2965 | msg->data.alignment = alignment & ~PAGE_MASK; | ||
2966 | } | 2992 | } |
2967 | EXPORT_SYMBOL(ceph_msg_data_set_pages); | 2993 | EXPORT_SYMBOL(ceph_msg_data_set_pages); |
2968 | 2994 | ||
2969 | void ceph_msg_data_set_pagelist(struct ceph_msg *msg, | 2995 | void ceph_msg_data_set_pagelist(struct ceph_msg *msg, |
2970 | struct ceph_pagelist *pagelist) | 2996 | struct ceph_pagelist *pagelist) |
2971 | { | 2997 | { |
2998 | struct ceph_msg_data *data; | ||
2999 | |||
2972 | BUG_ON(!pagelist); | 3000 | BUG_ON(!pagelist); |
2973 | BUG_ON(!pagelist->length); | 3001 | BUG_ON(!pagelist->length); |
2974 | BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); | 3002 | BUG_ON(msg->data != NULL); |
2975 | 3003 | ||
2976 | msg->data.type = CEPH_MSG_DATA_PAGELIST; | 3004 | data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); |
2977 | msg->data.pagelist = pagelist; | 3005 | BUG_ON(!data); |
3006 | data->pagelist = pagelist; | ||
3007 | |||
3008 | msg->data = data; | ||
2978 | } | 3009 | } |
2979 | EXPORT_SYMBOL(ceph_msg_data_set_pagelist); | 3010 | EXPORT_SYMBOL(ceph_msg_data_set_pagelist); |
2980 | 3011 | ||
2981 | void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) | 3012 | void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) |
2982 | { | 3013 | { |
3014 | struct ceph_msg_data *data; | ||
3015 | |||
2983 | BUG_ON(!bio); | 3016 | BUG_ON(!bio); |
2984 | BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE); | 3017 | BUG_ON(msg->data != NULL); |
2985 | 3018 | ||
2986 | msg->data.type = CEPH_MSG_DATA_BIO; | 3019 | data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); |
2987 | msg->data.bio = bio; | 3020 | BUG_ON(!data); |
3021 | data->bio = bio; | ||
3022 | |||
3023 | msg->data = data; | ||
2988 | } | 3024 | } |
2989 | EXPORT_SYMBOL(ceph_msg_data_set_bio); | 3025 | EXPORT_SYMBOL(ceph_msg_data_set_bio); |
2990 | 3026 | ||
@@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, | |||
3008 | INIT_LIST_HEAD(&m->list_head); | 3044 | INIT_LIST_HEAD(&m->list_head); |
3009 | kref_init(&m->kref); | 3045 | kref_init(&m->kref); |
3010 | 3046 | ||
3011 | ceph_msg_data_init(&m->data); | ||
3012 | |||
3013 | /* front */ | 3047 | /* front */ |
3014 | m->front_max = front_len; | 3048 | m->front_max = front_len; |
3015 | if (front_len) { | 3049 | if (front_len) { |
@@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref) | |||
3163 | ceph_buffer_put(m->middle); | 3197 | ceph_buffer_put(m->middle); |
3164 | m->middle = NULL; | 3198 | m->middle = NULL; |
3165 | } | 3199 | } |
3166 | if (ceph_msg_has_data(m)) { | 3200 | ceph_msg_data_destroy(m->data); |
3167 | if (m->data.type == CEPH_MSG_DATA_PAGELIST) { | 3201 | m->data = NULL; |
3168 | ceph_pagelist_release(m->data.pagelist); | ||
3169 | kfree(m->data.pagelist); | ||
3170 | } | ||
3171 | memset(&m->data, 0, sizeof m->data); | ||
3172 | ceph_msg_data_init(&m->data); | ||
3173 | } | ||
3174 | 3202 | ||
3175 | if (m->pool) | 3203 | if (m->pool) |
3176 | ceph_msgpool_put(m->pool, m); | 3204 | ceph_msgpool_put(m->pool, m); |
@@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put); | |||
3182 | void ceph_msg_dump(struct ceph_msg *msg) | 3210 | void ceph_msg_dump(struct ceph_msg *msg) |
3183 | { | 3211 | { |
3184 | pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, | 3212 | pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, |
3185 | msg->front_max, msg->data.length); | 3213 | msg->front_max, msg->data->length); |
3186 | print_hex_dump(KERN_DEBUG, "header: ", | 3214 | print_hex_dump(KERN_DEBUG, "header: ", |
3187 | DUMP_PREFIX_OFFSET, 16, 1, | 3215 | DUMP_PREFIX_OFFSET, 16, 1, |
3188 | &msg->hdr, sizeof(msg->hdr), true); | 3216 | &msg->hdr, sizeof(msg->hdr), true); |