aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2013-05-06 16:11:19 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2013-05-06 16:11:19 -0400
commit91f8575685e35f3bd021286bc82d26397458f5a9 (patch)
tree09de8d889758a12071adb9427ed741e27c907aa6 /net/ceph/osd_client.c
parent2e378f3eebd28feefbb1f9953834a5a19482f053 (diff)
parentb5b09be30cf99f9c699e825629f02e3bce555d44 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Alex Elder: "This is a big pull. Most of it is culmination of Alex's work to implement RBD image layering, which is now complete (yay!). There is also some work from Yan to fix i_mutex behavior surrounding writes in cephfs, a sync write fix, a fix for RBD images that get resized while they are mapped, and a few patches from me that resolve annoying auth warnings and fix several bugs in the ceph auth code." * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits) rbd: fix image request leak on parent read libceph: use slab cache for osd client requests libceph: allocate ceph message data with a slab allocator libceph: allocate ceph messages with a slab allocator rbd: allocate image object names with a slab allocator rbd: allocate object requests with a slab allocator rbd: allocate name separate from obj_request rbd: allocate image requests with a slab allocator rbd: use binary search for snapshot lookup rbd: clear EXISTS flag if mapped snapshot disappears rbd: kill off the snapshot list rbd: define rbd_snap_size() and rbd_snap_features() rbd: use snap_id not index to look up snap info rbd: look up snapshot name in names buffer rbd: drop obj_request->version rbd: drop rbd_obj_method_sync() version parameter rbd: more version parameter removal rbd: get rid of some version parameters rbd: stop tracking header object version rbd: snap names are pointer to constant data ...
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c1087
1 files changed, 734 insertions, 353 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d730dd4d8eb2..a3395fdfbd4f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1,3 +1,4 @@
1
1#include <linux/ceph/ceph_debug.h> 2#include <linux/ceph/ceph_debug.h>
2 3
3#include <linux/module.h> 4#include <linux/module.h>
@@ -21,6 +22,8 @@
21#define OSD_OP_FRONT_LEN 4096 22#define OSD_OP_FRONT_LEN 4096
22#define OSD_OPREPLY_FRONT_LEN 512 23#define OSD_OPREPLY_FRONT_LEN 512
23 24
25static struct kmem_cache *ceph_osd_request_cache;
26
24static const struct ceph_connection_operations osd_con_ops; 27static const struct ceph_connection_operations osd_con_ops;
25 28
26static void __send_queued(struct ceph_osd_client *osdc); 29static void __send_queued(struct ceph_osd_client *osdc);
@@ -32,12 +35,6 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 35static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 36 struct ceph_osd_request *req);
34 37
35static int op_has_extent(int op)
36{
37 return (op == CEPH_OSD_OP_READ ||
38 op == CEPH_OSD_OP_WRITE);
39}
40
41/* 38/*
42 * Implement client access to distributed object storage cluster. 39 * Implement client access to distributed object storage cluster.
43 * 40 *
@@ -63,53 +60,238 @@ static int op_has_extent(int op)
63 * 60 *
64 * fill osd op in request message. 61 * fill osd op in request message.
65 */ 62 */
66static int calc_layout(struct ceph_vino vino, 63static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
67 struct ceph_file_layout *layout, 64 u64 *objnum, u64 *objoff, u64 *objlen)
68 u64 off, u64 *plen,
69 struct ceph_osd_request *req,
70 struct ceph_osd_req_op *op)
71{ 65{
72 u64 orig_len = *plen; 66 u64 orig_len = *plen;
73 u64 bno = 0;
74 u64 objoff = 0;
75 u64 objlen = 0;
76 int r; 67 int r;
77 68
78 /* object extent? */ 69 /* object extent? */
79 r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno, 70 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
80 &objoff, &objlen); 71 objoff, objlen);
81 if (r < 0) 72 if (r < 0)
82 return r; 73 return r;
83 if (objlen < orig_len) { 74 if (*objlen < orig_len) {
84 *plen = objlen; 75 *plen = *objlen;
85 dout(" skipping last %llu, final file extent %llu~%llu\n", 76 dout(" skipping last %llu, final file extent %llu~%llu\n",
86 orig_len - *plen, off, *plen); 77 orig_len - *plen, off, *plen);
87 } 78 }
88 79
89 if (op_has_extent(op->op)) { 80 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
90 u32 osize = le32_to_cpu(layout->fl_object_size); 81
91 op->extent.offset = objoff; 82 return 0;
92 op->extent.length = objlen; 83}
93 if (op->extent.truncate_size <= off - objoff) { 84
94 op->extent.truncate_size = 0; 85static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
95 } else { 86{
96 op->extent.truncate_size -= off - objoff; 87 memset(osd_data, 0, sizeof (*osd_data));
97 if (op->extent.truncate_size > osize) 88 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
98 op->extent.truncate_size = osize; 89}
99 } 90
91static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
92 struct page **pages, u64 length, u32 alignment,
93 bool pages_from_pool, bool own_pages)
94{
95 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
96 osd_data->pages = pages;
97 osd_data->length = length;
98 osd_data->alignment = alignment;
99 osd_data->pages_from_pool = pages_from_pool;
100 osd_data->own_pages = own_pages;
101}
102
103static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
104 struct ceph_pagelist *pagelist)
105{
106 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
107 osd_data->pagelist = pagelist;
108}
109
110#ifdef CONFIG_BLOCK
111static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
112 struct bio *bio, size_t bio_length)
113{
114 osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
115 osd_data->bio = bio;
116 osd_data->bio_length = bio_length;
117}
118#endif /* CONFIG_BLOCK */
119
120#define osd_req_op_data(oreq, whch, typ, fld) \
121 ({ \
122 BUG_ON(whch >= (oreq)->r_num_ops); \
123 &(oreq)->r_ops[whch].typ.fld; \
124 })
125
126static struct ceph_osd_data *
127osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
128{
129 BUG_ON(which >= osd_req->r_num_ops);
130
131 return &osd_req->r_ops[which].raw_data_in;
132}
133
134struct ceph_osd_data *
135osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
136 unsigned int which)
137{
138 return osd_req_op_data(osd_req, which, extent, osd_data);
139}
140EXPORT_SYMBOL(osd_req_op_extent_osd_data);
141
142struct ceph_osd_data *
143osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
144 unsigned int which)
145{
146 return osd_req_op_data(osd_req, which, cls, response_data);
147}
148EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */
149
150void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
151 unsigned int which, struct page **pages,
152 u64 length, u32 alignment,
153 bool pages_from_pool, bool own_pages)
154{
155 struct ceph_osd_data *osd_data;
156
157 osd_data = osd_req_op_raw_data_in(osd_req, which);
158 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
159 pages_from_pool, own_pages);
160}
161EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
162
163void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
164 unsigned int which, struct page **pages,
165 u64 length, u32 alignment,
166 bool pages_from_pool, bool own_pages)
167{
168 struct ceph_osd_data *osd_data;
169
170 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
171 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
172 pages_from_pool, own_pages);
173}
174EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
175
176void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
177 unsigned int which, struct ceph_pagelist *pagelist)
178{
179 struct ceph_osd_data *osd_data;
180
181 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
182 ceph_osd_data_pagelist_init(osd_data, pagelist);
183}
184EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
185
186#ifdef CONFIG_BLOCK
187void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
188 unsigned int which, struct bio *bio, size_t bio_length)
189{
190 struct ceph_osd_data *osd_data;
191
192 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
193 ceph_osd_data_bio_init(osd_data, bio, bio_length);
194}
195EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
196#endif /* CONFIG_BLOCK */
197
198static void osd_req_op_cls_request_info_pagelist(
199 struct ceph_osd_request *osd_req,
200 unsigned int which, struct ceph_pagelist *pagelist)
201{
202 struct ceph_osd_data *osd_data;
203
204 osd_data = osd_req_op_data(osd_req, which, cls, request_info);
205 ceph_osd_data_pagelist_init(osd_data, pagelist);
206}
207
208void osd_req_op_cls_request_data_pagelist(
209 struct ceph_osd_request *osd_req,
210 unsigned int which, struct ceph_pagelist *pagelist)
211{
212 struct ceph_osd_data *osd_data;
213
214 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
215 ceph_osd_data_pagelist_init(osd_data, pagelist);
216}
217EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
218
219void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
220 unsigned int which, struct page **pages, u64 length,
221 u32 alignment, bool pages_from_pool, bool own_pages)
222{
223 struct ceph_osd_data *osd_data;
224
225 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
226 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
227 pages_from_pool, own_pages);
228}
229EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
230
231void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
232 unsigned int which, struct page **pages, u64 length,
233 u32 alignment, bool pages_from_pool, bool own_pages)
234{
235 struct ceph_osd_data *osd_data;
236
237 osd_data = osd_req_op_data(osd_req, which, cls, response_data);
238 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
239 pages_from_pool, own_pages);
240}
241EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
242
243static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
244{
245 switch (osd_data->type) {
246 case CEPH_OSD_DATA_TYPE_NONE:
247 return 0;
248 case CEPH_OSD_DATA_TYPE_PAGES:
249 return osd_data->length;
250 case CEPH_OSD_DATA_TYPE_PAGELIST:
251 return (u64)osd_data->pagelist->length;
252#ifdef CONFIG_BLOCK
253 case CEPH_OSD_DATA_TYPE_BIO:
254 return (u64)osd_data->bio_length;
255#endif /* CONFIG_BLOCK */
256 default:
257 WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
258 return 0;
100 } 259 }
101 req->r_num_pages = calc_pages_for(off, *plen); 260}
102 req->r_page_alignment = off & ~PAGE_MASK;
103 if (op->op == CEPH_OSD_OP_WRITE)
104 op->payload_len = *plen;
105 261
106 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", 262static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
107 bno, objoff, objlen, req->r_num_pages); 263{
264 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
265 int num_pages;
108 266
109 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 267 num_pages = calc_pages_for((u64)osd_data->alignment,
110 req->r_oid_len = strlen(req->r_oid); 268 (u64)osd_data->length);
269 ceph_release_page_vector(osd_data->pages, num_pages);
270 }
271 ceph_osd_data_init(osd_data);
272}
273
274static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
275 unsigned int which)
276{
277 struct ceph_osd_req_op *op;
278
279 BUG_ON(which >= osd_req->r_num_ops);
280 op = &osd_req->r_ops[which];
111 281
112 return r; 282 switch (op->op) {
283 case CEPH_OSD_OP_READ:
284 case CEPH_OSD_OP_WRITE:
285 ceph_osd_data_release(&op->extent.osd_data);
286 break;
287 case CEPH_OSD_OP_CALL:
288 ceph_osd_data_release(&op->cls.request_info);
289 ceph_osd_data_release(&op->cls.request_data);
290 ceph_osd_data_release(&op->cls.response_data);
291 break;
292 default:
293 break;
294 }
113} 295}
114 296
115/* 297/*
@@ -117,30 +299,26 @@ static int calc_layout(struct ceph_vino vino,
117 */ 299 */
118void ceph_osdc_release_request(struct kref *kref) 300void ceph_osdc_release_request(struct kref *kref)
119{ 301{
120 struct ceph_osd_request *req = container_of(kref, 302 struct ceph_osd_request *req;
121 struct ceph_osd_request, 303 unsigned int which;
122 r_kref);
123 304
305 req = container_of(kref, struct ceph_osd_request, r_kref);
124 if (req->r_request) 306 if (req->r_request)
125 ceph_msg_put(req->r_request); 307 ceph_msg_put(req->r_request);
126 if (req->r_con_filling_msg) { 308 if (req->r_reply) {
127 dout("%s revoking msg %p from con %p\n", __func__,
128 req->r_reply, req->r_con_filling_msg);
129 ceph_msg_revoke_incoming(req->r_reply); 309 ceph_msg_revoke_incoming(req->r_reply);
130 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 req->r_con_filling_msg = NULL;
132 }
133 if (req->r_reply)
134 ceph_msg_put(req->r_reply); 310 ceph_msg_put(req->r_reply);
135 if (req->r_own_pages) 311 }
136 ceph_release_page_vector(req->r_pages, 312
137 req->r_num_pages); 313 for (which = 0; which < req->r_num_ops; which++)
314 osd_req_op_data_release(req, which);
315
138 ceph_put_snap_context(req->r_snapc); 316 ceph_put_snap_context(req->r_snapc);
139 ceph_pagelist_release(&req->r_trail);
140 if (req->r_mempool) 317 if (req->r_mempool)
141 mempool_free(req, req->r_osdc->req_mempool); 318 mempool_free(req, req->r_osdc->req_mempool);
142 else 319 else
143 kfree(req); 320 kmem_cache_free(ceph_osd_request_cache, req);
321
144} 322}
145EXPORT_SYMBOL(ceph_osdc_release_request); 323EXPORT_SYMBOL(ceph_osdc_release_request);
146 324
@@ -154,6 +332,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
154 struct ceph_msg *msg; 332 struct ceph_msg *msg;
155 size_t msg_size; 333 size_t msg_size;
156 334
335 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
336 BUG_ON(num_ops > CEPH_OSD_MAX_OP);
337
157 msg_size = 4 + 4 + 8 + 8 + 4+8; 338 msg_size = 4 + 4 + 8 + 8 + 4+8;
158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 340 msg_size += 1 + 8 + 4 + 4; /* pg_t */
@@ -168,13 +349,14 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
168 req = mempool_alloc(osdc->req_mempool, gfp_flags); 349 req = mempool_alloc(osdc->req_mempool, gfp_flags);
169 memset(req, 0, sizeof(*req)); 350 memset(req, 0, sizeof(*req));
170 } else { 351 } else {
171 req = kzalloc(sizeof(*req), gfp_flags); 352 req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
172 } 353 }
173 if (req == NULL) 354 if (req == NULL)
174 return NULL; 355 return NULL;
175 356
176 req->r_osdc = osdc; 357 req->r_osdc = osdc;
177 req->r_mempool = use_mempool; 358 req->r_mempool = use_mempool;
359 req->r_num_ops = num_ops;
178 360
179 kref_init(&req->r_kref); 361 kref_init(&req->r_kref);
180 init_completion(&req->r_completion); 362 init_completion(&req->r_completion);
@@ -198,8 +380,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
198 } 380 }
199 req->r_reply = msg; 381 req->r_reply = msg;
200 382
201 ceph_pagelist_init(&req->r_trail);
202
203 /* create request message; allow space for oid */ 383 /* create request message; allow space for oid */
204 if (use_mempool) 384 if (use_mempool)
205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 385 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -218,60 +398,24 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
218} 398}
219EXPORT_SYMBOL(ceph_osdc_alloc_request); 399EXPORT_SYMBOL(ceph_osdc_alloc_request);
220 400
221static void osd_req_encode_op(struct ceph_osd_request *req, 401static bool osd_req_opcode_valid(u16 opcode)
222 struct ceph_osd_op *dst,
223 struct ceph_osd_req_op *src)
224{ 402{
225 dst->op = cpu_to_le16(src->op); 403 switch (opcode) {
226
227 switch (src->op) {
228 case CEPH_OSD_OP_STAT:
229 break;
230 case CEPH_OSD_OP_READ: 404 case CEPH_OSD_OP_READ:
231 case CEPH_OSD_OP_WRITE: 405 case CEPH_OSD_OP_STAT:
232 dst->extent.offset =
233 cpu_to_le64(src->extent.offset);
234 dst->extent.length =
235 cpu_to_le64(src->extent.length);
236 dst->extent.truncate_size =
237 cpu_to_le64(src->extent.truncate_size);
238 dst->extent.truncate_seq =
239 cpu_to_le32(src->extent.truncate_seq);
240 break;
241 case CEPH_OSD_OP_CALL:
242 dst->cls.class_len = src->cls.class_len;
243 dst->cls.method_len = src->cls.method_len;
244 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
245
246 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
247 src->cls.class_len);
248 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
249 src->cls.method_len);
250 ceph_pagelist_append(&req->r_trail, src->cls.indata,
251 src->cls.indata_len);
252 break;
253 case CEPH_OSD_OP_STARTSYNC:
254 break;
255 case CEPH_OSD_OP_NOTIFY_ACK:
256 case CEPH_OSD_OP_WATCH:
257 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
258 dst->watch.ver = cpu_to_le64(src->watch.ver);
259 dst->watch.flag = src->watch.flag;
260 break;
261 default:
262 pr_err("unrecognized osd opcode %d\n", dst->op);
263 WARN_ON(1);
264 break;
265 case CEPH_OSD_OP_MAPEXT: 406 case CEPH_OSD_OP_MAPEXT:
266 case CEPH_OSD_OP_MASKTRUNC: 407 case CEPH_OSD_OP_MASKTRUNC:
267 case CEPH_OSD_OP_SPARSE_READ: 408 case CEPH_OSD_OP_SPARSE_READ:
268 case CEPH_OSD_OP_NOTIFY: 409 case CEPH_OSD_OP_NOTIFY:
410 case CEPH_OSD_OP_NOTIFY_ACK:
269 case CEPH_OSD_OP_ASSERT_VER: 411 case CEPH_OSD_OP_ASSERT_VER:
412 case CEPH_OSD_OP_WRITE:
270 case CEPH_OSD_OP_WRITEFULL: 413 case CEPH_OSD_OP_WRITEFULL:
271 case CEPH_OSD_OP_TRUNCATE: 414 case CEPH_OSD_OP_TRUNCATE:
272 case CEPH_OSD_OP_ZERO: 415 case CEPH_OSD_OP_ZERO:
273 case CEPH_OSD_OP_DELETE: 416 case CEPH_OSD_OP_DELETE:
274 case CEPH_OSD_OP_APPEND: 417 case CEPH_OSD_OP_APPEND:
418 case CEPH_OSD_OP_STARTSYNC:
275 case CEPH_OSD_OP_SETTRUNC: 419 case CEPH_OSD_OP_SETTRUNC:
276 case CEPH_OSD_OP_TRIMTRUNC: 420 case CEPH_OSD_OP_TRIMTRUNC:
277 case CEPH_OSD_OP_TMAPUP: 421 case CEPH_OSD_OP_TMAPUP:
@@ -279,11 +423,11 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
279 case CEPH_OSD_OP_TMAPGET: 423 case CEPH_OSD_OP_TMAPGET:
280 case CEPH_OSD_OP_CREATE: 424 case CEPH_OSD_OP_CREATE:
281 case CEPH_OSD_OP_ROLLBACK: 425 case CEPH_OSD_OP_ROLLBACK:
426 case CEPH_OSD_OP_WATCH:
282 case CEPH_OSD_OP_OMAPGETKEYS: 427 case CEPH_OSD_OP_OMAPGETKEYS:
283 case CEPH_OSD_OP_OMAPGETVALS: 428 case CEPH_OSD_OP_OMAPGETVALS:
284 case CEPH_OSD_OP_OMAPGETHEADER: 429 case CEPH_OSD_OP_OMAPGETHEADER:
285 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 430 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286 case CEPH_OSD_OP_MODE_RD:
287 case CEPH_OSD_OP_OMAPSETVALS: 431 case CEPH_OSD_OP_OMAPSETVALS:
288 case CEPH_OSD_OP_OMAPSETHEADER: 432 case CEPH_OSD_OP_OMAPSETHEADER:
289 case CEPH_OSD_OP_OMAPCLEAR: 433 case CEPH_OSD_OP_OMAPCLEAR:
@@ -314,113 +458,233 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
314 case CEPH_OSD_OP_RDUNLOCK: 458 case CEPH_OSD_OP_RDUNLOCK:
315 case CEPH_OSD_OP_UPLOCK: 459 case CEPH_OSD_OP_UPLOCK:
316 case CEPH_OSD_OP_DNLOCK: 460 case CEPH_OSD_OP_DNLOCK:
461 case CEPH_OSD_OP_CALL:
317 case CEPH_OSD_OP_PGLS: 462 case CEPH_OSD_OP_PGLS:
318 case CEPH_OSD_OP_PGLS_FILTER: 463 case CEPH_OSD_OP_PGLS_FILTER:
319 pr_err("unsupported osd opcode %s\n", 464 return true;
320 ceph_osd_op_name(dst->op)); 465 default:
321 WARN_ON(1); 466 return false;
322 break;
323 } 467 }
324 dst->payload_len = cpu_to_le32(src->payload_len);
325} 468}
326 469
327/* 470/*
328 * build new request AND message 471 * This is an osd op init function for opcodes that have no data or
329 * 472 * other information associated with them. It also serves as a
473 * common init routine for all the other init functions, below.
330 */ 474 */
331void ceph_osdc_build_request(struct ceph_osd_request *req, 475static struct ceph_osd_req_op *
332 u64 off, u64 len, unsigned int num_ops, 476_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
333 struct ceph_osd_req_op *src_ops, 477 u16 opcode)
334 struct ceph_snap_context *snapc, u64 snap_id,
335 struct timespec *mtime)
336{ 478{
337 struct ceph_msg *msg = req->r_request; 479 struct ceph_osd_req_op *op;
338 struct ceph_osd_req_op *src_op;
339 void *p;
340 size_t msg_size;
341 int flags = req->r_flags;
342 u64 data_len;
343 int i;
344 480
345 req->r_num_ops = num_ops; 481 BUG_ON(which >= osd_req->r_num_ops);
346 req->r_snapid = snap_id; 482 BUG_ON(!osd_req_opcode_valid(opcode));
347 req->r_snapc = ceph_get_snap_context(snapc);
348 483
349 /* encode request */ 484 op = &osd_req->r_ops[which];
350 msg->hdr.version = cpu_to_le16(4); 485 memset(op, 0, sizeof (*op));
486 op->op = opcode;
351 487
352 p = msg->front.iov_base; 488 return op;
353 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 489}
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363 490
364 /* oloc */ 491void osd_req_op_init(struct ceph_osd_request *osd_req,
365 ceph_encode_8(&p, 4); 492 unsigned int which, u16 opcode)
366 ceph_encode_8(&p, 4); 493{
367 ceph_encode_32(&p, 8 + 4 + 4); 494 (void)_osd_req_op_init(osd_req, which, opcode);
368 req->r_request_pool = p; 495}
369 p += 8; 496EXPORT_SYMBOL(osd_req_op_init);
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372 497
373 ceph_encode_8(&p, 1); 498void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
374 req->r_request_pgid = p; 499 unsigned int which, u16 opcode,
375 p += 8 + 4; 500 u64 offset, u64 length,
376 ceph_encode_32(&p, -1); /* preferred */ 501 u64 truncate_size, u32 truncate_seq)
502{
503 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
504 size_t payload_len = 0;
377 505
378 /* oid */ 506 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
379 ceph_encode_32(&p, req->r_oid_len);
380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382 p += req->r_oid_len;
383 507
384 /* ops */ 508 op->extent.offset = offset;
385 ceph_encode_16(&p, num_ops); 509 op->extent.length = length;
386 src_op = src_ops; 510 op->extent.truncate_size = truncate_size;
387 req->r_request_ops = p; 511 op->extent.truncate_seq = truncate_seq;
388 for (i = 0; i < num_ops; i++, src_op++) { 512 if (opcode == CEPH_OSD_OP_WRITE)
389 osd_req_encode_op(req, p, src_op); 513 payload_len += length;
390 p += sizeof(struct ceph_osd_op);
391 }
392 514
393 /* snaps */ 515 op->payload_len = payload_len;
394 ceph_encode_64(&p, req->r_snapid); 516}
395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 517EXPORT_SYMBOL(osd_req_op_extent_init);
396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 518
397 if (req->r_snapc) { 519void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
398 for (i = 0; i < snapc->num_snaps; i++) { 520 unsigned int which, u64 length)
399 ceph_encode_64(&p, req->r_snapc->snaps[i]); 521{
400 } 522 struct ceph_osd_req_op *op;
523 u64 previous;
524
525 BUG_ON(which >= osd_req->r_num_ops);
526 op = &osd_req->r_ops[which];
527 previous = op->extent.length;
528
529 if (length == previous)
530 return; /* Nothing to do */
531 BUG_ON(length > previous);
532
533 op->extent.length = length;
534 op->payload_len -= previous - length;
535}
536EXPORT_SYMBOL(osd_req_op_extent_update);
537
538void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
539 u16 opcode, const char *class, const char *method)
540{
541 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
542 struct ceph_pagelist *pagelist;
543 size_t payload_len = 0;
544 size_t size;
545
546 BUG_ON(opcode != CEPH_OSD_OP_CALL);
547
548 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
549 BUG_ON(!pagelist);
550 ceph_pagelist_init(pagelist);
551
552 op->cls.class_name = class;
553 size = strlen(class);
554 BUG_ON(size > (size_t) U8_MAX);
555 op->cls.class_len = size;
556 ceph_pagelist_append(pagelist, class, size);
557 payload_len += size;
558
559 op->cls.method_name = method;
560 size = strlen(method);
561 BUG_ON(size > (size_t) U8_MAX);
562 op->cls.method_len = size;
563 ceph_pagelist_append(pagelist, method, size);
564 payload_len += size;
565
566 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
567
568 op->cls.argc = 0; /* currently unused */
569
570 op->payload_len = payload_len;
571}
572EXPORT_SYMBOL(osd_req_op_cls_init);
573
574void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
575 unsigned int which, u16 opcode,
576 u64 cookie, u64 version, int flag)
577{
578 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
579
580 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
581
582 op->watch.cookie = cookie;
583 op->watch.ver = version;
584 if (opcode == CEPH_OSD_OP_WATCH && flag)
585 op->watch.flag = (u8)1;
586}
587EXPORT_SYMBOL(osd_req_op_watch_init);
588
589static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
590 struct ceph_osd_data *osd_data)
591{
592 u64 length = ceph_osd_data_length(osd_data);
593
594 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
595 BUG_ON(length > (u64) SIZE_MAX);
596 if (length)
597 ceph_msg_data_add_pages(msg, osd_data->pages,
598 length, osd_data->alignment);
599 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
600 BUG_ON(!length);
601 ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
602#ifdef CONFIG_BLOCK
603 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
604 ceph_msg_data_add_bio(msg, osd_data->bio, length);
605#endif
606 } else {
607 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
401 } 608 }
609}
402 610
403 req->r_request_attempts = p; 611static u64 osd_req_encode_op(struct ceph_osd_request *req,
404 p += 4; 612 struct ceph_osd_op *dst, unsigned int which)
613{
614 struct ceph_osd_req_op *src;
615 struct ceph_osd_data *osd_data;
616 u64 request_data_len = 0;
617 u64 data_length;
405 618
406 data_len = req->r_trail.length; 619 BUG_ON(which >= req->r_num_ops);
407 if (flags & CEPH_OSD_FLAG_WRITE) { 620 src = &req->r_ops[which];
408 req->r_request->hdr.data_off = cpu_to_le16(off); 621 if (WARN_ON(!osd_req_opcode_valid(src->op))) {
409 data_len += len; 622 pr_err("unrecognized osd opcode %d\n", src->op);
623
624 return 0;
410 } 625 }
411 req->r_request->hdr.data_len = cpu_to_le32(data_len);
412 req->r_request->page_alignment = req->r_page_alignment;
413 626
414 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 627 switch (src->op) {
415 msg_size = p - msg->front.iov_base; 628 case CEPH_OSD_OP_STAT:
416 msg->front.iov_len = msg_size; 629 osd_data = &src->raw_data_in;
417 msg->hdr.front_len = cpu_to_le32(msg_size); 630 ceph_osdc_msg_data_add(req->r_reply, osd_data);
631 break;
632 case CEPH_OSD_OP_READ:
633 case CEPH_OSD_OP_WRITE:
634 if (src->op == CEPH_OSD_OP_WRITE)
635 request_data_len = src->extent.length;
636 dst->extent.offset = cpu_to_le64(src->extent.offset);
637 dst->extent.length = cpu_to_le64(src->extent.length);
638 dst->extent.truncate_size =
639 cpu_to_le64(src->extent.truncate_size);
640 dst->extent.truncate_seq =
641 cpu_to_le32(src->extent.truncate_seq);
642 osd_data = &src->extent.osd_data;
643 if (src->op == CEPH_OSD_OP_WRITE)
644 ceph_osdc_msg_data_add(req->r_request, osd_data);
645 else
646 ceph_osdc_msg_data_add(req->r_reply, osd_data);
647 break;
648 case CEPH_OSD_OP_CALL:
649 dst->cls.class_len = src->cls.class_len;
650 dst->cls.method_len = src->cls.method_len;
651 osd_data = &src->cls.request_info;
652 ceph_osdc_msg_data_add(req->r_request, osd_data);
653 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
654 request_data_len = osd_data->pagelist->length;
655
656 osd_data = &src->cls.request_data;
657 data_length = ceph_osd_data_length(osd_data);
658 if (data_length) {
659 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
660 dst->cls.indata_len = cpu_to_le32(data_length);
661 ceph_osdc_msg_data_add(req->r_request, osd_data);
662 src->payload_len += data_length;
663 request_data_len += data_length;
664 }
665 osd_data = &src->cls.response_data;
666 ceph_osdc_msg_data_add(req->r_reply, osd_data);
667 break;
668 case CEPH_OSD_OP_STARTSYNC:
669 break;
670 case CEPH_OSD_OP_NOTIFY_ACK:
671 case CEPH_OSD_OP_WATCH:
672 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
673 dst->watch.ver = cpu_to_le64(src->watch.ver);
674 dst->watch.flag = src->watch.flag;
675 break;
676 default:
677 pr_err("unsupported osd opcode %s\n",
678 ceph_osd_op_name(src->op));
679 WARN_ON(1);
418 680
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 681 return 0;
420 num_ops); 682 }
421 return; 683 dst->op = cpu_to_le16(src->op);
684 dst->payload_len = cpu_to_le32(src->payload_len);
685
686 return request_data_len;
422} 687}
423EXPORT_SYMBOL(ceph_osdc_build_request);
424 688
425/* 689/*
426 * build new request AND message, calculate layout, and adjust file 690 * build new request AND message, calculate layout, and adjust file
@@ -436,51 +700,63 @@ EXPORT_SYMBOL(ceph_osdc_build_request);
436struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 700struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
437 struct ceph_file_layout *layout, 701 struct ceph_file_layout *layout,
438 struct ceph_vino vino, 702 struct ceph_vino vino,
439 u64 off, u64 *plen, 703 u64 off, u64 *plen, int num_ops,
440 int opcode, int flags, 704 int opcode, int flags,
441 struct ceph_snap_context *snapc, 705 struct ceph_snap_context *snapc,
442 int do_sync,
443 u32 truncate_seq, 706 u32 truncate_seq,
444 u64 truncate_size, 707 u64 truncate_size,
445 struct timespec *mtime, 708 bool use_mempool)
446 bool use_mempool,
447 int page_align)
448{ 709{
449 struct ceph_osd_req_op ops[2];
450 struct ceph_osd_request *req; 710 struct ceph_osd_request *req;
451 unsigned int num_op = 1; 711 u64 objnum = 0;
712 u64 objoff = 0;
713 u64 objlen = 0;
714 u32 object_size;
715 u64 object_base;
452 int r; 716 int r;
453 717
454 memset(&ops, 0, sizeof ops); 718 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
455
456 ops[0].op = opcode;
457 ops[0].extent.truncate_seq = truncate_seq;
458 ops[0].extent.truncate_size = truncate_size;
459
460 if (do_sync) {
461 ops[1].op = CEPH_OSD_OP_STARTSYNC;
462 num_op++;
463 }
464 719
465 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 720 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
466 GFP_NOFS); 721 GFP_NOFS);
467 if (!req) 722 if (!req)
468 return ERR_PTR(-ENOMEM); 723 return ERR_PTR(-ENOMEM);
724
469 req->r_flags = flags; 725 req->r_flags = flags;
470 726
471 /* calculate max write size */ 727 /* calculate max write size */
472 r = calc_layout(vino, layout, off, plen, req, ops); 728 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
473 if (r < 0) 729 if (r < 0) {
730 ceph_osdc_put_request(req);
474 return ERR_PTR(r); 731 return ERR_PTR(r);
475 req->r_file_layout = *layout; /* keep a copy */ 732 }
476 733
477 /* in case it differs from natural (file) alignment that 734 object_size = le32_to_cpu(layout->fl_object_size);
478 calc_layout filled in for us */ 735 object_base = off - objoff;
479 req->r_num_pages = calc_pages_for(page_align, *plen); 736 if (truncate_size <= object_base) {
480 req->r_page_alignment = page_align; 737 truncate_size = 0;
738 } else {
739 truncate_size -= object_base;
740 if (truncate_size > object_size)
741 truncate_size = object_size;
742 }
743
744 osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
745 truncate_size, truncate_seq);
746
747 /*
748 * A second op in the ops array means the caller wants to
749 * also issue a include a 'startsync' command so that the
750 * osd will flush data quickly.
751 */
752 if (num_ops > 1)
753 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
754
755 req->r_file_layout = *layout; /* keep a copy */
481 756
482 ceph_osdc_build_request(req, off, *plen, num_op, ops, 757 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
483 snapc, vino.snap, mtime); 758 vino.ino, objnum);
759 req->r_oid_len = strlen(req->r_oid);
484 760
485 return req; 761 return req;
486} 762}
@@ -558,21 +834,46 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
558 struct ceph_osd *osd) 834 struct ceph_osd *osd)
559{ 835{
560 struct ceph_osd_request *req, *nreq; 836 struct ceph_osd_request *req, *nreq;
837 LIST_HEAD(resend);
561 int err; 838 int err;
562 839
563 dout("__kick_osd_requests osd%d\n", osd->o_osd); 840 dout("__kick_osd_requests osd%d\n", osd->o_osd);
564 err = __reset_osd(osdc, osd); 841 err = __reset_osd(osdc, osd);
565 if (err) 842 if (err)
566 return; 843 return;
567 844 /*
845 * Build up a list of requests to resend by traversing the
846 * osd's list of requests. Requests for a given object are
847 * sent in tid order, and that is also the order they're
848 * kept on this list. Therefore all requests that are in
849 * flight will be found first, followed by all requests that
850 * have not yet been sent. And to resend requests while
851 * preserving this order we will want to put any sent
852 * requests back on the front of the osd client's unsent
853 * list.
854 *
855 * So we build a separate ordered list of already-sent
856 * requests for the affected osd and splice it onto the
857 * front of the osd client's unsent list. Once we've seen a
858 * request that has not yet been sent we're done. Those
859 * requests are already sitting right where they belong.
860 */
568 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 861 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
569 list_move(&req->r_req_lru_item, &osdc->req_unsent); 862 if (!req->r_sent)
570 dout("requeued %p tid %llu osd%d\n", req, req->r_tid, 863 break;
864 list_move_tail(&req->r_req_lru_item, &resend);
865 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
571 osd->o_osd); 866 osd->o_osd);
572 if (!req->r_linger) 867 if (!req->r_linger)
573 req->r_flags |= CEPH_OSD_FLAG_RETRY; 868 req->r_flags |= CEPH_OSD_FLAG_RETRY;
574 } 869 }
870 list_splice(&resend, &osdc->req_unsent);
575 871
872 /*
873 * Linger requests are re-registered before sending, which
874 * sets up a new tid for each. We add them to the unsent
875 * list at the end to keep things in tid order.
876 */
576 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 877 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
577 r_linger_osd) { 878 r_linger_osd) {
578 /* 879 /*
@@ -581,8 +882,8 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
581 */ 882 */
582 BUG_ON(!list_empty(&req->r_req_lru_item)); 883 BUG_ON(!list_empty(&req->r_req_lru_item));
583 __register_request(osdc, req); 884 __register_request(osdc, req);
584 list_add(&req->r_req_lru_item, &osdc->req_unsent); 885 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
585 list_add(&req->r_osd_item, &req->r_osd->o_requests); 886 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
586 __unregister_linger_request(osdc, req); 887 __unregister_linger_request(osdc, req);
587 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 888 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
588 osd->o_osd); 889 osd->o_osd);
@@ -654,8 +955,7 @@ static void put_osd(struct ceph_osd *osd)
654 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 955 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
655 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 956 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
656 957
657 if (ac->ops && ac->ops->destroy_authorizer) 958 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
658 ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
659 kfree(osd); 959 kfree(osd);
660 } 960 }
661} 961}
@@ -820,14 +1120,6 @@ static void __register_request(struct ceph_osd_client *osdc,
820 } 1120 }
821} 1121}
822 1122
823static void register_request(struct ceph_osd_client *osdc,
824 struct ceph_osd_request *req)
825{
826 mutex_lock(&osdc->request_mutex);
827 __register_request(osdc, req);
828 mutex_unlock(&osdc->request_mutex);
829}
830
831/* 1123/*
832 * called under osdc->request_mutex 1124 * called under osdc->request_mutex
833 */ 1125 */
@@ -952,8 +1244,8 @@ static int __map_request(struct ceph_osd_client *osdc,
952 int err; 1244 int err;
953 1245
954 dout("map_request %p tid %lld\n", req, req->r_tid); 1246 dout("map_request %p tid %lld\n", req, req->r_tid);
955 err = ceph_calc_object_layout(&pgid, req->r_oid, 1247 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
956 &req->r_file_layout, osdc->osdmap); 1248 ceph_file_layout_pg_pool(req->r_file_layout));
957 if (err) { 1249 if (err) {
958 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1250 list_move(&req->r_req_lru_item, &osdc->req_notarget);
959 return err; 1251 return err;
@@ -1007,10 +1299,10 @@ static int __map_request(struct ceph_osd_client *osdc,
1007 1299
1008 if (req->r_osd) { 1300 if (req->r_osd) {
1009 __remove_osd_from_lru(req->r_osd); 1301 __remove_osd_from_lru(req->r_osd);
1010 list_add(&req->r_osd_item, &req->r_osd->o_requests); 1302 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1011 list_move(&req->r_req_lru_item, &osdc->req_unsent); 1303 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1012 } else { 1304 } else {
1013 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1305 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1014 } 1306 }
1015 err = 1; /* osd or pg changed */ 1307 err = 1; /* osd or pg changed */
1016 1308
@@ -1045,8 +1337,14 @@ static void __send_request(struct ceph_osd_client *osdc,
1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1337 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1046 1338
1047 ceph_msg_get(req->r_request); /* send consumes a ref */ 1339 ceph_msg_get(req->r_request); /* send consumes a ref */
1048 ceph_con_send(&req->r_osd->o_con, req->r_request); 1340
1341 /* Mark the request unsafe if this is the first timet's being sent. */
1342
1343 if (!req->r_sent && req->r_unsafe_callback)
1344 req->r_unsafe_callback(req, true);
1049 req->r_sent = req->r_osd->o_incarnation; 1345 req->r_sent = req->r_osd->o_incarnation;
1346
1347 ceph_con_send(&req->r_osd->o_con, req->r_request);
1050} 1348}
1051 1349
1052/* 1350/*
@@ -1134,31 +1432,11 @@ static void handle_osds_timeout(struct work_struct *work)
1134 1432
1135static void complete_request(struct ceph_osd_request *req) 1433static void complete_request(struct ceph_osd_request *req)
1136{ 1434{
1137 if (req->r_safe_callback) 1435 if (req->r_unsafe_callback)
1138 req->r_safe_callback(req, NULL); 1436 req->r_unsafe_callback(req, false);
1139 complete_all(&req->r_safe_completion); /* fsync waiter */ 1437 complete_all(&req->r_safe_completion); /* fsync waiter */
1140} 1438}
1141 1439
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1162/* 1440/*
1163 * handle osd op reply. either call the callback if it is specified, 1441 * handle osd op reply. either call the callback if it is specified,
1164 * or do the completion to wake up the waiting thread. 1442 * or do the completion to wake up the waiting thread.
@@ -1170,7 +1448,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1170 struct ceph_osd_request *req; 1448 struct ceph_osd_request *req;
1171 u64 tid; 1449 u64 tid;
1172 int object_len; 1450 int object_len;
1173 int numops, payload_len, flags; 1451 unsigned int numops;
1452 int payload_len, flags;
1174 s32 result; 1453 s32 result;
1175 s32 retry_attempt; 1454 s32 retry_attempt;
1176 struct ceph_pg pg; 1455 struct ceph_pg pg;
@@ -1178,7 +1457,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1178 u32 reassert_epoch; 1457 u32 reassert_epoch;
1179 u64 reassert_version; 1458 u64 reassert_version;
1180 u32 osdmap_epoch; 1459 u32 osdmap_epoch;
1181 int i; 1460 int already_completed;
1461 u32 bytes;
1462 unsigned int i;
1182 1463
1183 tid = le64_to_cpu(msg->hdr.tid); 1464 tid = le64_to_cpu(msg->hdr.tid);
1184 dout("handle_reply %p tid %llu\n", msg, tid); 1465 dout("handle_reply %p tid %llu\n", msg, tid);
@@ -1191,7 +1472,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1191 ceph_decode_need(&p, end, object_len, bad); 1472 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len; 1473 p += object_len;
1193 1474
1194 err = __decode_pgid(&p, end, &pg); 1475 err = ceph_decode_pgid(&p, end, &pg);
1195 if (err) 1476 if (err)
1196 goto bad; 1477 goto bad;
1197 1478
@@ -1207,8 +1488,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1207 req = __lookup_request(osdc, tid); 1488 req = __lookup_request(osdc, tid);
1208 if (req == NULL) { 1489 if (req == NULL) {
1209 dout("handle_reply tid %llu dne\n", tid); 1490 dout("handle_reply tid %llu dne\n", tid);
1210 mutex_unlock(&osdc->request_mutex); 1491 goto bad_mutex;
1211 return;
1212 } 1492 }
1213 ceph_osdc_get_request(req); 1493 ceph_osdc_get_request(req);
1214 1494
@@ -1233,9 +1513,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1233 payload_len += len; 1513 payload_len += len;
1234 p += sizeof(*op); 1514 p += sizeof(*op);
1235 } 1515 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) { 1516 bytes = le32_to_cpu(msg->hdr.data_len);
1517 if (payload_len != bytes) {
1237 pr_warning("sum of op payload lens %d != data_len %d", 1518 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len)); 1519 payload_len, bytes);
1239 goto bad_put; 1520 goto bad_put;
1240 } 1521 }
1241 1522
@@ -1244,21 +1525,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1244 for (i = 0; i < numops; i++) 1525 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p); 1526 req->r_reply_op_result[i] = ceph_decode_32(&p);
1246 1527
1247 /*
1248 * if this connection filled our message, drop our reference now, to
1249 * avoid a (safe but slower) revoke later.
1250 */
1251 if (req->r_con_filling_msg == con && req->r_reply == msg) {
1252 dout(" dropping con_filling_msg ref %p\n", con);
1253 req->r_con_filling_msg = NULL;
1254 con->ops->put(con);
1255 }
1256
1257 if (!req->r_got_reply) { 1528 if (!req->r_got_reply) {
1258 unsigned int bytes;
1259 1529
1260 req->r_result = result; 1530 req->r_result = result;
1261 bytes = le32_to_cpu(msg->hdr.data_len);
1262 dout("handle_reply result %d bytes %d\n", req->r_result, 1531 dout("handle_reply result %d bytes %d\n", req->r_result,
1263 bytes); 1532 bytes);
1264 if (req->r_result == 0) 1533 if (req->r_result == 0)
@@ -1286,7 +1555,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1286 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1555 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1287 __unregister_request(osdc, req); 1556 __unregister_request(osdc, req);
1288 1557
1558 already_completed = req->r_completed;
1559 req->r_completed = 1;
1289 mutex_unlock(&osdc->request_mutex); 1560 mutex_unlock(&osdc->request_mutex);
1561 if (already_completed)
1562 goto done;
1290 1563
1291 if (req->r_callback) 1564 if (req->r_callback)
1292 req->r_callback(req, msg); 1565 req->r_callback(req, msg);
@@ -1303,6 +1576,8 @@ done:
1303 1576
1304bad_put: 1577bad_put:
1305 ceph_osdc_put_request(req); 1578 ceph_osdc_put_request(req);
1579bad_mutex:
1580 mutex_unlock(&osdc->request_mutex);
1306bad: 1581bad:
1307 pr_err("corrupt osd_op_reply got %d %d\n", 1582 pr_err("corrupt osd_op_reply got %d %d\n",
1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1583 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -1736,6 +2011,104 @@ bad:
1736} 2011}
1737 2012
1738/* 2013/*
2014 * build new request AND message
2015 *
2016 */
2017void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2018 struct ceph_snap_context *snapc, u64 snap_id,
2019 struct timespec *mtime)
2020{
2021 struct ceph_msg *msg = req->r_request;
2022 void *p;
2023 size_t msg_size;
2024 int flags = req->r_flags;
2025 u64 data_len;
2026 unsigned int i;
2027
2028 req->r_snapid = snap_id;
2029 req->r_snapc = ceph_get_snap_context(snapc);
2030
2031 /* encode request */
2032 msg->hdr.version = cpu_to_le16(4);
2033
2034 p = msg->front.iov_base;
2035 ceph_encode_32(&p, 1); /* client_inc is always 1 */
2036 req->r_request_osdmap_epoch = p;
2037 p += 4;
2038 req->r_request_flags = p;
2039 p += 4;
2040 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
2041 ceph_encode_timespec(p, mtime);
2042 p += sizeof(struct ceph_timespec);
2043 req->r_request_reassert_version = p;
2044 p += sizeof(struct ceph_eversion); /* will get filled in */
2045
2046 /* oloc */
2047 ceph_encode_8(&p, 4);
2048 ceph_encode_8(&p, 4);
2049 ceph_encode_32(&p, 8 + 4 + 4);
2050 req->r_request_pool = p;
2051 p += 8;
2052 ceph_encode_32(&p, -1); /* preferred */
2053 ceph_encode_32(&p, 0); /* key len */
2054
2055 ceph_encode_8(&p, 1);
2056 req->r_request_pgid = p;
2057 p += 8 + 4;
2058 ceph_encode_32(&p, -1); /* preferred */
2059
2060 /* oid */
2061 ceph_encode_32(&p, req->r_oid_len);
2062 memcpy(p, req->r_oid, req->r_oid_len);
2063 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
2064 p += req->r_oid_len;
2065
2066 /* ops--can imply data */
2067 ceph_encode_16(&p, (u16)req->r_num_ops);
2068 data_len = 0;
2069 for (i = 0; i < req->r_num_ops; i++) {
2070 data_len += osd_req_encode_op(req, p, i);
2071 p += sizeof(struct ceph_osd_op);
2072 }
2073
2074 /* snaps */
2075 ceph_encode_64(&p, req->r_snapid);
2076 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
2077 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
2078 if (req->r_snapc) {
2079 for (i = 0; i < snapc->num_snaps; i++) {
2080 ceph_encode_64(&p, req->r_snapc->snaps[i]);
2081 }
2082 }
2083
2084 req->r_request_attempts = p;
2085 p += 4;
2086
2087 /* data */
2088 if (flags & CEPH_OSD_FLAG_WRITE) {
2089 u16 data_off;
2090
2091 /*
2092 * The header "data_off" is a hint to the receiver
2093 * allowing it to align received data into its
2094 * buffers such that there's no need to re-copy
2095 * it before writing it to disk (direct I/O).
2096 */
2097 data_off = (u16) (off & 0xffff);
2098 req->r_request->hdr.data_off = cpu_to_le16(data_off);
2099 }
2100 req->r_request->hdr.data_len = cpu_to_le32(data_len);
2101
2102 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
2103 msg_size = p - msg->front.iov_base;
2104 msg->front.iov_len = msg_size;
2105 msg->hdr.front_len = cpu_to_le32(msg_size);
2106
2107 dout("build_request msg_size was %d\n", (int)msg_size);
2108}
2109EXPORT_SYMBOL(ceph_osdc_build_request);
2110
2111/*
1739 * Register request, send initial attempt. 2112 * Register request, send initial attempt.
1740 */ 2113 */
1741int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2114int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -1744,41 +2117,26 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1744{ 2117{
1745 int rc = 0; 2118 int rc = 0;
1746 2119
1747 req->r_request->pages = req->r_pages;
1748 req->r_request->nr_pages = req->r_num_pages;
1749#ifdef CONFIG_BLOCK
1750 req->r_request->bio = req->r_bio;
1751#endif
1752 req->r_request->trail = &req->r_trail;
1753
1754 register_request(osdc, req);
1755
1756 down_read(&osdc->map_sem); 2120 down_read(&osdc->map_sem);
1757 mutex_lock(&osdc->request_mutex); 2121 mutex_lock(&osdc->request_mutex);
1758 /* 2122 __register_request(osdc, req);
1759 * a racing kick_requests() may have sent the message for us 2123 WARN_ON(req->r_sent);
1760 * while we dropped request_mutex above, so only send now if 2124 rc = __map_request(osdc, req, 0);
1761 * the request still han't been touched yet. 2125 if (rc < 0) {
1762 */ 2126 if (nofail) {
1763 if (req->r_sent == 0) { 2127 dout("osdc_start_request failed map, "
1764 rc = __map_request(osdc, req, 0); 2128 " will retry %lld\n", req->r_tid);
1765 if (rc < 0) { 2129 rc = 0;
1766 if (nofail) {
1767 dout("osdc_start_request failed map, "
1768 " will retry %lld\n", req->r_tid);
1769 rc = 0;
1770 }
1771 goto out_unlock;
1772 }
1773 if (req->r_osd == NULL) {
1774 dout("send_request %p no up osds in pg\n", req);
1775 ceph_monc_request_next_osdmap(&osdc->client->monc);
1776 } else {
1777 __send_request(osdc, req);
1778 } 2130 }
1779 rc = 0; 2131 goto out_unlock;
1780 } 2132 }
1781 2133 if (req->r_osd == NULL) {
2134 dout("send_request %p no up osds in pg\n", req);
2135 ceph_monc_request_next_osdmap(&osdc->client->monc);
2136 } else {
2137 __send_queued(osdc);
2138 }
2139 rc = 0;
1782out_unlock: 2140out_unlock:
1783 mutex_unlock(&osdc->request_mutex); 2141 mutex_unlock(&osdc->request_mutex);
1784 up_read(&osdc->map_sem); 2142 up_read(&osdc->map_sem);
@@ -1940,18 +2298,22 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1940 2298
1941 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2299 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1942 vino.snap, off, *plen); 2300 vino.snap, off, *plen);
1943 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 2301 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
1944 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2302 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1945 NULL, 0, truncate_seq, truncate_size, NULL, 2303 NULL, truncate_seq, truncate_size,
1946 false, page_align); 2304 false);
1947 if (IS_ERR(req)) 2305 if (IS_ERR(req))
1948 return PTR_ERR(req); 2306 return PTR_ERR(req);
1949 2307
1950 /* it may be a short read due to an object boundary */ 2308 /* it may be a short read due to an object boundary */
1951 req->r_pages = pages;
1952 2309
1953 dout("readpages final extent is %llu~%llu (%d pages align %d)\n", 2310 osd_req_op_extent_osd_data_pages(req, 0,
1954 off, *plen, req->r_num_pages, page_align); 2311 pages, *plen, page_align, false, false);
2312
2313 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
2314 off, *plen, *plen, page_align);
2315
2316 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
1955 2317
1956 rc = ceph_osdc_start_request(osdc, req, false); 2318 rc = ceph_osdc_start_request(osdc, req, false);
1957 if (!rc) 2319 if (!rc)
@@ -1978,20 +2340,21 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1978 int rc = 0; 2340 int rc = 0;
1979 int page_align = off & ~PAGE_MASK; 2341 int page_align = off & ~PAGE_MASK;
1980 2342
1981 BUG_ON(vino.snap != CEPH_NOSNAP); 2343 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
1982 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2344 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
1983 CEPH_OSD_OP_WRITE, 2345 CEPH_OSD_OP_WRITE,
1984 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2346 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1985 snapc, 0, 2347 snapc, truncate_seq, truncate_size,
1986 truncate_seq, truncate_size, mtime, 2348 true);
1987 true, page_align);
1988 if (IS_ERR(req)) 2349 if (IS_ERR(req))
1989 return PTR_ERR(req); 2350 return PTR_ERR(req);
1990 2351
1991 /* it may be a short write due to an object boundary */ 2352 /* it may be a short write due to an object boundary */
1992 req->r_pages = pages; 2353 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
1993 dout("writepages %llu~%llu (%d pages)\n", off, len, 2354 false, false);
1994 req->r_num_pages); 2355 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
2356
2357 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
1995 2358
1996 rc = ceph_osdc_start_request(osdc, req, true); 2359 rc = ceph_osdc_start_request(osdc, req, true);
1997 if (!rc) 2360 if (!rc)
@@ -2005,6 +2368,26 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2005} 2368}
2006EXPORT_SYMBOL(ceph_osdc_writepages); 2369EXPORT_SYMBOL(ceph_osdc_writepages);
2007 2370
2371int ceph_osdc_setup(void)
2372{
2373 BUG_ON(ceph_osd_request_cache);
2374 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
2375 sizeof (struct ceph_osd_request),
2376 __alignof__(struct ceph_osd_request),
2377 0, NULL);
2378
2379 return ceph_osd_request_cache ? 0 : -ENOMEM;
2380}
2381EXPORT_SYMBOL(ceph_osdc_setup);
2382
2383void ceph_osdc_cleanup(void)
2384{
2385 BUG_ON(!ceph_osd_request_cache);
2386 kmem_cache_destroy(ceph_osd_request_cache);
2387 ceph_osd_request_cache = NULL;
2388}
2389EXPORT_SYMBOL(ceph_osdc_cleanup);
2390
2008/* 2391/*
2009 * handle incoming message 2392 * handle incoming message
2010 */ 2393 */
@@ -2064,13 +2447,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2064 goto out; 2447 goto out;
2065 } 2448 }
2066 2449
2067 if (req->r_con_filling_msg) { 2450 if (req->r_reply->con)
2068 dout("%s revoking msg %p from old con %p\n", __func__, 2451 dout("%s revoking msg %p from old con %p\n", __func__,
2069 req->r_reply, req->r_con_filling_msg); 2452 req->r_reply, req->r_reply->con);
2070 ceph_msg_revoke_incoming(req->r_reply); 2453 ceph_msg_revoke_incoming(req->r_reply);
2071 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2072 req->r_con_filling_msg = NULL;
2073 }
2074 2454
2075 if (front > req->r_reply->front.iov_len) { 2455 if (front > req->r_reply->front.iov_len) {
2076 pr_warning("get_reply front %d > preallocated %d\n", 2456 pr_warning("get_reply front %d > preallocated %d\n",
@@ -2084,26 +2464,29 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2084 m = ceph_msg_get(req->r_reply); 2464 m = ceph_msg_get(req->r_reply);
2085 2465
2086 if (data_len > 0) { 2466 if (data_len > 0) {
2087 int want = calc_pages_for(req->r_page_alignment, data_len); 2467 struct ceph_osd_data *osd_data;
2088 2468
2089 if (req->r_pages && unlikely(req->r_num_pages < want)) { 2469 /*
2090 pr_warning("tid %lld reply has %d bytes %d pages, we" 2470 * XXX This is assuming there is only one op containing
2091 " had only %d pages ready\n", tid, data_len, 2471 * XXX page data. Probably OK for reads, but this
2092 want, req->r_num_pages); 2472 * XXX ought to be done more generally.
2093 *skip = 1; 2473 */
2094 ceph_msg_put(m); 2474 osd_data = osd_req_op_extent_osd_data(req, 0);
2095 m = NULL; 2475 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
2096 goto out; 2476 if (osd_data->pages &&
2477 unlikely(osd_data->length < data_len)) {
2478
2479 pr_warning("tid %lld reply has %d bytes "
2480 "we had only %llu bytes ready\n",
2481 tid, data_len, osd_data->length);
2482 *skip = 1;
2483 ceph_msg_put(m);
2484 m = NULL;
2485 goto out;
2486 }
2097 } 2487 }
2098 m->pages = req->r_pages;
2099 m->nr_pages = req->r_num_pages;
2100 m->page_alignment = req->r_page_alignment;
2101#ifdef CONFIG_BLOCK
2102 m->bio = req->r_bio;
2103#endif
2104 } 2488 }
2105 *skip = 0; 2489 *skip = 0;
2106 req->r_con_filling_msg = con->ops->get(con);
2107 dout("get_reply tid %lld %p\n", tid, m); 2490 dout("get_reply tid %lld %p\n", tid, m);
2108 2491
2109out: 2492out:
@@ -2168,13 +2551,17 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2168 struct ceph_auth_handshake *auth = &o->o_auth; 2551 struct ceph_auth_handshake *auth = &o->o_auth;
2169 2552
2170 if (force_new && auth->authorizer) { 2553 if (force_new && auth->authorizer) {
2171 if (ac->ops && ac->ops->destroy_authorizer) 2554 ceph_auth_destroy_authorizer(ac, auth->authorizer);
2172 ac->ops->destroy_authorizer(ac, auth->authorizer);
2173 auth->authorizer = NULL; 2555 auth->authorizer = NULL;
2174 } 2556 }
2175 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { 2557 if (!auth->authorizer) {
2176 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2558 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2177 auth); 2559 auth);
2560 if (ret)
2561 return ERR_PTR(ret);
2562 } else {
2563 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2564 auth);
2178 if (ret) 2565 if (ret)
2179 return ERR_PTR(ret); 2566 return ERR_PTR(ret);
2180 } 2567 }
@@ -2190,11 +2577,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
2190 struct ceph_osd_client *osdc = o->o_osdc; 2577 struct ceph_osd_client *osdc = o->o_osdc;
2191 struct ceph_auth_client *ac = osdc->client->monc.auth; 2578 struct ceph_auth_client *ac = osdc->client->monc.auth;
2192 2579
2193 /* 2580 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2194 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2195 * XXX which do we do: succeed or fail?
2196 */
2197 return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2198} 2581}
2199 2582
2200static int invalidate_authorizer(struct ceph_connection *con) 2583static int invalidate_authorizer(struct ceph_connection *con)
@@ -2203,9 +2586,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
2203 struct ceph_osd_client *osdc = o->o_osdc; 2586 struct ceph_osd_client *osdc = o->o_osdc;
2204 struct ceph_auth_client *ac = osdc->client->monc.auth; 2587 struct ceph_auth_client *ac = osdc->client->monc.auth;
2205 2588
2206 if (ac->ops && ac->ops->invalidate_authorizer) 2589 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2207 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2208
2209 return ceph_monc_validate_auth(&osdc->client->monc); 2590 return ceph_monc_validate_auth(&osdc->client->monc);
2210} 2591}
2211 2592