aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c206
1 files changed, 72 insertions, 134 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..500ae8b49321 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 32static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 33 struct ceph_osd_request *req);
34 34
35static int op_needs_trail(int op)
36{
37 switch (op) {
38 case CEPH_OSD_OP_GETXATTR:
39 case CEPH_OSD_OP_SETXATTR:
40 case CEPH_OSD_OP_CMPXATTR:
41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
43 return 1;
44 default:
45 return 0;
46 }
47}
48
49static int op_has_extent(int op) 35static int op_has_extent(int op)
50{ 36{
51 return (op == CEPH_OSD_OP_READ || 37 return (op == CEPH_OSD_OP_READ ||
52 op == CEPH_OSD_OP_WRITE); 38 op == CEPH_OSD_OP_WRITE);
53} 39}
54 40
55int ceph_calc_raw_layout(struct ceph_osd_client *osdc, 41int ceph_calc_raw_layout(struct ceph_file_layout *layout,
56 struct ceph_file_layout *layout,
57 u64 snapid,
58 u64 off, u64 *plen, u64 *bno, 42 u64 off, u64 *plen, u64 *bno,
59 struct ceph_osd_request *req, 43 struct ceph_osd_request *req,
60 struct ceph_osd_req_op *op) 44 struct ceph_osd_req_op *op)
61{ 45{
62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63 u64 orig_len = *plen; 46 u64 orig_len = *plen;
64 u64 objoff, objlen; /* extent in object */ 47 u64 objoff, objlen; /* extent in object */
65 int r; 48 int r;
66 49
67 reqhead->snapid = cpu_to_le64(snapid);
68
69 /* object extent? */ 50 /* object extent? */
70 r = ceph_calc_file_object_mapping(layout, off, plen, bno, 51 r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
71 &objoff, &objlen); 52 &objoff, &objlen);
72 if (r < 0) 53 if (r < 0)
73 return r; 54 return r;
74 if (*plen < orig_len) 55 if (objlen < orig_len) {
56 *plen = objlen;
75 dout(" skipping last %llu, final file extent %llu~%llu\n", 57 dout(" skipping last %llu, final file extent %llu~%llu\n",
76 orig_len - *plen, off, *plen); 58 orig_len - *plen, off, *plen);
59 }
77 60
78 if (op_has_extent(op->op)) { 61 if (op_has_extent(op->op)) {
62 u32 osize = le32_to_cpu(layout->fl_object_size);
79 op->extent.offset = objoff; 63 op->extent.offset = objoff;
80 op->extent.length = objlen; 64 op->extent.length = objlen;
65 if (op->extent.truncate_size <= off - objoff) {
66 op->extent.truncate_size = 0;
67 } else {
68 op->extent.truncate_size -= off - objoff;
69 if (op->extent.truncate_size > osize)
70 op->extent.truncate_size = osize;
71 }
81 } 72 }
82 req->r_num_pages = calc_pages_for(off, *plen); 73 req->r_num_pages = calc_pages_for(off, *plen);
83 req->r_page_alignment = off & ~PAGE_MASK; 74 req->r_page_alignment = off & ~PAGE_MASK;
@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
115 * 106 *
116 * fill osd op in request message. 107 * fill osd op in request message.
117 */ 108 */
118static int calc_layout(struct ceph_osd_client *osdc, 109static int calc_layout(struct ceph_vino vino,
119 struct ceph_vino vino,
120 struct ceph_file_layout *layout, 110 struct ceph_file_layout *layout,
121 u64 off, u64 *plen, 111 u64 off, u64 *plen,
122 struct ceph_osd_request *req, 112 struct ceph_osd_request *req,
@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
125 u64 bno; 115 u64 bno;
126 int r; 116 int r;
127 117
128 r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, 118 r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op);
129 plen, &bno, req, op);
130 if (r < 0) 119 if (r < 0)
131 return r; 120 return r;
132 121
@@ -163,10 +152,7 @@ void ceph_osdc_release_request(struct kref *kref)
163 bio_put(req->r_bio); 152 bio_put(req->r_bio);
164#endif 153#endif
165 ceph_put_snap_context(req->r_snapc); 154 ceph_put_snap_context(req->r_snapc);
166 if (req->r_trail) { 155 ceph_pagelist_release(&req->r_trail);
167 ceph_pagelist_release(req->r_trail);
168 kfree(req->r_trail);
169 }
170 if (req->r_mempool) 156 if (req->r_mempool)
171 mempool_free(req, req->r_osdc->req_mempool); 157 mempool_free(req, req->r_osdc->req_mempool);
172 else 158 else
@@ -174,34 +160,14 @@ void ceph_osdc_release_request(struct kref *kref)
174} 160}
175EXPORT_SYMBOL(ceph_osdc_release_request); 161EXPORT_SYMBOL(ceph_osdc_release_request);
176 162
177static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178{
179 int i = 0;
180
181 if (needs_trail)
182 *needs_trail = 0;
183 while (ops[i].op) {
184 if (needs_trail && op_needs_trail(ops[i].op))
185 *needs_trail = 1;
186 i++;
187 }
188
189 return i;
190}
191
192struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 163struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 int flags,
194 struct ceph_snap_context *snapc, 164 struct ceph_snap_context *snapc,
195 struct ceph_osd_req_op *ops, 165 unsigned int num_op,
196 bool use_mempool, 166 bool use_mempool,
197 gfp_t gfp_flags, 167 gfp_t gfp_flags)
198 struct page **pages,
199 struct bio *bio)
200{ 168{
201 struct ceph_osd_request *req; 169 struct ceph_osd_request *req;
202 struct ceph_msg *msg; 170 struct ceph_msg *msg;
203 int needs_trail;
204 int num_op = get_num_ops(ops, &needs_trail);
205 size_t msg_size = sizeof(struct ceph_osd_request_head); 171 size_t msg_size = sizeof(struct ceph_osd_request_head);
206 172
207 msg_size += num_op*sizeof(struct ceph_osd_op); 173 msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +194,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
228 INIT_LIST_HEAD(&req->r_req_lru_item); 194 INIT_LIST_HEAD(&req->r_req_lru_item);
229 INIT_LIST_HEAD(&req->r_osd_item); 195 INIT_LIST_HEAD(&req->r_osd_item);
230 196
231 req->r_flags = flags;
232
233 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
234
235 /* create reply message */ 197 /* create reply message */
236 if (use_mempool) 198 if (use_mempool)
237 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 199 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +206,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
244 } 206 }
245 req->r_reply = msg; 207 req->r_reply = msg;
246 208
247 /* allocate space for the trailing data */ 209 ceph_pagelist_init(&req->r_trail);
248 if (needs_trail) {
249 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
250 if (!req->r_trail) {
251 ceph_osdc_put_request(req);
252 return NULL;
253 }
254 ceph_pagelist_init(req->r_trail);
255 }
256 210
257 /* create request message; allow space for oid */ 211 /* create request message; allow space for oid */
258 msg_size += MAX_OBJ_NAME_SIZE; 212 msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +224,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
270 memset(msg->front.iov_base, 0, msg->front.iov_len); 224 memset(msg->front.iov_base, 0, msg->front.iov_len);
271 225
272 req->r_request = msg; 226 req->r_request = msg;
273 req->r_pages = pages;
274#ifdef CONFIG_BLOCK
275 if (bio) {
276 req->r_bio = bio;
277 bio_get(req->r_bio);
278 }
279#endif
280 227
281 return req; 228 return req;
282} 229}
@@ -304,29 +251,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
304 case CEPH_OSD_OP_GETXATTR: 251 case CEPH_OSD_OP_GETXATTR:
305 case CEPH_OSD_OP_SETXATTR: 252 case CEPH_OSD_OP_SETXATTR:
306 case CEPH_OSD_OP_CMPXATTR: 253 case CEPH_OSD_OP_CMPXATTR:
307 BUG_ON(!req->r_trail);
308
309 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 254 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
310 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 255 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
311 dst->xattr.cmp_op = src->xattr.cmp_op; 256 dst->xattr.cmp_op = src->xattr.cmp_op;
312 dst->xattr.cmp_mode = src->xattr.cmp_mode; 257 dst->xattr.cmp_mode = src->xattr.cmp_mode;
313 ceph_pagelist_append(req->r_trail, src->xattr.name, 258 ceph_pagelist_append(&req->r_trail, src->xattr.name,
314 src->xattr.name_len); 259 src->xattr.name_len);
315 ceph_pagelist_append(req->r_trail, src->xattr.val, 260 ceph_pagelist_append(&req->r_trail, src->xattr.val,
316 src->xattr.value_len); 261 src->xattr.value_len);
317 break; 262 break;
318 case CEPH_OSD_OP_CALL: 263 case CEPH_OSD_OP_CALL:
319 BUG_ON(!req->r_trail);
320
321 dst->cls.class_len = src->cls.class_len; 264 dst->cls.class_len = src->cls.class_len;
322 dst->cls.method_len = src->cls.method_len; 265 dst->cls.method_len = src->cls.method_len;
323 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 266 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
324 267
325 ceph_pagelist_append(req->r_trail, src->cls.class_name, 268 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
326 src->cls.class_len); 269 src->cls.class_len);
327 ceph_pagelist_append(req->r_trail, src->cls.method_name, 270 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
328 src->cls.method_len); 271 src->cls.method_len);
329 ceph_pagelist_append(req->r_trail, src->cls.indata, 272 ceph_pagelist_append(&req->r_trail, src->cls.indata,
330 src->cls.indata_len); 273 src->cls.indata_len);
331 break; 274 break;
332 case CEPH_OSD_OP_ROLLBACK: 275 case CEPH_OSD_OP_ROLLBACK:
@@ -339,11 +282,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
339 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); 282 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
340 __le32 timeout = cpu_to_le32(src->watch.timeout); 283 __le32 timeout = cpu_to_le32(src->watch.timeout);
341 284
342 BUG_ON(!req->r_trail); 285 ceph_pagelist_append(&req->r_trail,
343
344 ceph_pagelist_append(req->r_trail,
345 &prot_ver, sizeof(prot_ver)); 286 &prot_ver, sizeof(prot_ver));
346 ceph_pagelist_append(req->r_trail, 287 ceph_pagelist_append(&req->r_trail,
347 &timeout, sizeof(timeout)); 288 &timeout, sizeof(timeout));
348 } 289 }
349 case CEPH_OSD_OP_NOTIFY_ACK: 290 case CEPH_OSD_OP_NOTIFY_ACK:
@@ -365,25 +306,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
365 * 306 *
366 */ 307 */
367void ceph_osdc_build_request(struct ceph_osd_request *req, 308void ceph_osdc_build_request(struct ceph_osd_request *req,
368 u64 off, u64 *plen, 309 u64 off, u64 len, unsigned int num_op,
369 struct ceph_osd_req_op *src_ops, 310 struct ceph_osd_req_op *src_ops,
370 struct ceph_snap_context *snapc, 311 struct ceph_snap_context *snapc, u64 snap_id,
371 struct timespec *mtime, 312 struct timespec *mtime)
372 const char *oid,
373 int oid_len)
374{ 313{
375 struct ceph_msg *msg = req->r_request; 314 struct ceph_msg *msg = req->r_request;
376 struct ceph_osd_request_head *head; 315 struct ceph_osd_request_head *head;
377 struct ceph_osd_req_op *src_op; 316 struct ceph_osd_req_op *src_op;
378 struct ceph_osd_op *op; 317 struct ceph_osd_op *op;
379 void *p; 318 void *p;
380 int num_op = get_num_ops(src_ops, NULL);
381 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 319 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
382 int flags = req->r_flags; 320 int flags = req->r_flags;
383 u64 data_len = 0; 321 u64 data_len = 0;
384 int i; 322 int i;
385 323
324 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
325
386 head = msg->front.iov_base; 326 head = msg->front.iov_base;
327 head->snapid = cpu_to_le64(snap_id);
387 op = (void *)(head + 1); 328 op = (void *)(head + 1);
388 p = (void *)(op + num_op); 329 p = (void *)(op + num_op);
389 330
@@ -393,23 +334,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
393 head->flags = cpu_to_le32(flags); 334 head->flags = cpu_to_le32(flags);
394 if (flags & CEPH_OSD_FLAG_WRITE) 335 if (flags & CEPH_OSD_FLAG_WRITE)
395 ceph_encode_timespec(&head->mtime, mtime); 336 ceph_encode_timespec(&head->mtime, mtime);
337 BUG_ON(num_op > (unsigned int) ((u16) -1));
396 head->num_ops = cpu_to_le16(num_op); 338 head->num_ops = cpu_to_le16(num_op);
397 339
398
399 /* fill in oid */ 340 /* fill in oid */
400 head->object_len = cpu_to_le32(oid_len); 341 head->object_len = cpu_to_le32(req->r_oid_len);
401 memcpy(p, oid, oid_len); 342 memcpy(p, req->r_oid, req->r_oid_len);
402 p += oid_len; 343 p += req->r_oid_len;
403 344
404 src_op = src_ops; 345 src_op = src_ops;
405 while (src_op->op) { 346 while (num_op--)
406 osd_req_encode_op(req, op, src_op); 347 osd_req_encode_op(req, op++, src_op++);
407 src_op++;
408 op++;
409 }
410 348
411 if (req->r_trail) 349 data_len += req->r_trail.length;
412 data_len += req->r_trail->length;
413 350
414 if (snapc) { 351 if (snapc) {
415 head->snap_seq = cpu_to_le64(snapc->seq); 352 head->snap_seq = cpu_to_le64(snapc->seq);
@@ -422,7 +359,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
422 359
423 if (flags & CEPH_OSD_FLAG_WRITE) { 360 if (flags & CEPH_OSD_FLAG_WRITE) {
424 req->r_request->hdr.data_off = cpu_to_le16(off); 361 req->r_request->hdr.data_off = cpu_to_le16(off);
425 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 362 req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
426 } else if (data_len) { 363 } else if (data_len) {
427 req->r_request->hdr.data_off = 0; 364 req->r_request->hdr.data_off = 0;
428 req->r_request->hdr.data_len = cpu_to_le32(data_len); 365 req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -462,31 +399,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
462 bool use_mempool, int num_reply, 399 bool use_mempool, int num_reply,
463 int page_align) 400 int page_align)
464{ 401{
465 struct ceph_osd_req_op ops[3]; 402 struct ceph_osd_req_op ops[2];
466 struct ceph_osd_request *req; 403 struct ceph_osd_request *req;
404 unsigned int num_op = 1;
467 int r; 405 int r;
468 406
407 memset(&ops, 0, sizeof ops);
408
469 ops[0].op = opcode; 409 ops[0].op = opcode;
470 ops[0].extent.truncate_seq = truncate_seq; 410 ops[0].extent.truncate_seq = truncate_seq;
471 ops[0].extent.truncate_size = truncate_size; 411 ops[0].extent.truncate_size = truncate_size;
472 ops[0].payload_len = 0;
473 412
474 if (do_sync) { 413 if (do_sync) {
475 ops[1].op = CEPH_OSD_OP_STARTSYNC; 414 ops[1].op = CEPH_OSD_OP_STARTSYNC;
476 ops[1].payload_len = 0; 415 num_op++;
477 ops[2].op = 0; 416 }
478 } else 417
479 ops[1].op = 0; 418 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
480 419 GFP_NOFS);
481 req = ceph_osdc_alloc_request(osdc, flags,
482 snapc, ops,
483 use_mempool,
484 GFP_NOFS, NULL, NULL);
485 if (!req) 420 if (!req)
486 return ERR_PTR(-ENOMEM); 421 return ERR_PTR(-ENOMEM);
422 req->r_flags = flags;
487 423
488 /* calculate max write size */ 424 /* calculate max write size */
489 r = calc_layout(osdc, vino, layout, off, plen, req, ops); 425 r = calc_layout(vino, layout, off, plen, req, ops);
490 if (r < 0) 426 if (r < 0)
491 return ERR_PTR(r); 427 return ERR_PTR(r);
492 req->r_file_layout = *layout; /* keep a copy */ 428 req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
496 req->r_num_pages = calc_pages_for(page_align, *plen); 432 req->r_num_pages = calc_pages_for(page_align, *plen);
497 req->r_page_alignment = page_align; 433 req->r_page_alignment = page_align;
498 434
499 ceph_osdc_build_request(req, off, plen, ops, 435 ceph_osdc_build_request(req, off, *plen, num_op, ops,
500 snapc, 436 snapc, vino.snap, mtime);
501 mtime,
502 req->r_oid, req->r_oid_len);
503 437
504 return req; 438 return req;
505} 439}
@@ -739,31 +673,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
739 */ 673 */
740static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 674static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
741{ 675{
742 struct ceph_osd_request *req; 676 struct ceph_entity_addr *peer_addr;
743 int ret = 0;
744 677
745 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 678 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
746 if (list_empty(&osd->o_requests) && 679 if (list_empty(&osd->o_requests) &&
747 list_empty(&osd->o_linger_requests)) { 680 list_empty(&osd->o_linger_requests)) {
748 __remove_osd(osdc, osd); 681 __remove_osd(osdc, osd);
749 ret = -ENODEV; 682
750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 683 return -ENODEV;
751 &osd->o_con.peer_addr, 684 }
752 sizeof(osd->o_con.peer_addr)) == 0 && 685
753 !ceph_con_opened(&osd->o_con)) { 686 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
687 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
688 !ceph_con_opened(&osd->o_con)) {
689 struct ceph_osd_request *req;
690
754 dout(" osd addr hasn't changed and connection never opened," 691 dout(" osd addr hasn't changed and connection never opened,"
755 " letting msgr retry"); 692 " letting msgr retry");
756 /* touch each r_stamp for handle_timeout()'s benfit */ 693 /* touch each r_stamp for handle_timeout()'s benfit */
757 list_for_each_entry(req, &osd->o_requests, r_osd_item) 694 list_for_each_entry(req, &osd->o_requests, r_osd_item)
758 req->r_stamp = jiffies; 695 req->r_stamp = jiffies;
759 ret = -EAGAIN; 696
760 } else { 697 return -EAGAIN;
761 ceph_con_close(&osd->o_con);
762 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
763 &osdc->osdmap->osd_addr[osd->o_osd]);
764 osd->o_incarnation++;
765 } 698 }
766 return ret; 699
700 ceph_con_close(&osd->o_con);
701 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
702 osd->o_incarnation++;
703
704 return 0;
767} 705}
768 706
769static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 707static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1706,7 +1644,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1706#ifdef CONFIG_BLOCK 1644#ifdef CONFIG_BLOCK
1707 req->r_request->bio = req->r_bio; 1645 req->r_request->bio = req->r_bio;
1708#endif 1646#endif
1709 req->r_request->trail = req->r_trail; 1647 req->r_request->trail = &req->r_trail;
1710 1648
1711 register_request(osdc, req); 1649 register_request(osdc, req);
1712 1650