aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2018-10-31 17:42:31 -0400
committer Linus Torvalds <torvalds@linux-foundation.org> 2018-10-31 17:42:31 -0400
commit 31990f0f5366a8f66688edae8688723b22034108 (patch)
tree 07078a732a5f02d2330f3cb873286f9ac53ea969 /net/ceph
parent a9ac6cc47bbb0fdd042012044f737ba13da10cb4 (diff)
parent ea4cdc548e5e74a529cdd1aea885d74b4aa8f1b3 (diff)
Merge tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - a series that fixes some old memory allocation issues in libceph
     (myself). We no longer allocate memory in places where allocation
     failures cannot be handled and BUG when the allocation fails.

   - support for copy_file_range() syscall (Luis Henriques). If size and
     alignment conditions are met, it leverages RADOS copy-from
     operation. Otherwise, a local copy is performed.

   - a patch that reduces memory requirement of ceph_sync_read() from
     the size of the entire read to the size of one object (Zheng Yan).

   - fallocate() syscall is now restricted to FALLOC_FL_PUNCH_HOLE (Luis
     Henriques)"

* tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client: (25 commits)
  ceph: new mount option to disable usage of copy-from op
  ceph: support copy_file_range file operation
  libceph: support the RADOS copy-from operation
  ceph: add non-blocking parameter to ceph_try_get_caps()
  libceph: check reply num_data_items in setup_request_data()
  libceph: preallocate message data items
  libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls
  libceph: introduce alloc_watch_request()
  libceph: assign cookies in linger_submit()
  libceph: enable fallback to ceph_msg_new() in ceph_msgpool_get()
  ceph: num_ops is off by one in ceph_aio_retry_work()
  libceph: no need to call osd_req_opcode_valid() in osd_req_encode_op()
  ceph: set timeout conditionally in __cap_delay_requeue
  libceph: don't consume a ref on pagelist in ceph_msg_data_add_pagelist()
  libceph: introduce ceph_pagelist_alloc()
  libceph: osd_req_op_cls_init() doesn't need to take opcode
  libceph: bump CEPH_MSG_MAX_DATA_LEN
  ceph: only allow punch hole mode in fallocate
  ceph: refactor ceph_sync_read()
  ceph: check if LOOKUPNAME request was aborted when filling trace
  ...
Diffstat (limited to 'net/ceph')
-rw-r--r-- net/ceph/messenger.c 107
-rw-r--r-- net/ceph/msgpool.c 27
-rw-r--r-- net/ceph/osd_client.c 363
-rw-r--r-- net/ceph/pagelist.c 20
4 files changed, 348 insertions, 169 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0a187196aeed..88e35830198c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
156/* Slab caches for frequently-allocated structures */ 156/* Slab caches for frequently-allocated structures */
157 157
158static struct kmem_cache *ceph_msg_cache; 158static struct kmem_cache *ceph_msg_cache;
159static struct kmem_cache *ceph_msg_data_cache;
160 159
161/* static tag bytes (protocol control messages) */ 160/* static tag bytes (protocol control messages) */
162static char tag_msg = CEPH_MSGR_TAG_MSG; 161static char tag_msg = CEPH_MSGR_TAG_MSG;
@@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void)
235 if (!ceph_msg_cache) 234 if (!ceph_msg_cache)
236 return -ENOMEM; 235 return -ENOMEM;
237 236
238 BUG_ON(ceph_msg_data_cache); 237 return 0;
239 ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
240 if (ceph_msg_data_cache)
241 return 0;
242
243 kmem_cache_destroy(ceph_msg_cache);
244 ceph_msg_cache = NULL;
245
246 return -ENOMEM;
247} 238}
248 239
249static void ceph_msgr_slab_exit(void) 240static void ceph_msgr_slab_exit(void)
250{ 241{
251 BUG_ON(!ceph_msg_data_cache);
252 kmem_cache_destroy(ceph_msg_data_cache);
253 ceph_msg_data_cache = NULL;
254
255 BUG_ON(!ceph_msg_cache); 242 BUG_ON(!ceph_msg_cache);
256 kmem_cache_destroy(ceph_msg_cache); 243 kmem_cache_destroy(ceph_msg_cache);
257 ceph_msg_cache = NULL; 244 ceph_msg_cache = NULL;
@@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1141static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) 1128static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1142{ 1129{
1143 struct ceph_msg_data_cursor *cursor = &msg->cursor; 1130 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1144 struct ceph_msg_data *data;
1145 1131
1146 BUG_ON(!length); 1132 BUG_ON(!length);
1147 BUG_ON(length > msg->data_length); 1133 BUG_ON(length > msg->data_length);
1148 BUG_ON(list_empty(&msg->data)); 1134 BUG_ON(!msg->num_data_items);
1149 1135
1150 cursor->data_head = &msg->data;
1151 cursor->total_resid = length; 1136 cursor->total_resid = length;
1152 data = list_first_entry(&msg->data, struct ceph_msg_data, links); 1137 cursor->data = msg->data;
1153 cursor->data = data;
1154 1138
1155 __ceph_msg_data_cursor_init(cursor); 1139 __ceph_msg_data_cursor_init(cursor);
1156} 1140}
@@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1231 1215
1232 if (!cursor->resid && cursor->total_resid) { 1216 if (!cursor->resid && cursor->total_resid) {
1233 WARN_ON(!cursor->last_piece); 1217 WARN_ON(!cursor->last_piece);
1234 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); 1218 cursor->data++;
1235 cursor->data = list_next_entry(cursor->data, links);
1236 __ceph_msg_data_cursor_init(cursor); 1219 __ceph_msg_data_cursor_init(cursor);
1237 new_piece = true; 1220 new_piece = true;
1238 } 1221 }
@@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con)
1248 1231
1249static void prepare_message_data(struct ceph_msg *msg, u32 data_len) 1232static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1250{ 1233{
1251 BUG_ON(!msg);
1252 BUG_ON(!data_len);
1253
1254 /* Initialize data cursor */ 1234 /* Initialize data cursor */
1255 1235
1256 ceph_msg_data_cursor_init(msg, (size_t)data_len); 1236 ceph_msg_data_cursor_init(msg, (size_t)data_len);
@@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1590 1570
1591 dout("%s %p msg %p\n", __func__, con, msg); 1571 dout("%s %p msg %p\n", __func__, con, msg);
1592 1572
1593 if (list_empty(&msg->data)) 1573 if (!msg->num_data_items)
1594 return -EINVAL; 1574 return -EINVAL;
1595 1575
1596 /* 1576 /*
@@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2347 u32 crc = 0; 2327 u32 crc = 0;
2348 int ret; 2328 int ret;
2349 2329
2350 BUG_ON(!msg); 2330 if (!msg->num_data_items)
2351 if (list_empty(&msg->data))
2352 return -EIO; 2331 return -EIO;
2353 2332
2354 if (do_datacrc) 2333 if (do_datacrc)
@@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con,
3256 return false; 3235 return false;
3257} 3236}
3258 3237
3259static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) 3238static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
3260{ 3239{
3261 struct ceph_msg_data *data; 3240 BUG_ON(msg->num_data_items >= msg->max_data_items);
3262 3241 return &msg->data[msg->num_data_items++];
3263 if (WARN_ON(!ceph_msg_data_type_valid(type)))
3264 return NULL;
3265
3266 data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
3267 if (!data)
3268 return NULL;
3269
3270 data->type = type;
3271 INIT_LIST_HEAD(&data->links);
3272
3273 return data;
3274} 3242}
3275 3243
3276static void ceph_msg_data_destroy(struct ceph_msg_data *data) 3244static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3277{ 3245{
3278 if (!data)
3279 return;
3280
3281 WARN_ON(!list_empty(&data->links));
3282 if (data->type == CEPH_MSG_DATA_PAGELIST) 3246 if (data->type == CEPH_MSG_DATA_PAGELIST)
3283 ceph_pagelist_release(data->pagelist); 3247 ceph_pagelist_release(data->pagelist);
3284 kmem_cache_free(ceph_msg_data_cache, data);
3285} 3248}
3286 3249
3287void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, 3250void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
@@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
3292 BUG_ON(!pages); 3255 BUG_ON(!pages);
3293 BUG_ON(!length); 3256 BUG_ON(!length);
3294 3257
3295 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); 3258 data = ceph_msg_data_add(msg);
3296 BUG_ON(!data); 3259 data->type = CEPH_MSG_DATA_PAGES;
3297 data->pages = pages; 3260 data->pages = pages;
3298 data->length = length; 3261 data->length = length;
3299 data->alignment = alignment & ~PAGE_MASK; 3262 data->alignment = alignment & ~PAGE_MASK;
3300 3263
3301 list_add_tail(&data->links, &msg->data);
3302 msg->data_length += length; 3264 msg->data_length += length;
3303} 3265}
3304EXPORT_SYMBOL(ceph_msg_data_add_pages); 3266EXPORT_SYMBOL(ceph_msg_data_add_pages);
@@ -3311,11 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
3311 BUG_ON(!pagelist); 3273 BUG_ON(!pagelist);
3312 BUG_ON(!pagelist->length); 3274 BUG_ON(!pagelist->length);
3313 3275
3314 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); 3276 data = ceph_msg_data_add(msg);
3315 BUG_ON(!data); 3277 data->type = CEPH_MSG_DATA_PAGELIST;
3278 refcount_inc(&pagelist->refcnt);
3316 data->pagelist = pagelist; 3279 data->pagelist = pagelist;
3317 3280
3318 list_add_tail(&data->links, &msg->data);
3319 msg->data_length += pagelist->length; 3281 msg->data_length += pagelist->length;
3320} 3282}
3321EXPORT_SYMBOL(ceph_msg_data_add_pagelist); 3283EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
@@ -3326,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
3326{ 3288{
3327 struct ceph_msg_data *data; 3289 struct ceph_msg_data *data;
3328 3290
3329 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); 3291 data = ceph_msg_data_add(msg);
3330 BUG_ON(!data); 3292 data->type = CEPH_MSG_DATA_BIO;
3331 data->bio_pos = *bio_pos; 3293 data->bio_pos = *bio_pos;
3332 data->bio_length = length; 3294 data->bio_length = length;
3333 3295
3334 list_add_tail(&data->links, &msg->data);
3335 msg->data_length += length; 3296 msg->data_length += length;
3336} 3297}
3337EXPORT_SYMBOL(ceph_msg_data_add_bio); 3298EXPORT_SYMBOL(ceph_msg_data_add_bio);
@@ -3342,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
3342{ 3303{
3343 struct ceph_msg_data *data; 3304 struct ceph_msg_data *data;
3344 3305
3345 data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); 3306 data = ceph_msg_data_add(msg);
3346 BUG_ON(!data); 3307 data->type = CEPH_MSG_DATA_BVECS;
3347 data->bvec_pos = *bvec_pos; 3308 data->bvec_pos = *bvec_pos;
3348 3309
3349 list_add_tail(&data->links, &msg->data);
3350 msg->data_length += bvec_pos->iter.bi_size; 3310 msg->data_length += bvec_pos->iter.bi_size;
3351} 3311}
3352EXPORT_SYMBOL(ceph_msg_data_add_bvecs); 3312EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
@@ -3355,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
3355 * construct a new message with given type, size 3315 * construct a new message with given type, size
3356 * the new msg has a ref count of 1. 3316 * the new msg has a ref count of 1.
3357 */ 3317 */
3358struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, 3318struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
3359 bool can_fail) 3319 gfp_t flags, bool can_fail)
3360{ 3320{
3361 struct ceph_msg *m; 3321 struct ceph_msg *m;
3362 3322
@@ -3370,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3370 3330
3371 INIT_LIST_HEAD(&m->list_head); 3331 INIT_LIST_HEAD(&m->list_head);
3372 kref_init(&m->kref); 3332 kref_init(&m->kref);
3373 INIT_LIST_HEAD(&m->data);
3374 3333
3375 /* front */ 3334 /* front */
3376 if (front_len) { 3335 if (front_len) {
@@ -3385,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3385 } 3344 }
3386 m->front_alloc_len = m->front.iov_len = front_len; 3345 m->front_alloc_len = m->front.iov_len = front_len;
3387 3346
3347 if (max_data_items) {
3348 m->data = kmalloc_array(max_data_items, sizeof(*m->data),
3349 flags);
3350 if (!m->data)
3351 goto out2;
3352
3353 m->max_data_items = max_data_items;
3354 }
3355
3388 dout("ceph_msg_new %p front %d\n", m, front_len); 3356 dout("ceph_msg_new %p front %d\n", m, front_len);
3389 return m; 3357 return m;
3390 3358
@@ -3401,6 +3369,13 @@ out:
3401 } 3369 }
3402 return NULL; 3370 return NULL;
3403} 3371}
3372EXPORT_SYMBOL(ceph_msg_new2);
3373
3374struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3375 bool can_fail)
3376{
3377 return ceph_msg_new2(type, front_len, 0, flags, can_fail);
3378}
3404EXPORT_SYMBOL(ceph_msg_new); 3379EXPORT_SYMBOL(ceph_msg_new);
3405 3380
3406/* 3381/*
@@ -3496,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m)
3496{ 3471{
3497 dout("%s %p\n", __func__, m); 3472 dout("%s %p\n", __func__, m);
3498 kvfree(m->front.iov_base); 3473 kvfree(m->front.iov_base);
3474 kfree(m->data);
3499 kmem_cache_free(ceph_msg_cache, m); 3475 kmem_cache_free(ceph_msg_cache, m);
3500} 3476}
3501 3477
3502static void ceph_msg_release(struct kref *kref) 3478static void ceph_msg_release(struct kref *kref)
3503{ 3479{
3504 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 3480 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3505 struct ceph_msg_data *data, *next; 3481 int i;
3506 3482
3507 dout("%s %p\n", __func__, m); 3483 dout("%s %p\n", __func__, m);
3508 WARN_ON(!list_empty(&m->list_head)); 3484 WARN_ON(!list_empty(&m->list_head));
@@ -3515,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref)
3515 m->middle = NULL; 3491 m->middle = NULL;
3516 } 3492 }
3517 3493
3518 list_for_each_entry_safe(data, next, &m->data, links) { 3494 for (i = 0; i < m->num_data_items; i++)
3519 list_del_init(&data->links); 3495 ceph_msg_data_destroy(&m->data[i]);
3520 ceph_msg_data_destroy(data);
3521 }
3522 m->data_length = 0;
3523 3496
3524 if (m->pool) 3497 if (m->pool)
3525 ceph_msgpool_put(m->pool, m); 3498 ceph_msgpool_put(m->pool, m);
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 72571535883f..e3ecb80cd182 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
14 struct ceph_msgpool *pool = arg; 14 struct ceph_msgpool *pool = arg;
15 struct ceph_msg *msg; 15 struct ceph_msg *msg;
16 16
17 msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); 17 msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items,
18 gfp_mask, true);
18 if (!msg) { 19 if (!msg) {
19 dout("msgpool_alloc %s failed\n", pool->name); 20 dout("msgpool_alloc %s failed\n", pool->name);
20 } else { 21 } else {
@@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg)
35} 36}
36 37
37int ceph_msgpool_init(struct ceph_msgpool *pool, int type, 38int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
38 int front_len, int size, bool blocking, const char *name) 39 int front_len, int max_data_items, int size,
40 const char *name)
39{ 41{
40 dout("msgpool %s init\n", name); 42 dout("msgpool %s init\n", name);
41 pool->type = type; 43 pool->type = type;
42 pool->front_len = front_len; 44 pool->front_len = front_len;
45 pool->max_data_items = max_data_items;
43 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); 46 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
44 if (!pool->pool) 47 if (!pool->pool)
45 return -ENOMEM; 48 return -ENOMEM;
@@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool)
53 mempool_destroy(pool->pool); 56 mempool_destroy(pool->pool);
54} 57}
55 58
56struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, 59struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
57 int front_len) 60 int max_data_items)
58{ 61{
59 struct ceph_msg *msg; 62 struct ceph_msg *msg;
60 63
61 if (front_len > pool->front_len) { 64 if (front_len > pool->front_len ||
62 dout("msgpool_get %s need front %d, pool size is %d\n", 65 max_data_items > pool->max_data_items) {
63 pool->name, front_len, pool->front_len); 66 pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n",
64 WARN_ON(1); 67 __func__, front_len, max_data_items, pool->name,
68 pool->front_len, pool->max_data_items);
69 WARN_ON_ONCE(1);
65 70
66 /* try to alloc a fresh message */ 71 /* try to alloc a fresh message */
67 return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); 72 return ceph_msg_new2(pool->type, front_len, max_data_items,
73 GFP_NOFS, false);
68 } 74 }
69 75
70 msg = mempool_alloc(pool->pool, GFP_NOFS); 76 msg = mempool_alloc(pool->pool, GFP_NOFS);
@@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
80 msg->front.iov_len = pool->front_len; 86 msg->front.iov_len = pool->front_len;
81 msg->hdr.front_len = cpu_to_le32(pool->front_len); 87 msg->hdr.front_len = cpu_to_le32(pool->front_len);
82 88
89 msg->data_length = 0;
90 msg->num_data_items = 0;
91
83 kref_init(&msg->kref); /* retake single ref */ 92 kref_init(&msg->kref); /* retake single ref */
84 mempool_free(msg, pool->pool); 93 mempool_free(msg, pool->pool);
85} 94}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 60934bd8796c..d23a9f81f3d7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127} 127}
128 128
129/*
130 * Consumes @pages if @own_pages is true.
131 */
129static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 132static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
130 struct page **pages, u64 length, u32 alignment, 133 struct page **pages, u64 length, u32 alignment,
131 bool pages_from_pool, bool own_pages) 134 bool pages_from_pool, bool own_pages)
@@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
138 osd_data->own_pages = own_pages; 141 osd_data->own_pages = own_pages;
139} 142}
140 143
144/*
145 * Consumes a ref on @pagelist.
146 */
141static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 147static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
142 struct ceph_pagelist *pagelist) 148 struct ceph_pagelist *pagelist)
143{ 149{
@@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
362 num_pages = calc_pages_for((u64)osd_data->alignment, 368 num_pages = calc_pages_for((u64)osd_data->alignment,
363 (u64)osd_data->length); 369 (u64)osd_data->length);
364 ceph_release_page_vector(osd_data->pages, num_pages); 370 ceph_release_page_vector(osd_data->pages, num_pages);
371 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
372 ceph_pagelist_release(osd_data->pagelist);
365 } 373 }
366 ceph_osd_data_init(osd_data); 374 ceph_osd_data_init(osd_data);
367} 375}
@@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
402 case CEPH_OSD_OP_LIST_WATCHERS: 410 case CEPH_OSD_OP_LIST_WATCHERS:
403 ceph_osd_data_release(&op->list_watchers.response_data); 411 ceph_osd_data_release(&op->list_watchers.response_data);
404 break; 412 break;
413 case CEPH_OSD_OP_COPY_FROM:
414 ceph_osd_data_release(&op->copy_from.osd_data);
415 break;
405 default: 416 default:
406 break; 417 break;
407 } 418 }
@@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
606 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); 617 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
607} 618}
608 619
609int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) 620static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
621 int num_request_data_items,
622 int num_reply_data_items)
610{ 623{
611 struct ceph_osd_client *osdc = req->r_osdc; 624 struct ceph_osd_client *osdc = req->r_osdc;
612 struct ceph_msg *msg; 625 struct ceph_msg *msg;
613 int msg_size; 626 int msg_size;
614 627
628 WARN_ON(req->r_request || req->r_reply);
615 WARN_ON(ceph_oid_empty(&req->r_base_oid)); 629 WARN_ON(ceph_oid_empty(&req->r_base_oid));
616 WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); 630 WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
617 631
@@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
633 msg_size += 4 + 8; /* retry_attempt, features */ 647 msg_size += 4 + 8; /* retry_attempt, features */
634 648
635 if (req->r_mempool) 649 if (req->r_mempool)
636 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 650 msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
651 num_request_data_items);
637 else 652 else
638 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); 653 msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
654 num_request_data_items, gfp, true);
639 if (!msg) 655 if (!msg)
640 return -ENOMEM; 656 return -ENOMEM;
641 657
@@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
648 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); 664 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
649 665
650 if (req->r_mempool) 666 if (req->r_mempool)
651 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 667 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
668 num_reply_data_items);
652 else 669 else
653 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); 670 msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
671 num_reply_data_items, gfp, true);
654 if (!msg) 672 if (!msg)
655 return -ENOMEM; 673 return -ENOMEM;
656 674
@@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
658 676
659 return 0; 677 return 0;
660} 678}
661EXPORT_SYMBOL(ceph_osdc_alloc_messages);
662 679
663static bool osd_req_opcode_valid(u16 opcode) 680static bool osd_req_opcode_valid(u16 opcode)
664{ 681{
@@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
671 } 688 }
672} 689}
673 690
691static void get_num_data_items(struct ceph_osd_request *req,
692 int *num_request_data_items,
693 int *num_reply_data_items)
694{
695 struct ceph_osd_req_op *op;
696
697 *num_request_data_items = 0;
698 *num_reply_data_items = 0;
699
700 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
701 switch (op->op) {
702 /* request */
703 case CEPH_OSD_OP_WRITE:
704 case CEPH_OSD_OP_WRITEFULL:
705 case CEPH_OSD_OP_SETXATTR:
706 case CEPH_OSD_OP_CMPXATTR:
707 case CEPH_OSD_OP_NOTIFY_ACK:
708 case CEPH_OSD_OP_COPY_FROM:
709 *num_request_data_items += 1;
710 break;
711
712 /* reply */
713 case CEPH_OSD_OP_STAT:
714 case CEPH_OSD_OP_READ:
715 case CEPH_OSD_OP_LIST_WATCHERS:
716 *num_reply_data_items += 1;
717 break;
718
719 /* both */
720 case CEPH_OSD_OP_NOTIFY:
721 *num_request_data_items += 1;
722 *num_reply_data_items += 1;
723 break;
724 case CEPH_OSD_OP_CALL:
725 *num_request_data_items += 2;
726 *num_reply_data_items += 1;
727 break;
728
729 default:
730 WARN_ON(!osd_req_opcode_valid(op->op));
731 break;
732 }
733 }
734}
735
736/*
737 * oid, oloc and OSD op opcode(s) must be filled in before this function
738 * is called.
739 */
740int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
741{
742 int num_request_data_items, num_reply_data_items;
743
744 get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
745 return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
746 num_reply_data_items);
747}
748EXPORT_SYMBOL(ceph_osdc_alloc_messages);
749
674/* 750/*
675 * This is an osd op init function for opcodes that have no data or 751 * This is an osd op init function for opcodes that have no data or
676 * other information associated with them. It also serves as a 752 * other information associated with them. It also serves as a
@@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
767EXPORT_SYMBOL(osd_req_op_extent_dup_last); 843EXPORT_SYMBOL(osd_req_op_extent_dup_last);
768 844
769int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 845int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
770 u16 opcode, const char *class, const char *method) 846 const char *class, const char *method)
771{ 847{
772 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 848 struct ceph_osd_req_op *op;
773 opcode, 0);
774 struct ceph_pagelist *pagelist; 849 struct ceph_pagelist *pagelist;
775 size_t payload_len = 0; 850 size_t payload_len = 0;
776 size_t size; 851 size_t size;
777 852
778 BUG_ON(opcode != CEPH_OSD_OP_CALL); 853 op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
779 854
780 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 855 pagelist = ceph_pagelist_alloc(GFP_NOFS);
781 if (!pagelist) 856 if (!pagelist)
782 return -ENOMEM; 857 return -ENOMEM;
783 858
784 ceph_pagelist_init(pagelist);
785
786 op->cls.class_name = class; 859 op->cls.class_name = class;
787 size = strlen(class); 860 size = strlen(class);
788 BUG_ON(size > (size_t) U8_MAX); 861 BUG_ON(size > (size_t) U8_MAX);
@@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
815 888
816 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 889 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
817 890
818 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 891 pagelist = ceph_pagelist_alloc(GFP_NOFS);
819 if (!pagelist) 892 if (!pagelist)
820 return -ENOMEM; 893 return -ENOMEM;
821 894
822 ceph_pagelist_init(pagelist);
823
824 payload_len = strlen(name); 895 payload_len = strlen(name);
825 op->xattr.name_len = payload_len; 896 op->xattr.name_len = payload_len;
826 ceph_pagelist_append(pagelist, name, payload_len); 897 ceph_pagelist_append(pagelist, name, payload_len);
@@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
900static u32 osd_req_encode_op(struct ceph_osd_op *dst, 971static u32 osd_req_encode_op(struct ceph_osd_op *dst,
901 const struct ceph_osd_req_op *src) 972 const struct ceph_osd_req_op *src)
902{ 973{
903 if (WARN_ON(!osd_req_opcode_valid(src->op))) {
904 pr_err("unrecognized osd opcode %d\n", src->op);
905
906 return 0;
907 }
908
909 switch (src->op) { 974 switch (src->op) {
910 case CEPH_OSD_OP_STAT: 975 case CEPH_OSD_OP_STAT:
911 break; 976 break;
@@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
955 case CEPH_OSD_OP_CREATE: 1020 case CEPH_OSD_OP_CREATE:
956 case CEPH_OSD_OP_DELETE: 1021 case CEPH_OSD_OP_DELETE:
957 break; 1022 break;
1023 case CEPH_OSD_OP_COPY_FROM:
1024 dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1025 dst->copy_from.src_version =
1026 cpu_to_le64(src->copy_from.src_version);
1027 dst->copy_from.flags = src->copy_from.flags;
1028 dst->copy_from.src_fadvise_flags =
1029 cpu_to_le32(src->copy_from.src_fadvise_flags);
1030 break;
958 default: 1031 default:
959 pr_err("unsupported osd opcode %s\n", 1032 pr_err("unsupported osd opcode %s\n",
960 ceph_osd_op_name(src->op)); 1033 ceph_osd_op_name(src->op));
@@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1038 if (flags & CEPH_OSD_FLAG_WRITE) 1111 if (flags & CEPH_OSD_FLAG_WRITE)
1039 req->r_data_offset = off; 1112 req->r_data_offset = off;
1040 1113
1041 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 1114 if (num_ops > 1)
1115 /*
1116 * This is a special case for ceph_writepages_start(), but it
1117 * also covers ceph_uninline_data(). If more multi-op request
1118 * use cases emerge, we will need a separate helper.
1119 */
1120 r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
1121 else
1122 r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1042 if (r) 1123 if (r)
1043 goto fail; 1124 goto fail;
1044 1125
@@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req)
1845 return true; 1926 return true;
1846} 1927}
1847 1928
1848static void setup_request_data(struct ceph_osd_request *req, 1929/*
1849 struct ceph_msg *msg) 1930 * Keep get_num_data_items() in sync with this function.
1931 */
1932static void setup_request_data(struct ceph_osd_request *req)
1850{ 1933{
1851 u32 data_len = 0; 1934 struct ceph_msg *request_msg = req->r_request;
1852 int i; 1935 struct ceph_msg *reply_msg = req->r_reply;
1936 struct ceph_osd_req_op *op;
1853 1937
1854 if (!list_empty(&msg->data)) 1938 if (req->r_request->num_data_items || req->r_reply->num_data_items)
1855 return; 1939 return;
1856 1940
1857 WARN_ON(msg->data_length); 1941 WARN_ON(request_msg->data_length || reply_msg->data_length);
1858 for (i = 0; i < req->r_num_ops; i++) { 1942 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
1859 struct ceph_osd_req_op *op = &req->r_ops[i];
1860
1861 switch (op->op) { 1943 switch (op->op) {
1862 /* request */ 1944 /* request */
1863 case CEPH_OSD_OP_WRITE: 1945 case CEPH_OSD_OP_WRITE:
1864 case CEPH_OSD_OP_WRITEFULL: 1946 case CEPH_OSD_OP_WRITEFULL:
1865 WARN_ON(op->indata_len != op->extent.length); 1947 WARN_ON(op->indata_len != op->extent.length);
1866 ceph_osdc_msg_data_add(msg, &op->extent.osd_data); 1948 ceph_osdc_msg_data_add(request_msg,
1949 &op->extent.osd_data);
1867 break; 1950 break;
1868 case CEPH_OSD_OP_SETXATTR: 1951 case CEPH_OSD_OP_SETXATTR:
1869 case CEPH_OSD_OP_CMPXATTR: 1952 case CEPH_OSD_OP_CMPXATTR:
1870 WARN_ON(op->indata_len != op->xattr.name_len + 1953 WARN_ON(op->indata_len != op->xattr.name_len +
1871 op->xattr.value_len); 1954 op->xattr.value_len);
1872 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); 1955 ceph_osdc_msg_data_add(request_msg,
1956 &op->xattr.osd_data);
1873 break; 1957 break;
1874 case CEPH_OSD_OP_NOTIFY_ACK: 1958 case CEPH_OSD_OP_NOTIFY_ACK:
1875 ceph_osdc_msg_data_add(msg, 1959 ceph_osdc_msg_data_add(request_msg,
1876 &op->notify_ack.request_data); 1960 &op->notify_ack.request_data);
1877 break; 1961 break;
1962 case CEPH_OSD_OP_COPY_FROM:
1963 ceph_osdc_msg_data_add(request_msg,
1964 &op->copy_from.osd_data);
1965 break;
1878 1966
1879 /* reply */ 1967 /* reply */
1880 case CEPH_OSD_OP_STAT: 1968 case CEPH_OSD_OP_STAT:
1881 ceph_osdc_msg_data_add(req->r_reply, 1969 ceph_osdc_msg_data_add(reply_msg,
1882 &op->raw_data_in); 1970 &op->raw_data_in);
1883 break; 1971 break;
1884 case CEPH_OSD_OP_READ: 1972 case CEPH_OSD_OP_READ:
1885 ceph_osdc_msg_data_add(req->r_reply, 1973 ceph_osdc_msg_data_add(reply_msg,
1886 &op->extent.osd_data); 1974 &op->extent.osd_data);
1887 break; 1975 break;
1888 case CEPH_OSD_OP_LIST_WATCHERS: 1976 case CEPH_OSD_OP_LIST_WATCHERS:
1889 ceph_osdc_msg_data_add(req->r_reply, 1977 ceph_osdc_msg_data_add(reply_msg,
1890 &op->list_watchers.response_data); 1978 &op->list_watchers.response_data);
1891 break; 1979 break;
1892 1980
@@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req,
1895 WARN_ON(op->indata_len != op->cls.class_len + 1983 WARN_ON(op->indata_len != op->cls.class_len +
1896 op->cls.method_len + 1984 op->cls.method_len +
1897 op->cls.indata_len); 1985 op->cls.indata_len);
1898 ceph_osdc_msg_data_add(msg, &op->cls.request_info); 1986 ceph_osdc_msg_data_add(request_msg,
1987 &op->cls.request_info);
1899 /* optional, can be NONE */ 1988 /* optional, can be NONE */
1900 ceph_osdc_msg_data_add(msg, &op->cls.request_data); 1989 ceph_osdc_msg_data_add(request_msg,
1990 &op->cls.request_data);
1901 /* optional, can be NONE */ 1991 /* optional, can be NONE */
1902 ceph_osdc_msg_data_add(req->r_reply, 1992 ceph_osdc_msg_data_add(reply_msg,
1903 &op->cls.response_data); 1993 &op->cls.response_data);
1904 break; 1994 break;
1905 case CEPH_OSD_OP_NOTIFY: 1995 case CEPH_OSD_OP_NOTIFY:
1906 ceph_osdc_msg_data_add(msg, 1996 ceph_osdc_msg_data_add(request_msg,
1907 &op->notify.request_data); 1997 &op->notify.request_data);
1908 ceph_osdc_msg_data_add(req->r_reply, 1998 ceph_osdc_msg_data_add(reply_msg,
1909 &op->notify.response_data); 1999 &op->notify.response_data);
1910 break; 2000 break;
1911 } 2001 }
1912
1913 data_len += op->indata_len;
1914 } 2002 }
1915
1916 WARN_ON(data_len != msg->data_length);
1917} 2003}
1918 2004
1919static void encode_pgid(void **p, const struct ceph_pg *pgid) 2005static void encode_pgid(void **p, const struct ceph_pg *pgid)
@@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req,
1961 req->r_data_offset || req->r_snapc); 2047 req->r_data_offset || req->r_snapc);
1962 } 2048 }
1963 2049
1964 setup_request_data(req, msg); 2050 setup_request_data(req);
1965 2051
1966 encode_spgid(&p, &req->r_t.spgid); /* actual spg */ 2052 encode_spgid(&p, &req->r_t.spgid); /* actual spg */
1967 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ 2053 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
@@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
3001 struct ceph_osd_client *osdc = lreq->osdc; 3087 struct ceph_osd_client *osdc = lreq->osdc;
3002 struct ceph_osd *osd; 3088 struct ceph_osd *osd;
3003 3089
3090 down_write(&osdc->lock);
3091 linger_register(lreq);
3092 if (lreq->is_watch) {
3093 lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
3094 lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
3095 } else {
3096 lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
3097 }
3098
3004 calc_target(osdc, &lreq->t, NULL, false); 3099 calc_target(osdc, &lreq->t, NULL, false);
3005 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3100 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3006 link_linger(osd, lreq); 3101 link_linger(osd, lreq);
3007 3102
3008 send_linger(lreq); 3103 send_linger(lreq);
3104 up_write(&osdc->lock);
3009} 3105}
3010 3106
3011static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) 3107static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
@@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
4318 lreq->notify_id, notify_id); 4414 lreq->notify_id, notify_id);
4319 } else if (!completion_done(&lreq->notify_finish_wait)) { 4415 } else if (!completion_done(&lreq->notify_finish_wait)) {
4320 struct ceph_msg_data *data = 4416 struct ceph_msg_data *data =
4321 list_first_entry_or_null(&msg->data, 4417 msg->num_data_items ? &msg->data[0] : NULL;
4322 struct ceph_msg_data,
4323 links);
4324 4418
4325 if (data) { 4419 if (data) {
4326 if (lreq->preply_pages) { 4420 if (lreq->preply_pages) {
@@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
4476 4570
4477 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4571 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4478 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4572 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4573 return req;
4574}
4575
4576static struct ceph_osd_request *
4577alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
4578{
4579 struct ceph_osd_request *req;
4580
4581 req = alloc_linger_request(lreq);
4582 if (!req)
4583 return NULL;
4584
4585 /*
4586 * Pass 0 for cookie because we don't know it yet, it will be
4587 * filled in by linger_submit().
4588 */
4589 osd_req_op_watch_init(req, 0, 0, watch_opcode);
4479 4590
4480 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { 4591 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4481 ceph_osdc_put_request(req); 4592 ceph_osdc_put_request(req);
@@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
4514 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 4625 lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4515 ktime_get_real_ts64(&lreq->mtime); 4626 ktime_get_real_ts64(&lreq->mtime);
4516 4627
4517 lreq->reg_req = alloc_linger_request(lreq); 4628 lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
4518 if (!lreq->reg_req) { 4629 if (!lreq->reg_req) {
4519 ret = -ENOMEM; 4630 ret = -ENOMEM;
4520 goto err_put_lreq; 4631 goto err_put_lreq;
4521 } 4632 }
4522 4633
4523 lreq->ping_req = alloc_linger_request(lreq); 4634 lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
4524 if (!lreq->ping_req) { 4635 if (!lreq->ping_req) {
4525 ret = -ENOMEM; 4636 ret = -ENOMEM;
4526 goto err_put_lreq; 4637 goto err_put_lreq;
4527 } 4638 }
4528 4639
4529 down_write(&osdc->lock);
4530 linger_register(lreq); /* before osd_req_op_* */
4531 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
4532 CEPH_OSD_WATCH_OP_WATCH);
4533 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
4534 CEPH_OSD_WATCH_OP_PING);
4535 linger_submit(lreq); 4640 linger_submit(lreq);
4536 up_write(&osdc->lock);
4537
4538 ret = linger_reg_commit_wait(lreq); 4641 ret = linger_reg_commit_wait(lreq);
4539 if (ret) { 4642 if (ret) {
4540 linger_cancel(lreq); 4643 linger_cancel(lreq);
@@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4599 4702
4600 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); 4703 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4601 4704
4602 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4705 pl = ceph_pagelist_alloc(GFP_NOIO);
4603 if (!pl) 4706 if (!pl)
4604 return -ENOMEM; 4707 return -ENOMEM;
4605 4708
4606 ceph_pagelist_init(pl);
4607 ret = ceph_pagelist_encode_64(pl, notify_id); 4709 ret = ceph_pagelist_encode_64(pl, notify_id);
4608 ret |= ceph_pagelist_encode_64(pl, cookie); 4710 ret |= ceph_pagelist_encode_64(pl, cookie);
4609 if (payload) { 4711 if (payload) {
@@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4641 ceph_oloc_copy(&req->r_base_oloc, oloc); 4743 ceph_oloc_copy(&req->r_base_oloc, oloc);
4642 req->r_flags = CEPH_OSD_FLAG_READ; 4744 req->r_flags = CEPH_OSD_FLAG_READ;
4643 4745
4644 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4746 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4747 payload_len);
4645 if (ret) 4748 if (ret)
4646 goto out_put_req; 4749 goto out_put_req;
4647 4750
4648 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, 4751 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4649 payload_len);
4650 if (ret) 4752 if (ret)
4651 goto out_put_req; 4753 goto out_put_req;
4652 4754
@@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4670 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); 4772 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4671 op->notify.cookie = cookie; 4773 op->notify.cookie = cookie;
4672 4774
4673 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4775 pl = ceph_pagelist_alloc(GFP_NOIO);
4674 if (!pl) 4776 if (!pl)
4675 return -ENOMEM; 4777 return -ENOMEM;
4676 4778
4677 ceph_pagelist_init(pl);
4678 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ 4779 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4679 ret |= ceph_pagelist_encode_32(pl, timeout); 4780 ret |= ceph_pagelist_encode_32(pl, timeout);
4680 ret |= ceph_pagelist_encode_32(pl, payload_len); 4781 ret |= ceph_pagelist_encode_32(pl, payload_len);
@@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
4733 goto out_put_lreq; 4834 goto out_put_lreq;
4734 } 4835 }
4735 4836
4837 /*
4838 * Pass 0 for cookie because we don't know it yet, it will be
4839 * filled in by linger_submit().
4840 */
4841 ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
4842 payload, payload_len);
4843 if (ret)
4844 goto out_put_lreq;
4845
4736 /* for notify_id */ 4846 /* for notify_id */
4737 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4847 pages = ceph_alloc_page_vector(1, GFP_NOIO);
4738 if (IS_ERR(pages)) { 4848 if (IS_ERR(pages)) {
4739 ret = PTR_ERR(pages); 4849 ret = PTR_ERR(pages);
4740 goto out_put_lreq; 4850 goto out_put_lreq;
4741 } 4851 }
4742
4743 down_write(&osdc->lock);
4744 linger_register(lreq); /* before osd_req_op_* */
4745 ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4746 timeout, payload, payload_len);
4747 if (ret) {
4748 linger_unregister(lreq);
4749 up_write(&osdc->lock);
4750 ceph_release_page_vector(pages, 1);
4751 goto out_put_lreq;
4752 }
4753 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, 4852 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4754 response_data), 4853 response_data),
4755 pages, PAGE_SIZE, 0, false, true); 4854 pages, PAGE_SIZE, 0, false, true);
4756 linger_submit(lreq);
4757 up_write(&osdc->lock);
4758 4855
4856 ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
4857 if (ret)
4858 goto out_put_lreq;
4859
4860 linger_submit(lreq);
4759 ret = linger_reg_commit_wait(lreq); 4861 ret = linger_reg_commit_wait(lreq);
4760 if (!ret) 4862 if (!ret)
4761 ret = linger_notify_finish_wait(lreq); 4863 ret = linger_notify_finish_wait(lreq);
@@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4881 ceph_oloc_copy(&req->r_base_oloc, oloc); 4983 ceph_oloc_copy(&req->r_base_oloc, oloc);
4882 req->r_flags = CEPH_OSD_FLAG_READ; 4984 req->r_flags = CEPH_OSD_FLAG_READ;
4883 4985
4884 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4885 if (ret)
4886 goto out_put_req;
4887
4888 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4986 pages = ceph_alloc_page_vector(1, GFP_NOIO);
4889 if (IS_ERR(pages)) { 4987 if (IS_ERR(pages)) {
4890 ret = PTR_ERR(pages); 4988 ret = PTR_ERR(pages);
@@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4896 response_data), 4994 response_data),
4897 pages, PAGE_SIZE, 0, false, true); 4995 pages, PAGE_SIZE, 0, false, true);
4898 4996
4997 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4998 if (ret)
4999 goto out_put_req;
5000
4899 ceph_osdc_start_request(osdc, req, false); 5001 ceph_osdc_start_request(osdc, req, false);
4900 ret = ceph_osdc_wait_request(osdc, req); 5002 ret = ceph_osdc_wait_request(osdc, req);
4901 if (ret >= 0) { 5003 if (ret >= 0) {
@@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
4958 ceph_oloc_copy(&req->r_base_oloc, oloc); 5060 ceph_oloc_copy(&req->r_base_oloc, oloc);
4959 req->r_flags = flags; 5061 req->r_flags = flags;
4960 5062
4961 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 5063 ret = osd_req_op_cls_init(req, 0, class, method);
4962 if (ret)
4963 goto out_put_req;
4964
4965 ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4966 if (ret) 5064 if (ret)
4967 goto out_put_req; 5065 goto out_put_req;
4968 5066
@@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
4973 osd_req_op_cls_response_data_pages(req, 0, &resp_page, 5071 osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4974 *resp_len, 0, false, false); 5072 *resp_len, 0, false, false);
4975 5073
5074 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5075 if (ret)
5076 goto out_put_req;
5077
4976 ceph_osdc_start_request(osdc, req, false); 5078 ceph_osdc_start_request(osdc, req, false);
4977 ret = ceph_osdc_wait_request(osdc, req); 5079 ret = ceph_osdc_wait_request(osdc, req);
4978 if (ret >= 0) { 5080 if (ret >= 0) {
@@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5021 goto out_map; 5123 goto out_map;
5022 5124
5023 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 5125 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5024 PAGE_SIZE, 10, true, "osd_op"); 5126 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5025 if (err < 0) 5127 if (err < 0)
5026 goto out_mempool; 5128 goto out_mempool;
5027 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 5129 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5028 PAGE_SIZE, 10, true, "osd_op_reply"); 5130 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5131 "osd_op_reply");
5029 if (err < 0) 5132 if (err < 0)
5030 goto out_msgpool; 5133 goto out_msgpool;
5031 5134
@@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
5168} 5271}
5169EXPORT_SYMBOL(ceph_osdc_writepages); 5272EXPORT_SYMBOL(ceph_osdc_writepages);
5170 5273
5274static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5275 u64 src_snapid, u64 src_version,
5276 struct ceph_object_id *src_oid,
5277 struct ceph_object_locator *src_oloc,
5278 u32 src_fadvise_flags,
5279 u32 dst_fadvise_flags,
5280 u8 copy_from_flags)
5281{
5282 struct ceph_osd_req_op *op;
5283 struct page **pages;
5284 void *p, *end;
5285
5286 pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5287 if (IS_ERR(pages))
5288 return PTR_ERR(pages);
5289
5290 op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
5291 op->copy_from.snapid = src_snapid;
5292 op->copy_from.src_version = src_version;
5293 op->copy_from.flags = copy_from_flags;
5294 op->copy_from.src_fadvise_flags = src_fadvise_flags;
5295
5296 p = page_address(pages[0]);
5297 end = p + PAGE_SIZE;
5298 ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5299 encode_oloc(&p, end, src_oloc);
5300 op->indata_len = PAGE_SIZE - (end - p);
5301
5302 ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5303 op->indata_len, 0, false, true);
5304 return 0;
5305}
5306
5307int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
5308 u64 src_snapid, u64 src_version,
5309 struct ceph_object_id *src_oid,
5310 struct ceph_object_locator *src_oloc,
5311 u32 src_fadvise_flags,
5312 struct ceph_object_id *dst_oid,
5313 struct ceph_object_locator *dst_oloc,
5314 u32 dst_fadvise_flags,
5315 u8 copy_from_flags)
5316{
5317 struct ceph_osd_request *req;
5318 int ret;
5319
5320 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
5321 if (!req)
5322 return -ENOMEM;
5323
5324 req->r_flags = CEPH_OSD_FLAG_WRITE;
5325
5326 ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
5327 ceph_oid_copy(&req->r_t.base_oid, dst_oid);
5328
5329 ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
5330 src_oloc, src_fadvise_flags,
5331 dst_fadvise_flags, copy_from_flags);
5332 if (ret)
5333 goto out;
5334
5335 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
5336 if (ret)
5337 goto out;
5338
5339 ceph_osdc_start_request(osdc, req, false);
5340 ret = ceph_osdc_wait_request(osdc, req);
5341
5342out:
5343 ceph_osdc_put_request(req);
5344 return ret;
5345}
5346EXPORT_SYMBOL(ceph_osdc_copy_from);
5347
5171int __init ceph_osdc_setup(void) 5348int __init ceph_osdc_setup(void)
5172{ 5349{
5173 size_t size = sizeof(struct ceph_osd_request) + 5350 size_t size = sizeof(struct ceph_osd_request) +
@@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5295 u32 front_len = le32_to_cpu(hdr->front_len); 5472 u32 front_len = le32_to_cpu(hdr->front_len);
5296 u32 data_len = le32_to_cpu(hdr->data_len); 5473 u32 data_len = le32_to_cpu(hdr->data_len);
5297 5474
5298 m = ceph_msg_new(type, front_len, GFP_NOIO, false); 5475 m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5299 if (!m) 5476 if (!m)
5300 return NULL; 5477 return NULL;
5301 5478
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
index 2ea0564771d2..65e34f78b05d 100644
--- a/net/ceph/pagelist.c
+++ b/net/ceph/pagelist.c
@@ -6,6 +6,26 @@
6#include <linux/highmem.h> 6#include <linux/highmem.h>
7#include <linux/ceph/pagelist.h> 7#include <linux/ceph/pagelist.h>
8 8
9struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags)
10{
11 struct ceph_pagelist *pl;
12
13 pl = kmalloc(sizeof(*pl), gfp_flags);
14 if (!pl)
15 return NULL;
16
17 INIT_LIST_HEAD(&pl->head);
18 pl->mapped_tail = NULL;
19 pl->length = 0;
20 pl->room = 0;
21 INIT_LIST_HEAD(&pl->free_list);
22 pl->num_pages_free = 0;
23 refcount_set(&pl->refcnt, 1);
24
25 return pl;
26}
27EXPORT_SYMBOL(ceph_pagelist_alloc);
28
9static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl) 29static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
10{ 30{
11 if (pl->mapped_tail) { 31 if (pl->mapped_tail) {