about summary refs log tree commit diff stats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- net/ceph/osd_client.c 635
1 files changed, 337 insertions, 298 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -23,7 +23,7 @@
23 23
24static const struct ceph_connection_operations osd_con_ops; 24static const struct ceph_connection_operations osd_con_ops;
25 25
26static void send_queued(struct ceph_osd_client *osdc); 26static void __send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc, 28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req); 29 struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 32static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 33 struct ceph_osd_request *req);
34 34
35static int op_needs_trail(int op)
36{
37 switch (op) {
38 case CEPH_OSD_OP_GETXATTR:
39 case CEPH_OSD_OP_SETXATTR:
40 case CEPH_OSD_OP_CMPXATTR:
41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
43 return 1;
44 default:
45 return 0;
46 }
47}
48
49static int op_has_extent(int op) 35static int op_has_extent(int op)
50{ 36{
51 return (op == CEPH_OSD_OP_READ || 37 return (op == CEPH_OSD_OP_READ ||
52 op == CEPH_OSD_OP_WRITE); 38 op == CEPH_OSD_OP_WRITE);
53} 39}
54 40
55int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
56 struct ceph_file_layout *layout,
57 u64 snapid,
58 u64 off, u64 *plen, u64 *bno,
59 struct ceph_osd_request *req,
60 struct ceph_osd_req_op *op)
61{
62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63 u64 orig_len = *plen;
64 u64 objoff, objlen; /* extent in object */
65 int r;
66
67 reqhead->snapid = cpu_to_le64(snapid);
68
69 /* object extent? */
70 r = ceph_calc_file_object_mapping(layout, off, plen, bno,
71 &objoff, &objlen);
72 if (r < 0)
73 return r;
74 if (*plen < orig_len)
75 dout(" skipping last %llu, final file extent %llu~%llu\n",
76 orig_len - *plen, off, *plen);
77
78 if (op_has_extent(op->op)) {
79 op->extent.offset = objoff;
80 op->extent.length = objlen;
81 }
82 req->r_num_pages = calc_pages_for(off, *plen);
83 req->r_page_alignment = off & ~PAGE_MASK;
84 if (op->op == CEPH_OSD_OP_WRITE)
85 op->payload_len = *plen;
86
87 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
88 *bno, objoff, objlen, req->r_num_pages);
89 return 0;
90}
91EXPORT_SYMBOL(ceph_calc_raw_layout);
92
93/* 41/*
94 * Implement client access to distributed object storage cluster. 42 * Implement client access to distributed object storage cluster.
95 * 43 *
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
115 * 63 *
116 * fill osd op in request message. 64 * fill osd op in request message.
117 */ 65 */
118static int calc_layout(struct ceph_osd_client *osdc, 66static int calc_layout(struct ceph_vino vino,
119 struct ceph_vino vino,
120 struct ceph_file_layout *layout, 67 struct ceph_file_layout *layout,
121 u64 off, u64 *plen, 68 u64 off, u64 *plen,
122 struct ceph_osd_request *req, 69 struct ceph_osd_request *req,
123 struct ceph_osd_req_op *op) 70 struct ceph_osd_req_op *op)
124{ 71{
125 u64 bno; 72 u64 orig_len = *plen;
73 u64 bno = 0;
74 u64 objoff = 0;
75 u64 objlen = 0;
126 int r; 76 int r;
127 77
128 r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, 78 /* object extent? */
129 plen, &bno, req, op); 79 r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
80 &objoff, &objlen);
130 if (r < 0) 81 if (r < 0)
131 return r; 82 return r;
83 if (objlen < orig_len) {
84 *plen = objlen;
85 dout(" skipping last %llu, final file extent %llu~%llu\n",
86 orig_len - *plen, off, *plen);
87 }
88
89 if (op_has_extent(op->op)) {
90 u32 osize = le32_to_cpu(layout->fl_object_size);
91 op->extent.offset = objoff;
92 op->extent.length = objlen;
93 if (op->extent.truncate_size <= off - objoff) {
94 op->extent.truncate_size = 0;
95 } else {
96 op->extent.truncate_size -= off - objoff;
97 if (op->extent.truncate_size > osize)
98 op->extent.truncate_size = osize;
99 }
100 }
101 req->r_num_pages = calc_pages_for(off, *plen);
102 req->r_page_alignment = off & ~PAGE_MASK;
103 if (op->op == CEPH_OSD_OP_WRITE)
104 op->payload_len = *plen;
105
106 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
107 bno, objoff, objlen, req->r_num_pages);
132 108
133 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 109 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
134 req->r_oid_len = strlen(req->r_oid); 110 req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
148 if (req->r_request) 124 if (req->r_request)
149 ceph_msg_put(req->r_request); 125 ceph_msg_put(req->r_request);
150 if (req->r_con_filling_msg) { 126 if (req->r_con_filling_msg) {
151 dout("%s revoking pages %p from con %p\n", __func__, 127 dout("%s revoking msg %p from con %p\n", __func__,
152 req->r_pages, req->r_con_filling_msg); 128 req->r_reply, req->r_con_filling_msg);
153 ceph_msg_revoke_incoming(req->r_reply); 129 ceph_msg_revoke_incoming(req->r_reply);
154 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 130 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 req->r_con_filling_msg = NULL;
155 } 132 }
156 if (req->r_reply) 133 if (req->r_reply)
157 ceph_msg_put(req->r_reply); 134 ceph_msg_put(req->r_reply);
158 if (req->r_own_pages) 135 if (req->r_own_pages)
159 ceph_release_page_vector(req->r_pages, 136 ceph_release_page_vector(req->r_pages,
160 req->r_num_pages); 137 req->r_num_pages);
161#ifdef CONFIG_BLOCK
162 if (req->r_bio)
163 bio_put(req->r_bio);
164#endif
165 ceph_put_snap_context(req->r_snapc); 138 ceph_put_snap_context(req->r_snapc);
166 if (req->r_trail) { 139 ceph_pagelist_release(&req->r_trail);
167 ceph_pagelist_release(req->r_trail);
168 kfree(req->r_trail);
169 }
170 if (req->r_mempool) 140 if (req->r_mempool)
171 mempool_free(req, req->r_osdc->req_mempool); 141 mempool_free(req, req->r_osdc->req_mempool);
172 else 142 else
@@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref)
174} 144}
175EXPORT_SYMBOL(ceph_osdc_release_request); 145EXPORT_SYMBOL(ceph_osdc_release_request);
176 146
177static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178{
179 int i = 0;
180
181 if (needs_trail)
182 *needs_trail = 0;
183 while (ops[i].op) {
184 if (needs_trail && op_needs_trail(ops[i].op))
185 *needs_trail = 1;
186 i++;
187 }
188
189 return i;
190}
191
192struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 int flags,
194 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
195 struct ceph_osd_req_op *ops, 149 unsigned int num_ops,
196 bool use_mempool, 150 bool use_mempool,
197 gfp_t gfp_flags, 151 gfp_t gfp_flags)
198 struct page **pages,
199 struct bio *bio)
200{ 152{
201 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
202 struct ceph_msg *msg; 154 struct ceph_msg *msg;
203 int needs_trail; 155 size_t msg_size;
204 int num_op = get_num_ops(ops, &needs_trail); 156
205 size_t msg_size = sizeof(struct ceph_osd_request_head); 157 msg_size = 4 + 4 + 8 + 8 + 4+8;
206 158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
207 msg_size += num_op*sizeof(struct ceph_osd_op); 159 msg_size += 1 + 8 + 4 + 4; /* pg_t */
160 msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 msg_size += 8; /* snapid */
163 msg_size += 8; /* snap_seq */
164 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
165 msg_size += 4;
208 166
209 if (use_mempool) { 167 if (use_mempool) {
210 req = mempool_alloc(osdc->req_mempool, gfp_flags); 168 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
228 INIT_LIST_HEAD(&req->r_req_lru_item); 186 INIT_LIST_HEAD(&req->r_req_lru_item);
229 INIT_LIST_HEAD(&req->r_osd_item); 187 INIT_LIST_HEAD(&req->r_osd_item);
230 188
231 req->r_flags = flags;
232
233 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
234
235 /* create reply message */ 189 /* create reply message */
236 if (use_mempool) 190 if (use_mempool)
237 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 191 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
244 } 198 }
245 req->r_reply = msg; 199 req->r_reply = msg;
246 200
247 /* allocate space for the trailing data */ 201 ceph_pagelist_init(&req->r_trail);
248 if (needs_trail) {
249 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
250 if (!req->r_trail) {
251 ceph_osdc_put_request(req);
252 return NULL;
253 }
254 ceph_pagelist_init(req->r_trail);
255 }
256 202
257 /* create request message; allow space for oid */ 203 /* create request message; allow space for oid */
258 msg_size += MAX_OBJ_NAME_SIZE;
259 if (snapc)
260 msg_size += sizeof(u64) * snapc->num_snaps;
261 if (use_mempool) 204 if (use_mempool)
262 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
263 else 206 else
@@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
270 memset(msg->front.iov_base, 0, msg->front.iov_len); 213 memset(msg->front.iov_base, 0, msg->front.iov_len);
271 214
272 req->r_request = msg; 215 req->r_request = msg;
273 req->r_pages = pages;
274#ifdef CONFIG_BLOCK
275 if (bio) {
276 req->r_bio = bio;
277 bio_get(req->r_bio);
278 }
279#endif
280 216
281 return req; 217 return req;
282} 218}
@@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
289 dst->op = cpu_to_le16(src->op); 225 dst->op = cpu_to_le16(src->op);
290 226
291 switch (src->op) { 227 switch (src->op) {
228 case CEPH_OSD_OP_STAT:
229 break;
292 case CEPH_OSD_OP_READ: 230 case CEPH_OSD_OP_READ:
293 case CEPH_OSD_OP_WRITE: 231 case CEPH_OSD_OP_WRITE:
294 dst->extent.offset = 232 dst->extent.offset =
@@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
300 dst->extent.truncate_seq = 238 dst->extent.truncate_seq =
301 cpu_to_le32(src->extent.truncate_seq); 239 cpu_to_le32(src->extent.truncate_seq);
302 break; 240 break;
303
304 case CEPH_OSD_OP_GETXATTR:
305 case CEPH_OSD_OP_SETXATTR:
306 case CEPH_OSD_OP_CMPXATTR:
307 BUG_ON(!req->r_trail);
308
309 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
310 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
311 dst->xattr.cmp_op = src->xattr.cmp_op;
312 dst->xattr.cmp_mode = src->xattr.cmp_mode;
313 ceph_pagelist_append(req->r_trail, src->xattr.name,
314 src->xattr.name_len);
315 ceph_pagelist_append(req->r_trail, src->xattr.val,
316 src->xattr.value_len);
317 break;
318 case CEPH_OSD_OP_CALL: 241 case CEPH_OSD_OP_CALL:
319 BUG_ON(!req->r_trail);
320
321 dst->cls.class_len = src->cls.class_len; 242 dst->cls.class_len = src->cls.class_len;
322 dst->cls.method_len = src->cls.method_len; 243 dst->cls.method_len = src->cls.method_len;
323 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 244 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
324 245
325 ceph_pagelist_append(req->r_trail, src->cls.class_name, 246 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
326 src->cls.class_len); 247 src->cls.class_len);
327 ceph_pagelist_append(req->r_trail, src->cls.method_name, 248 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
328 src->cls.method_len); 249 src->cls.method_len);
329 ceph_pagelist_append(req->r_trail, src->cls.indata, 250 ceph_pagelist_append(&req->r_trail, src->cls.indata,
330 src->cls.indata_len); 251 src->cls.indata_len);
331 break; 252 break;
332 case CEPH_OSD_OP_ROLLBACK:
333 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
334 break;
335 case CEPH_OSD_OP_STARTSYNC: 253 case CEPH_OSD_OP_STARTSYNC:
336 break; 254 break;
337 case CEPH_OSD_OP_NOTIFY:
338 {
339 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
340 __le32 timeout = cpu_to_le32(src->watch.timeout);
341
342 BUG_ON(!req->r_trail);
343
344 ceph_pagelist_append(req->r_trail,
345 &prot_ver, sizeof(prot_ver));
346 ceph_pagelist_append(req->r_trail,
347 &timeout, sizeof(timeout));
348 }
349 case CEPH_OSD_OP_NOTIFY_ACK: 255 case CEPH_OSD_OP_NOTIFY_ACK:
350 case CEPH_OSD_OP_WATCH: 256 case CEPH_OSD_OP_WATCH:
351 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 257 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
356 pr_err("unrecognized osd opcode %d\n", dst->op); 262 pr_err("unrecognized osd opcode %d\n", dst->op);
357 WARN_ON(1); 263 WARN_ON(1);
358 break; 264 break;
265 case CEPH_OSD_OP_MAPEXT:
266 case CEPH_OSD_OP_MASKTRUNC:
267 case CEPH_OSD_OP_SPARSE_READ:
268 case CEPH_OSD_OP_NOTIFY:
269 case CEPH_OSD_OP_ASSERT_VER:
270 case CEPH_OSD_OP_WRITEFULL:
271 case CEPH_OSD_OP_TRUNCATE:
272 case CEPH_OSD_OP_ZERO:
273 case CEPH_OSD_OP_DELETE:
274 case CEPH_OSD_OP_APPEND:
275 case CEPH_OSD_OP_SETTRUNC:
276 case CEPH_OSD_OP_TRIMTRUNC:
277 case CEPH_OSD_OP_TMAPUP:
278 case CEPH_OSD_OP_TMAPPUT:
279 case CEPH_OSD_OP_TMAPGET:
280 case CEPH_OSD_OP_CREATE:
281 case CEPH_OSD_OP_ROLLBACK:
282 case CEPH_OSD_OP_OMAPGETKEYS:
283 case CEPH_OSD_OP_OMAPGETVALS:
284 case CEPH_OSD_OP_OMAPGETHEADER:
285 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286 case CEPH_OSD_OP_MODE_RD:
287 case CEPH_OSD_OP_OMAPSETVALS:
288 case CEPH_OSD_OP_OMAPSETHEADER:
289 case CEPH_OSD_OP_OMAPCLEAR:
290 case CEPH_OSD_OP_OMAPRMKEYS:
291 case CEPH_OSD_OP_OMAP_CMP:
292 case CEPH_OSD_OP_CLONERANGE:
293 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
294 case CEPH_OSD_OP_SRC_CMPXATTR:
295 case CEPH_OSD_OP_GETXATTR:
296 case CEPH_OSD_OP_GETXATTRS:
297 case CEPH_OSD_OP_CMPXATTR:
298 case CEPH_OSD_OP_SETXATTR:
299 case CEPH_OSD_OP_SETXATTRS:
300 case CEPH_OSD_OP_RESETXATTRS:
301 case CEPH_OSD_OP_RMXATTR:
302 case CEPH_OSD_OP_PULL:
303 case CEPH_OSD_OP_PUSH:
304 case CEPH_OSD_OP_BALANCEREADS:
305 case CEPH_OSD_OP_UNBALANCEREADS:
306 case CEPH_OSD_OP_SCRUB:
307 case CEPH_OSD_OP_SCRUB_RESERVE:
308 case CEPH_OSD_OP_SCRUB_UNRESERVE:
309 case CEPH_OSD_OP_SCRUB_STOP:
310 case CEPH_OSD_OP_SCRUB_MAP:
311 case CEPH_OSD_OP_WRLOCK:
312 case CEPH_OSD_OP_WRUNLOCK:
313 case CEPH_OSD_OP_RDLOCK:
314 case CEPH_OSD_OP_RDUNLOCK:
315 case CEPH_OSD_OP_UPLOCK:
316 case CEPH_OSD_OP_DNLOCK:
317 case CEPH_OSD_OP_PGLS:
318 case CEPH_OSD_OP_PGLS_FILTER:
319 pr_err("unsupported osd opcode %s\n",
320 ceph_osd_op_name(dst->op));
321 WARN_ON(1);
322 break;
359 } 323 }
360 dst->payload_len = cpu_to_le32(src->payload_len); 324 dst->payload_len = cpu_to_le32(src->payload_len);
361} 325}
@@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
365 * 329 *
366 */ 330 */
367void ceph_osdc_build_request(struct ceph_osd_request *req, 331void ceph_osdc_build_request(struct ceph_osd_request *req,
368 u64 off, u64 *plen, 332 u64 off, u64 len, unsigned int num_ops,
369 struct ceph_osd_req_op *src_ops, 333 struct ceph_osd_req_op *src_ops,
370 struct ceph_snap_context *snapc, 334 struct ceph_snap_context *snapc, u64 snap_id,
371 struct timespec *mtime, 335 struct timespec *mtime)
372 const char *oid,
373 int oid_len)
374{ 336{
375 struct ceph_msg *msg = req->r_request; 337 struct ceph_msg *msg = req->r_request;
376 struct ceph_osd_request_head *head;
377 struct ceph_osd_req_op *src_op; 338 struct ceph_osd_req_op *src_op;
378 struct ceph_osd_op *op;
379 void *p; 339 void *p;
380 int num_op = get_num_ops(src_ops, NULL); 340 size_t msg_size;
381 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
382 int flags = req->r_flags; 341 int flags = req->r_flags;
383 u64 data_len = 0; 342 u64 data_len;
384 int i; 343 int i;
385 344
386 head = msg->front.iov_base; 345 req->r_num_ops = num_ops;
387 op = (void *)(head + 1); 346 req->r_snapid = snap_id;
388 p = (void *)(op + num_op);
389
390 req->r_snapc = ceph_get_snap_context(snapc); 347 req->r_snapc = ceph_get_snap_context(snapc);
391 348
392 head->client_inc = cpu_to_le32(1); /* always, for now. */ 349 /* encode request */
393 head->flags = cpu_to_le32(flags); 350 msg->hdr.version = cpu_to_le16(4);
394 if (flags & CEPH_OSD_FLAG_WRITE)
395 ceph_encode_timespec(&head->mtime, mtime);
396 head->num_ops = cpu_to_le16(num_op);
397
398
399 /* fill in oid */
400 head->object_len = cpu_to_le32(oid_len);
401 memcpy(p, oid, oid_len);
402 p += oid_len;
403 351
352 p = msg->front.iov_base;
353 ceph_encode_32(&p, 1); /* client_inc is always 1 */
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363
364 /* oloc */
365 ceph_encode_8(&p, 4);
366 ceph_encode_8(&p, 4);
367 ceph_encode_32(&p, 8 + 4 + 4);
368 req->r_request_pool = p;
369 p += 8;
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372
373 ceph_encode_8(&p, 1);
374 req->r_request_pgid = p;
375 p += 8 + 4;
376 ceph_encode_32(&p, -1); /* preferred */
377
378 /* oid */
379 ceph_encode_32(&p, req->r_oid_len);
380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382 p += req->r_oid_len;
383
384 /* ops */
385 ceph_encode_16(&p, num_ops);
404 src_op = src_ops; 386 src_op = src_ops;
405 while (src_op->op) { 387 req->r_request_ops = p;
406 osd_req_encode_op(req, op, src_op); 388 for (i = 0; i < num_ops; i++, src_op++) {
407 src_op++; 389 osd_req_encode_op(req, p, src_op);
408 op++; 390 p += sizeof(struct ceph_osd_op);
409 } 391 }
410 392
411 if (req->r_trail) 393 /* snaps */
412 data_len += req->r_trail->length; 394 ceph_encode_64(&p, req->r_snapid);
413 395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
414 if (snapc) { 396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
415 head->snap_seq = cpu_to_le64(snapc->seq); 397 if (req->r_snapc) {
416 head->num_snaps = cpu_to_le32(snapc->num_snaps);
417 for (i = 0; i < snapc->num_snaps; i++) { 398 for (i = 0; i < snapc->num_snaps; i++) {
418 put_unaligned_le64(snapc->snaps[i], p); 399 ceph_encode_64(&p, req->r_snapc->snaps[i]);
419 p += sizeof(u64);
420 } 400 }
421 } 401 }
422 402
403 req->r_request_attempts = p;
404 p += 4;
405
406 data_len = req->r_trail.length;
423 if (flags & CEPH_OSD_FLAG_WRITE) { 407 if (flags & CEPH_OSD_FLAG_WRITE) {
424 req->r_request->hdr.data_off = cpu_to_le16(off); 408 req->r_request->hdr.data_off = cpu_to_le16(off);
425 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 409 data_len += len;
426 } else if (data_len) {
427 req->r_request->hdr.data_off = 0;
428 req->r_request->hdr.data_len = cpu_to_le32(data_len);
429 } 410 }
430 411 req->r_request->hdr.data_len = cpu_to_le32(data_len);
431 req->r_request->page_alignment = req->r_page_alignment; 412 req->r_request->page_alignment = req->r_page_alignment;
432 413
433 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 414 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
434 msg_size = p - msg->front.iov_base; 415 msg_size = p - msg->front.iov_base;
435 msg->front.iov_len = msg_size; 416 msg->front.iov_len = msg_size;
436 msg->hdr.front_len = cpu_to_le32(msg_size); 417 msg->hdr.front_len = cpu_to_le32(msg_size);
418
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 num_ops);
437 return; 421 return;
438} 422}
439EXPORT_SYMBOL(ceph_osdc_build_request); 423EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
459 u32 truncate_seq, 443 u32 truncate_seq,
460 u64 truncate_size, 444 u64 truncate_size,
461 struct timespec *mtime, 445 struct timespec *mtime,
462 bool use_mempool, int num_reply, 446 bool use_mempool,
463 int page_align) 447 int page_align)
464{ 448{
465 struct ceph_osd_req_op ops[3]; 449 struct ceph_osd_req_op ops[2];
466 struct ceph_osd_request *req; 450 struct ceph_osd_request *req;
451 unsigned int num_op = 1;
467 int r; 452 int r;
468 453
454 memset(&ops, 0, sizeof ops);
455
469 ops[0].op = opcode; 456 ops[0].op = opcode;
470 ops[0].extent.truncate_seq = truncate_seq; 457 ops[0].extent.truncate_seq = truncate_seq;
471 ops[0].extent.truncate_size = truncate_size; 458 ops[0].extent.truncate_size = truncate_size;
472 ops[0].payload_len = 0;
473 459
474 if (do_sync) { 460 if (do_sync) {
475 ops[1].op = CEPH_OSD_OP_STARTSYNC; 461 ops[1].op = CEPH_OSD_OP_STARTSYNC;
476 ops[1].payload_len = 0; 462 num_op++;
477 ops[2].op = 0; 463 }
478 } else 464
479 ops[1].op = 0; 465 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
480 466 GFP_NOFS);
481 req = ceph_osdc_alloc_request(osdc, flags,
482 snapc, ops,
483 use_mempool,
484 GFP_NOFS, NULL, NULL);
485 if (!req) 467 if (!req)
486 return ERR_PTR(-ENOMEM); 468 return ERR_PTR(-ENOMEM);
469 req->r_flags = flags;
487 470
488 /* calculate max write size */ 471 /* calculate max write size */
489 r = calc_layout(osdc, vino, layout, off, plen, req, ops); 472 r = calc_layout(vino, layout, off, plen, req, ops);
490 if (r < 0) 473 if (r < 0)
491 return ERR_PTR(r); 474 return ERR_PTR(r);
492 req->r_file_layout = *layout; /* keep a copy */ 475 req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
496 req->r_num_pages = calc_pages_for(page_align, *plen); 479 req->r_num_pages = calc_pages_for(page_align, *plen);
497 req->r_page_alignment = page_align; 480 req->r_page_alignment = page_align;
498 481
499 ceph_osdc_build_request(req, off, plen, ops, 482 ceph_osdc_build_request(req, off, *plen, num_op, ops,
500 snapc, 483 snapc, vino.snap, mtime);
501 mtime,
502 req->r_oid, req->r_oid_len);
503 484
504 return req; 485 return req;
505} 486}
@@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con)
623 down_read(&osdc->map_sem); 604 down_read(&osdc->map_sem);
624 mutex_lock(&osdc->request_mutex); 605 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd); 606 __kick_osd_requests(osdc, osd);
607 __send_queued(osdc);
626 mutex_unlock(&osdc->request_mutex); 608 mutex_unlock(&osdc->request_mutex);
627 send_queued(osdc);
628 up_read(&osdc->map_sem); 609 up_read(&osdc->map_sem);
629} 610}
630 611
@@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
739 */ 720 */
740static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 721static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
741{ 722{
742 struct ceph_osd_request *req; 723 struct ceph_entity_addr *peer_addr;
743 int ret = 0;
744 724
745 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 725 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
746 if (list_empty(&osd->o_requests) && 726 if (list_empty(&osd->o_requests) &&
747 list_empty(&osd->o_linger_requests)) { 727 list_empty(&osd->o_linger_requests)) {
748 __remove_osd(osdc, osd); 728 __remove_osd(osdc, osd);
749 ret = -ENODEV; 729
750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 730 return -ENODEV;
751 &osd->o_con.peer_addr, 731 }
752 sizeof(osd->o_con.peer_addr)) == 0 && 732
753 !ceph_con_opened(&osd->o_con)) { 733 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
734 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
735 !ceph_con_opened(&osd->o_con)) {
736 struct ceph_osd_request *req;
737
754 dout(" osd addr hasn't changed and connection never opened," 738 dout(" osd addr hasn't changed and connection never opened,"
755 " letting msgr retry"); 739 " letting msgr retry");
756 /* touch each r_stamp for handle_timeout()'s benfit */ 740 /* touch each r_stamp for handle_timeout()'s benfit */
757 list_for_each_entry(req, &osd->o_requests, r_osd_item) 741 list_for_each_entry(req, &osd->o_requests, r_osd_item)
758 req->r_stamp = jiffies; 742 req->r_stamp = jiffies;
759 ret = -EAGAIN; 743
760 } else { 744 return -EAGAIN;
761 ceph_con_close(&osd->o_con);
762 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
763 &osdc->osdmap->osd_addr[osd->o_osd]);
764 osd->o_incarnation++;
765 } 745 }
766 return ret; 746
747 ceph_con_close(&osd->o_con);
748 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
749 osd->o_incarnation++;
750
751 return 0;
767} 752}
768 753
769static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 754static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
961static int __map_request(struct ceph_osd_client *osdc, 946static int __map_request(struct ceph_osd_client *osdc,
962 struct ceph_osd_request *req, int force_resend) 947 struct ceph_osd_request *req, int force_resend)
963{ 948{
964 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
965 struct ceph_pg pgid; 949 struct ceph_pg pgid;
966 int acting[CEPH_PG_MAX_SIZE]; 950 int acting[CEPH_PG_MAX_SIZE];
967 int o = -1, num = 0; 951 int o = -1, num = 0;
968 int err; 952 int err;
969 953
970 dout("map_request %p tid %lld\n", req, req->r_tid); 954 dout("map_request %p tid %lld\n", req, req->r_tid);
971 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 955 err = ceph_calc_object_layout(&pgid, req->r_oid,
972 &req->r_file_layout, osdc->osdmap); 956 &req->r_file_layout, osdc->osdmap);
973 if (err) { 957 if (err) {
974 list_move(&req->r_req_lru_item, &osdc->req_notarget); 958 list_move(&req->r_req_lru_item, &osdc->req_notarget);
975 return err; 959 return err;
976 } 960 }
977 pgid = reqhead->layout.ol_pgid;
978 req->r_pgid = pgid; 961 req->r_pgid = pgid;
979 962
980 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 963 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
@@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc,
991 (req->r_osd == NULL && o == -1)) 974 (req->r_osd == NULL && o == -1))
992 return 0; /* no change */ 975 return 0; /* no change */
993 976
994 dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", 977 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
995 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 978 req->r_tid, pgid.pool, pgid.seed, o,
996 req->r_osd ? req->r_osd->o_osd : -1); 979 req->r_osd ? req->r_osd->o_osd : -1);
997 980
998 /* record full pg acting set */ 981 /* record full pg acting set */
@@ -1041,15 +1024,22 @@ out:
1041static void __send_request(struct ceph_osd_client *osdc, 1024static void __send_request(struct ceph_osd_client *osdc,
1042 struct ceph_osd_request *req) 1025 struct ceph_osd_request *req)
1043{ 1026{
1044 struct ceph_osd_request_head *reqhead; 1027 void *p;
1045
1046 dout("send_request %p tid %llu to osd%d flags %d\n",
1047 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
1048 1028
1049 reqhead = req->r_request->front.iov_base; 1029 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1050 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); 1030 req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1051 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 1031 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1052 reqhead->reassert_version = req->r_reassert_version; 1032
1033 /* fill in message content that changes each time we send it */
1034 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 p = req->r_request_pgid;
1038 ceph_encode_64(&p, req->r_pgid.pool);
1039 ceph_encode_32(&p, req->r_pgid.seed);
1040 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
1041 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 sizeof(req->r_reassert_version));
1053 1043
1054 req->r_stamp = jiffies; 1044 req->r_stamp = jiffies;
1055 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc,
1062/* 1052/*
1063 * Send any requests in the queue (req_unsent). 1053 * Send any requests in the queue (req_unsent).
1064 */ 1054 */
1065static void send_queued(struct ceph_osd_client *osdc) 1055static void __send_queued(struct ceph_osd_client *osdc)
1066{ 1056{
1067 struct ceph_osd_request *req, *tmp; 1057 struct ceph_osd_request *req, *tmp;
1068 1058
1069 dout("send_queued\n"); 1059 dout("__send_queued\n");
1070 mutex_lock(&osdc->request_mutex); 1060 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1071 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1072 __send_request(osdc, req); 1061 __send_request(osdc, req);
1073 }
1074 mutex_unlock(&osdc->request_mutex);
1075} 1062}
1076 1063
1077/* 1064/*
@@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work)
1123 } 1110 }
1124 1111
1125 __schedule_osd_timeout(osdc); 1112 __schedule_osd_timeout(osdc);
1113 __send_queued(osdc);
1126 mutex_unlock(&osdc->request_mutex); 1114 mutex_unlock(&osdc->request_mutex);
1127 send_queued(osdc);
1128 up_read(&osdc->map_sem); 1115 up_read(&osdc->map_sem);
1129} 1116}
1130 1117
@@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
1152 complete_all(&req->r_safe_completion); /* fsync waiter */ 1139 complete_all(&req->r_safe_completion); /* fsync waiter */
1153} 1140}
1154 1141
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1155/* 1162/*
1156 * handle osd op reply. either call the callback if it is specified, 1163 * handle osd op reply. either call the callback if it is specified,
1157 * or do the completion to wake up the waiting thread. 1164 * or do the completion to wake up the waiting thread.
@@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
1159static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1166static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1160 struct ceph_connection *con) 1167 struct ceph_connection *con)
1161{ 1168{
1162 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1169 void *p, *end;
1163 struct ceph_osd_request *req; 1170 struct ceph_osd_request *req;
1164 u64 tid; 1171 u64 tid;
1165 int numops, object_len, flags; 1172 int object_len;
1173 int numops, payload_len, flags;
1166 s32 result; 1174 s32 result;
1175 s32 retry_attempt;
1176 struct ceph_pg pg;
1177 int err;
1178 u32 reassert_epoch;
1179 u64 reassert_version;
1180 u32 osdmap_epoch;
1181 int i;
1167 1182
1168 tid = le64_to_cpu(msg->hdr.tid); 1183 tid = le64_to_cpu(msg->hdr.tid);
1169 if (msg->front.iov_len < sizeof(*rhead)) 1184 dout("handle_reply %p tid %llu\n", msg, tid);
1170 goto bad; 1185
1171 numops = le32_to_cpu(rhead->num_ops); 1186 p = msg->front.iov_base;
1172 object_len = le32_to_cpu(rhead->object_len); 1187 end = p + msg->front.iov_len;
1173 result = le32_to_cpu(rhead->result); 1188
1174 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1189 ceph_decode_need(&p, end, 4, bad);
1175 numops * sizeof(struct ceph_osd_op)) 1190 object_len = ceph_decode_32(&p);
1191 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len;
1193
1194 err = __decode_pgid(&p, end, &pg);
1195 if (err)
1176 goto bad; 1196 goto bad;
1177 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1197
1198 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 flags = ceph_decode_64(&p);
1200 result = ceph_decode_32(&p);
1201 reassert_epoch = ceph_decode_32(&p);
1202 reassert_version = ceph_decode_64(&p);
1203 osdmap_epoch = ceph_decode_32(&p);
1204
1178 /* lookup */ 1205 /* lookup */
1179 mutex_lock(&osdc->request_mutex); 1206 mutex_lock(&osdc->request_mutex);
1180 req = __lookup_request(osdc, tid); 1207 req = __lookup_request(osdc, tid);
@@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1184 return; 1211 return;
1185 } 1212 }
1186 ceph_osdc_get_request(req); 1213 ceph_osdc_get_request(req);
1187 flags = le32_to_cpu(rhead->flags); 1214
1215 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 req, result);
1217
1218 ceph_decode_need(&p, end, 4, bad);
1219 numops = ceph_decode_32(&p);
1220 if (numops > CEPH_OSD_MAX_OP)
1221 goto bad_put;
1222 if (numops != req->r_num_ops)
1223 goto bad_put;
1224 payload_len = 0;
1225 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 for (i = 0; i < numops; i++) {
1227 struct ceph_osd_op *op = p;
1228 int len;
1229
1230 len = le32_to_cpu(op->payload_len);
1231 req->r_reply_op_len[i] = len;
1232 dout(" op %d has %d bytes\n", i, len);
1233 payload_len += len;
1234 p += sizeof(*op);
1235 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len));
1239 goto bad_put;
1240 }
1241
1242 ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 retry_attempt = ceph_decode_32(&p);
1244 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p);
1188 1246
1189 /* 1247 /*
1190 * if this connection filled our message, drop our reference now, to 1248 * if this connection filled our message, drop our reference now, to
@@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1199 if (!req->r_got_reply) { 1257 if (!req->r_got_reply) {
1200 unsigned int bytes; 1258 unsigned int bytes;
1201 1259
1202 req->r_result = le32_to_cpu(rhead->result); 1260 req->r_result = result;
1203 bytes = le32_to_cpu(msg->hdr.data_len); 1261 bytes = le32_to_cpu(msg->hdr.data_len);
1204 dout("handle_reply result %d bytes %d\n", req->r_result, 1262 dout("handle_reply result %d bytes %d\n", req->r_result,
1205 bytes); 1263 bytes);
@@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1207 req->r_result = bytes; 1265 req->r_result = bytes;
1208 1266
1209 /* in case this is a write and we need to replay, */ 1267 /* in case this is a write and we need to replay, */
1210 req->r_reassert_version = rhead->reassert_version; 1268 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1211 1270
1212 req->r_got_reply = 1; 1271 req->r_got_reply = 1;
1213 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1272 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1242,10 +1301,11 @@ done:
1242 ceph_osdc_put_request(req); 1301 ceph_osdc_put_request(req);
1243 return; 1302 return;
1244 1303
1304bad_put:
1305 ceph_osdc_put_request(req);
1245bad: 1306bad:
1246 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1307 pr_err("corrupt osd_op_reply got %d %d\n",
1247 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1248 (int)sizeof(*rhead));
1249 ceph_msg_dump(msg); 1309 ceph_msg_dump(msg);
1250} 1310}
1251 1311
@@ -1462,7 +1522,9 @@ done:
1462 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1522 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1463 ceph_monc_request_next_osdmap(&osdc->client->monc); 1523 ceph_monc_request_next_osdmap(&osdc->client->monc);
1464 1524
1465 send_queued(osdc); 1525 mutex_lock(&osdc->request_mutex);
1526 __send_queued(osdc);
1527 mutex_unlock(&osdc->request_mutex);
1466 up_read(&osdc->map_sem); 1528 up_read(&osdc->map_sem);
1467 wake_up_all(&osdc->client->auth_wq); 1529 wake_up_all(&osdc->client->auth_wq);
1468 return; 1530 return;
@@ -1556,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event)
1556 1618
1557int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1619int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1558 void (*event_cb)(u64, u64, u8, void *), 1620 void (*event_cb)(u64, u64, u8, void *),
1559 int one_shot, void *data, 1621 void *data, struct ceph_osd_event **pevent)
1560 struct ceph_osd_event **pevent)
1561{ 1622{
1562 struct ceph_osd_event *event; 1623 struct ceph_osd_event *event;
1563 1624
@@ -1567,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1567 1628
1568 dout("create_event %p\n", event); 1629 dout("create_event %p\n", event);
1569 event->cb = event_cb; 1630 event->cb = event_cb;
1570 event->one_shot = one_shot; 1631 event->one_shot = 0;
1571 event->data = data; 1632 event->data = data;
1572 event->osdc = osdc; 1633 event->osdc = osdc;
1573 INIT_LIST_HEAD(&event->osd_node); 1634 INIT_LIST_HEAD(&event->osd_node);
1574 RB_CLEAR_NODE(&event->node); 1635 RB_CLEAR_NODE(&event->node);
1575 kref_init(&event->kref); /* one ref for us */ 1636 kref_init(&event->kref); /* one ref for us */
1576 kref_get(&event->kref); /* one ref for the caller */ 1637 kref_get(&event->kref); /* one ref for the caller */
1577 init_completion(&event->completion);
1578 1638
1579 spin_lock(&osdc->event_lock); 1639 spin_lock(&osdc->event_lock);
1580 event->cookie = ++osdc->event_count; 1640 event->cookie = ++osdc->event_count;
@@ -1610,7 +1670,6 @@ static void do_event_work(struct work_struct *work)
1610 1670
1611 dout("do_event_work completing %p\n", event); 1671 dout("do_event_work completing %p\n", event);
1612 event->cb(ver, notify_id, opcode, event->data); 1672 event->cb(ver, notify_id, opcode, event->data);
1613 complete(&event->completion);
1614 dout("do_event_work completed %p\n", event); 1673 dout("do_event_work completed %p\n", event);
1615 ceph_osdc_put_event(event); 1674 ceph_osdc_put_event(event);
1616 kfree(event_work); 1675 kfree(event_work);
@@ -1620,7 +1679,8 @@ static void do_event_work(struct work_struct *work)
1620/* 1679/*
1621 * Process osd watch notifications 1680 * Process osd watch notifications
1622 */ 1681 */
1623void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1682static void handle_watch_notify(struct ceph_osd_client *osdc,
1683 struct ceph_msg *msg)
1624{ 1684{
1625 void *p, *end; 1685 void *p, *end;
1626 u8 proto_ver; 1686 u8 proto_ver;
@@ -1641,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1641 spin_lock(&osdc->event_lock); 1701 spin_lock(&osdc->event_lock);
1642 event = __find_event(osdc, cookie); 1702 event = __find_event(osdc, cookie);
1643 if (event) { 1703 if (event) {
1704 BUG_ON(event->one_shot);
1644 get_event(event); 1705 get_event(event);
1645 if (event->one_shot)
1646 __remove_event(event);
1647 } 1706 }
1648 spin_unlock(&osdc->event_lock); 1707 spin_unlock(&osdc->event_lock);
1649 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1708 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1668 return; 1727 return;
1669 1728
1670done_err: 1729done_err:
1671 complete(&event->completion);
1672 ceph_osdc_put_event(event); 1730 ceph_osdc_put_event(event);
1673 return; 1731 return;
1674 1732
@@ -1677,21 +1735,6 @@ bad:
1677 return; 1735 return;
1678} 1736}
1679 1737
1680int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1681{
1682 int err;
1683
1684 dout("wait_event %p\n", event);
1685 err = wait_for_completion_interruptible_timeout(&event->completion,
1686 timeout * HZ);
1687 ceph_osdc_put_event(event);
1688 if (err > 0)
1689 err = 0;
1690 dout("wait_event %p returns %d\n", event, err);
1691 return err;
1692}
1693EXPORT_SYMBOL(ceph_osdc_wait_event);
1694
1695/* 1738/*
1696 * Register request, send initial attempt. 1739 * Register request, send initial attempt.
1697 */ 1740 */
@@ -1706,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1706#ifdef CONFIG_BLOCK 1749#ifdef CONFIG_BLOCK
1707 req->r_request->bio = req->r_bio; 1750 req->r_request->bio = req->r_bio;
1708#endif 1751#endif
1709 req->r_request->trail = req->r_trail; 1752 req->r_request->trail = &req->r_trail;
1710 1753
1711 register_request(osdc, req); 1754 register_request(osdc, req);
1712 1755
@@ -1865,7 +1908,6 @@ out_mempool:
1865out: 1908out:
1866 return err; 1909 return err;
1867} 1910}
1868EXPORT_SYMBOL(ceph_osdc_init);
1869 1911
1870void ceph_osdc_stop(struct ceph_osd_client *osdc) 1912void ceph_osdc_stop(struct ceph_osd_client *osdc)
1871{ 1913{
@@ -1882,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1882 ceph_msgpool_destroy(&osdc->msgpool_op); 1924 ceph_msgpool_destroy(&osdc->msgpool_op);
1883 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1925 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1884} 1926}
1885EXPORT_SYMBOL(ceph_osdc_stop);
1886 1927
1887/* 1928/*
1888 * Read some contiguous pages. If we cross a stripe boundary, shorten 1929 * Read some contiguous pages. If we cross a stripe boundary, shorten
@@ -1902,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1902 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1943 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1903 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1944 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1904 NULL, 0, truncate_seq, truncate_size, NULL, 1945 NULL, 0, truncate_seq, truncate_size, NULL,
1905 false, 1, page_align); 1946 false, page_align);
1906 if (IS_ERR(req)) 1947 if (IS_ERR(req))
1907 return PTR_ERR(req); 1948 return PTR_ERR(req);
1908 1949
@@ -1931,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1931 u64 off, u64 len, 1972 u64 off, u64 len,
1932 u32 truncate_seq, u64 truncate_size, 1973 u32 truncate_seq, u64 truncate_size,
1933 struct timespec *mtime, 1974 struct timespec *mtime,
1934 struct page **pages, int num_pages, 1975 struct page **pages, int num_pages)
1935 int flags, int do_sync, bool nofail)
1936{ 1976{
1937 struct ceph_osd_request *req; 1977 struct ceph_osd_request *req;
1938 int rc = 0; 1978 int rc = 0;
@@ -1941,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1941 BUG_ON(vino.snap != CEPH_NOSNAP); 1981 BUG_ON(vino.snap != CEPH_NOSNAP);
1942 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1982 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1943 CEPH_OSD_OP_WRITE, 1983 CEPH_OSD_OP_WRITE,
1944 flags | CEPH_OSD_FLAG_ONDISK | 1984 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1945 CEPH_OSD_FLAG_WRITE, 1985 snapc, 0,
1946 snapc, do_sync,
1947 truncate_seq, truncate_size, mtime, 1986 truncate_seq, truncate_size, mtime,
1948 nofail, 1, page_align); 1987 true, page_align);
1949 if (IS_ERR(req)) 1988 if (IS_ERR(req))
1950 return PTR_ERR(req); 1989 return PTR_ERR(req);
1951 1990
@@ -1954,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1954 dout("writepages %llu~%llu (%d pages)\n", off, len, 1993 dout("writepages %llu~%llu (%d pages)\n", off, len,
1955 req->r_num_pages); 1994 req->r_num_pages);
1956 1995
1957 rc = ceph_osdc_start_request(osdc, req, nofail); 1996 rc = ceph_osdc_start_request(osdc, req, true);
1958 if (!rc) 1997 if (!rc)
1959 rc = ceph_osdc_wait_request(osdc, req); 1998 rc = ceph_osdc_wait_request(osdc, req);
1960 1999
@@ -2047,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2047 if (data_len > 0) { 2086 if (data_len > 0) {
2048 int want = calc_pages_for(req->r_page_alignment, data_len); 2087 int want = calc_pages_for(req->r_page_alignment, data_len);
2049 2088
2050 if (unlikely(req->r_num_pages < want)) { 2089 if (req->r_pages && unlikely(req->r_num_pages < want)) {
2051 pr_warning("tid %lld reply has %d bytes %d pages, we" 2090 pr_warning("tid %lld reply has %d bytes %d pages, we"
2052 " had only %d pages ready\n", tid, data_len, 2091 " had only %d pages ready\n", tid, data_len,
2053 want, req->r_num_pages); 2092 want, req->r_num_pages);