aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2013-02-28 20:43:09 -0500
committerLinus Torvalds <torvalds@linux-foundation.org>2013-02-28 20:43:09 -0500
commit1cf0209c431fa7790253c532039d53b0773193aa (patch)
tree24310eaaf4c9583988d9098f6c85a4a34970b5b9 /net/ceph/osd_client.c
parentde1a2262b006220dae2561a299a6ea128c46f4fe (diff)
parent83ca14fdd35821554058e5fd4fa7b118ee504a33 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "A few groups of patches here. Alex has been hard at work improving the RBD code, layout groundwork for understanding the new formats and doing layering. Most of the infrastructure is now in place for the final bits that will come with the next window. There are a few changes to the data layout. Jim Schutt's patch fixes some non-ideal CRUSH behavior, and a set of patches from me updates the client to speak a newer version of the protocol and implement an improved hashing strategy across storage nodes (when the server side supports it too). A pair of patches from Sam Lang fix the atomicity of open+create operations. Several patches from Yan, Zheng fix various mds/client issues that turned up during multi-mds torture tests. A final set of patches expose file layouts via virtual xattrs, and allow the policies to be set on directories via xattrs as well (avoiding the awkward ioctl interface and providing a consistent interface for both kernel mount and ceph-fuse users)." * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (143 commits) libceph: add support for HASHPSPOOL pool flag libceph: update osd request/reply encoding libceph: calculate placement based on the internal data types ceph: update support for PGID64, PGPOOL3, OSDENC protocol features ceph: update "ceph_features.h" libceph: decode into cpu-native ceph_pg type libceph: rename ceph_pg -> ceph_pg_v1 rbd: pass length, not op for osd completions rbd: move rbd_osd_trivial_callback() libceph: use a do..while loop in con_work() libceph: use a flag to indicate a fault has occurred libceph: separate non-locked fault handling libceph: encapsulate connection backoff libceph: eliminate sparse warnings ceph: eliminate sparse warnings in fs code rbd: eliminate sparse warnings libceph: define connection flag helpers rbd: normalize dout() calls rbd: barriers are hard rbd: ignore zero-length requests ...
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c635
1 files changed, 337 insertions, 298 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -23,7 +23,7 @@
23 23
24static const struct ceph_connection_operations osd_con_ops; 24static const struct ceph_connection_operations osd_con_ops;
25 25
26static void send_queued(struct ceph_osd_client *osdc); 26static void __send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc, 28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req); 29 struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 32static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 33 struct ceph_osd_request *req);
34 34
35static int op_needs_trail(int op)
36{
37 switch (op) {
38 case CEPH_OSD_OP_GETXATTR:
39 case CEPH_OSD_OP_SETXATTR:
40 case CEPH_OSD_OP_CMPXATTR:
41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
43 return 1;
44 default:
45 return 0;
46 }
47}
48
49static int op_has_extent(int op) 35static int op_has_extent(int op)
50{ 36{
51 return (op == CEPH_OSD_OP_READ || 37 return (op == CEPH_OSD_OP_READ ||
52 op == CEPH_OSD_OP_WRITE); 38 op == CEPH_OSD_OP_WRITE);
53} 39}
54 40
55int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
56 struct ceph_file_layout *layout,
57 u64 snapid,
58 u64 off, u64 *plen, u64 *bno,
59 struct ceph_osd_request *req,
60 struct ceph_osd_req_op *op)
61{
62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63 u64 orig_len = *plen;
64 u64 objoff, objlen; /* extent in object */
65 int r;
66
67 reqhead->snapid = cpu_to_le64(snapid);
68
69 /* object extent? */
70 r = ceph_calc_file_object_mapping(layout, off, plen, bno,
71 &objoff, &objlen);
72 if (r < 0)
73 return r;
74 if (*plen < orig_len)
75 dout(" skipping last %llu, final file extent %llu~%llu\n",
76 orig_len - *plen, off, *plen);
77
78 if (op_has_extent(op->op)) {
79 op->extent.offset = objoff;
80 op->extent.length = objlen;
81 }
82 req->r_num_pages = calc_pages_for(off, *plen);
83 req->r_page_alignment = off & ~PAGE_MASK;
84 if (op->op == CEPH_OSD_OP_WRITE)
85 op->payload_len = *plen;
86
87 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
88 *bno, objoff, objlen, req->r_num_pages);
89 return 0;
90}
91EXPORT_SYMBOL(ceph_calc_raw_layout);
92
93/* 41/*
94 * Implement client access to distributed object storage cluster. 42 * Implement client access to distributed object storage cluster.
95 * 43 *
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
115 * 63 *
116 * fill osd op in request message. 64 * fill osd op in request message.
117 */ 65 */
118static int calc_layout(struct ceph_osd_client *osdc, 66static int calc_layout(struct ceph_vino vino,
119 struct ceph_vino vino,
120 struct ceph_file_layout *layout, 67 struct ceph_file_layout *layout,
121 u64 off, u64 *plen, 68 u64 off, u64 *plen,
122 struct ceph_osd_request *req, 69 struct ceph_osd_request *req,
123 struct ceph_osd_req_op *op) 70 struct ceph_osd_req_op *op)
124{ 71{
125 u64 bno; 72 u64 orig_len = *plen;
73 u64 bno = 0;
74 u64 objoff = 0;
75 u64 objlen = 0;
126 int r; 76 int r;
127 77
128 r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, 78 /* object extent? */
129 plen, &bno, req, op); 79 r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
80 &objoff, &objlen);
130 if (r < 0) 81 if (r < 0)
131 return r; 82 return r;
83 if (objlen < orig_len) {
84 *plen = objlen;
85 dout(" skipping last %llu, final file extent %llu~%llu\n",
86 orig_len - *plen, off, *plen);
87 }
88
89 if (op_has_extent(op->op)) {
90 u32 osize = le32_to_cpu(layout->fl_object_size);
91 op->extent.offset = objoff;
92 op->extent.length = objlen;
93 if (op->extent.truncate_size <= off - objoff) {
94 op->extent.truncate_size = 0;
95 } else {
96 op->extent.truncate_size -= off - objoff;
97 if (op->extent.truncate_size > osize)
98 op->extent.truncate_size = osize;
99 }
100 }
101 req->r_num_pages = calc_pages_for(off, *plen);
102 req->r_page_alignment = off & ~PAGE_MASK;
103 if (op->op == CEPH_OSD_OP_WRITE)
104 op->payload_len = *plen;
105
106 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
107 bno, objoff, objlen, req->r_num_pages);
132 108
133 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 109 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
134 req->r_oid_len = strlen(req->r_oid); 110 req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
148 if (req->r_request) 124 if (req->r_request)
149 ceph_msg_put(req->r_request); 125 ceph_msg_put(req->r_request);
150 if (req->r_con_filling_msg) { 126 if (req->r_con_filling_msg) {
151 dout("%s revoking pages %p from con %p\n", __func__, 127 dout("%s revoking msg %p from con %p\n", __func__,
152 req->r_pages, req->r_con_filling_msg); 128 req->r_reply, req->r_con_filling_msg);
153 ceph_msg_revoke_incoming(req->r_reply); 129 ceph_msg_revoke_incoming(req->r_reply);
154 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 130 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 req->r_con_filling_msg = NULL;
155 } 132 }
156 if (req->r_reply) 133 if (req->r_reply)
157 ceph_msg_put(req->r_reply); 134 ceph_msg_put(req->r_reply);
158 if (req->r_own_pages) 135 if (req->r_own_pages)
159 ceph_release_page_vector(req->r_pages, 136 ceph_release_page_vector(req->r_pages,
160 req->r_num_pages); 137 req->r_num_pages);
161#ifdef CONFIG_BLOCK
162 if (req->r_bio)
163 bio_put(req->r_bio);
164#endif
165 ceph_put_snap_context(req->r_snapc); 138 ceph_put_snap_context(req->r_snapc);
166 if (req->r_trail) { 139 ceph_pagelist_release(&req->r_trail);
167 ceph_pagelist_release(req->r_trail);
168 kfree(req->r_trail);
169 }
170 if (req->r_mempool) 140 if (req->r_mempool)
171 mempool_free(req, req->r_osdc->req_mempool); 141 mempool_free(req, req->r_osdc->req_mempool);
172 else 142 else
@@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref)
174} 144}
175EXPORT_SYMBOL(ceph_osdc_release_request); 145EXPORT_SYMBOL(ceph_osdc_release_request);
176 146
177static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178{
179 int i = 0;
180
181 if (needs_trail)
182 *needs_trail = 0;
183 while (ops[i].op) {
184 if (needs_trail && op_needs_trail(ops[i].op))
185 *needs_trail = 1;
186 i++;
187 }
188
189 return i;
190}
191
192struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 int flags,
194 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
195 struct ceph_osd_req_op *ops, 149 unsigned int num_ops,
196 bool use_mempool, 150 bool use_mempool,
197 gfp_t gfp_flags, 151 gfp_t gfp_flags)
198 struct page **pages,
199 struct bio *bio)
200{ 152{
201 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
202 struct ceph_msg *msg; 154 struct ceph_msg *msg;
203 int needs_trail; 155 size_t msg_size;
204 int num_op = get_num_ops(ops, &needs_trail); 156
205 size_t msg_size = sizeof(struct ceph_osd_request_head); 157 msg_size = 4 + 4 + 8 + 8 + 4+8;
206 158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
207 msg_size += num_op*sizeof(struct ceph_osd_op); 159 msg_size += 1 + 8 + 4 + 4; /* pg_t */
160 msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 msg_size += 8; /* snapid */
163 msg_size += 8; /* snap_seq */
164 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
165 msg_size += 4;
208 166
209 if (use_mempool) { 167 if (use_mempool) {
210 req = mempool_alloc(osdc->req_mempool, gfp_flags); 168 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
228 INIT_LIST_HEAD(&req->r_req_lru_item); 186 INIT_LIST_HEAD(&req->r_req_lru_item);
229 INIT_LIST_HEAD(&req->r_osd_item); 187 INIT_LIST_HEAD(&req->r_osd_item);
230 188
231 req->r_flags = flags;
232
233 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
234
235 /* create reply message */ 189 /* create reply message */
236 if (use_mempool) 190 if (use_mempool)
237 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 191 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
244 } 198 }
245 req->r_reply = msg; 199 req->r_reply = msg;
246 200
247 /* allocate space for the trailing data */ 201 ceph_pagelist_init(&req->r_trail);
248 if (needs_trail) {
249 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
250 if (!req->r_trail) {
251 ceph_osdc_put_request(req);
252 return NULL;
253 }
254 ceph_pagelist_init(req->r_trail);
255 }
256 202
257 /* create request message; allow space for oid */ 203 /* create request message; allow space for oid */
258 msg_size += MAX_OBJ_NAME_SIZE;
259 if (snapc)
260 msg_size += sizeof(u64) * snapc->num_snaps;
261 if (use_mempool) 204 if (use_mempool)
262 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
263 else 206 else
@@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
270 memset(msg->front.iov_base, 0, msg->front.iov_len); 213 memset(msg->front.iov_base, 0, msg->front.iov_len);
271 214
272 req->r_request = msg; 215 req->r_request = msg;
273 req->r_pages = pages;
274#ifdef CONFIG_BLOCK
275 if (bio) {
276 req->r_bio = bio;
277 bio_get(req->r_bio);
278 }
279#endif
280 216
281 return req; 217 return req;
282} 218}
@@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
289 dst->op = cpu_to_le16(src->op); 225 dst->op = cpu_to_le16(src->op);
290 226
291 switch (src->op) { 227 switch (src->op) {
228 case CEPH_OSD_OP_STAT:
229 break;
292 case CEPH_OSD_OP_READ: 230 case CEPH_OSD_OP_READ:
293 case CEPH_OSD_OP_WRITE: 231 case CEPH_OSD_OP_WRITE:
294 dst->extent.offset = 232 dst->extent.offset =
@@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
300 dst->extent.truncate_seq = 238 dst->extent.truncate_seq =
301 cpu_to_le32(src->extent.truncate_seq); 239 cpu_to_le32(src->extent.truncate_seq);
302 break; 240 break;
303
304 case CEPH_OSD_OP_GETXATTR:
305 case CEPH_OSD_OP_SETXATTR:
306 case CEPH_OSD_OP_CMPXATTR:
307 BUG_ON(!req->r_trail);
308
309 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
310 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
311 dst->xattr.cmp_op = src->xattr.cmp_op;
312 dst->xattr.cmp_mode = src->xattr.cmp_mode;
313 ceph_pagelist_append(req->r_trail, src->xattr.name,
314 src->xattr.name_len);
315 ceph_pagelist_append(req->r_trail, src->xattr.val,
316 src->xattr.value_len);
317 break;
318 case CEPH_OSD_OP_CALL: 241 case CEPH_OSD_OP_CALL:
319 BUG_ON(!req->r_trail);
320
321 dst->cls.class_len = src->cls.class_len; 242 dst->cls.class_len = src->cls.class_len;
322 dst->cls.method_len = src->cls.method_len; 243 dst->cls.method_len = src->cls.method_len;
323 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 244 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
324 245
325 ceph_pagelist_append(req->r_trail, src->cls.class_name, 246 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
326 src->cls.class_len); 247 src->cls.class_len);
327 ceph_pagelist_append(req->r_trail, src->cls.method_name, 248 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
328 src->cls.method_len); 249 src->cls.method_len);
329 ceph_pagelist_append(req->r_trail, src->cls.indata, 250 ceph_pagelist_append(&req->r_trail, src->cls.indata,
330 src->cls.indata_len); 251 src->cls.indata_len);
331 break; 252 break;
332 case CEPH_OSD_OP_ROLLBACK:
333 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
334 break;
335 case CEPH_OSD_OP_STARTSYNC: 253 case CEPH_OSD_OP_STARTSYNC:
336 break; 254 break;
337 case CEPH_OSD_OP_NOTIFY:
338 {
339 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
340 __le32 timeout = cpu_to_le32(src->watch.timeout);
341
342 BUG_ON(!req->r_trail);
343
344 ceph_pagelist_append(req->r_trail,
345 &prot_ver, sizeof(prot_ver));
346 ceph_pagelist_append(req->r_trail,
347 &timeout, sizeof(timeout));
348 }
349 case CEPH_OSD_OP_NOTIFY_ACK: 255 case CEPH_OSD_OP_NOTIFY_ACK:
350 case CEPH_OSD_OP_WATCH: 256 case CEPH_OSD_OP_WATCH:
351 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 257 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
356 pr_err("unrecognized osd opcode %d\n", dst->op); 262 pr_err("unrecognized osd opcode %d\n", dst->op);
357 WARN_ON(1); 263 WARN_ON(1);
358 break; 264 break;
265 case CEPH_OSD_OP_MAPEXT:
266 case CEPH_OSD_OP_MASKTRUNC:
267 case CEPH_OSD_OP_SPARSE_READ:
268 case CEPH_OSD_OP_NOTIFY:
269 case CEPH_OSD_OP_ASSERT_VER:
270 case CEPH_OSD_OP_WRITEFULL:
271 case CEPH_OSD_OP_TRUNCATE:
272 case CEPH_OSD_OP_ZERO:
273 case CEPH_OSD_OP_DELETE:
274 case CEPH_OSD_OP_APPEND:
275 case CEPH_OSD_OP_SETTRUNC:
276 case CEPH_OSD_OP_TRIMTRUNC:
277 case CEPH_OSD_OP_TMAPUP:
278 case CEPH_OSD_OP_TMAPPUT:
279 case CEPH_OSD_OP_TMAPGET:
280 case CEPH_OSD_OP_CREATE:
281 case CEPH_OSD_OP_ROLLBACK:
282 case CEPH_OSD_OP_OMAPGETKEYS:
283 case CEPH_OSD_OP_OMAPGETVALS:
284 case CEPH_OSD_OP_OMAPGETHEADER:
285 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286 case CEPH_OSD_OP_MODE_RD:
287 case CEPH_OSD_OP_OMAPSETVALS:
288 case CEPH_OSD_OP_OMAPSETHEADER:
289 case CEPH_OSD_OP_OMAPCLEAR:
290 case CEPH_OSD_OP_OMAPRMKEYS:
291 case CEPH_OSD_OP_OMAP_CMP:
292 case CEPH_OSD_OP_CLONERANGE:
293 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
294 case CEPH_OSD_OP_SRC_CMPXATTR:
295 case CEPH_OSD_OP_GETXATTR:
296 case CEPH_OSD_OP_GETXATTRS:
297 case CEPH_OSD_OP_CMPXATTR:
298 case CEPH_OSD_OP_SETXATTR:
299 case CEPH_OSD_OP_SETXATTRS:
300 case CEPH_OSD_OP_RESETXATTRS:
301 case CEPH_OSD_OP_RMXATTR:
302 case CEPH_OSD_OP_PULL:
303 case CEPH_OSD_OP_PUSH:
304 case CEPH_OSD_OP_BALANCEREADS:
305 case CEPH_OSD_OP_UNBALANCEREADS:
306 case CEPH_OSD_OP_SCRUB:
307 case CEPH_OSD_OP_SCRUB_RESERVE:
308 case CEPH_OSD_OP_SCRUB_UNRESERVE:
309 case CEPH_OSD_OP_SCRUB_STOP:
310 case CEPH_OSD_OP_SCRUB_MAP:
311 case CEPH_OSD_OP_WRLOCK:
312 case CEPH_OSD_OP_WRUNLOCK:
313 case CEPH_OSD_OP_RDLOCK:
314 case CEPH_OSD_OP_RDUNLOCK:
315 case CEPH_OSD_OP_UPLOCK:
316 case CEPH_OSD_OP_DNLOCK:
317 case CEPH_OSD_OP_PGLS:
318 case CEPH_OSD_OP_PGLS_FILTER:
319 pr_err("unsupported osd opcode %s\n",
320 ceph_osd_op_name(dst->op));
321 WARN_ON(1);
322 break;
359 } 323 }
360 dst->payload_len = cpu_to_le32(src->payload_len); 324 dst->payload_len = cpu_to_le32(src->payload_len);
361} 325}
@@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
365 * 329 *
366 */ 330 */
367void ceph_osdc_build_request(struct ceph_osd_request *req, 331void ceph_osdc_build_request(struct ceph_osd_request *req,
368 u64 off, u64 *plen, 332 u64 off, u64 len, unsigned int num_ops,
369 struct ceph_osd_req_op *src_ops, 333 struct ceph_osd_req_op *src_ops,
370 struct ceph_snap_context *snapc, 334 struct ceph_snap_context *snapc, u64 snap_id,
371 struct timespec *mtime, 335 struct timespec *mtime)
372 const char *oid,
373 int oid_len)
374{ 336{
375 struct ceph_msg *msg = req->r_request; 337 struct ceph_msg *msg = req->r_request;
376 struct ceph_osd_request_head *head;
377 struct ceph_osd_req_op *src_op; 338 struct ceph_osd_req_op *src_op;
378 struct ceph_osd_op *op;
379 void *p; 339 void *p;
380 int num_op = get_num_ops(src_ops, NULL); 340 size_t msg_size;
381 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
382 int flags = req->r_flags; 341 int flags = req->r_flags;
383 u64 data_len = 0; 342 u64 data_len;
384 int i; 343 int i;
385 344
386 head = msg->front.iov_base; 345 req->r_num_ops = num_ops;
387 op = (void *)(head + 1); 346 req->r_snapid = snap_id;
388 p = (void *)(op + num_op);
389
390 req->r_snapc = ceph_get_snap_context(snapc); 347 req->r_snapc = ceph_get_snap_context(snapc);
391 348
392 head->client_inc = cpu_to_le32(1); /* always, for now. */ 349 /* encode request */
393 head->flags = cpu_to_le32(flags); 350 msg->hdr.version = cpu_to_le16(4);
394 if (flags & CEPH_OSD_FLAG_WRITE)
395 ceph_encode_timespec(&head->mtime, mtime);
396 head->num_ops = cpu_to_le16(num_op);
397
398
399 /* fill in oid */
400 head->object_len = cpu_to_le32(oid_len);
401 memcpy(p, oid, oid_len);
402 p += oid_len;
403 351
352 p = msg->front.iov_base;
353 ceph_encode_32(&p, 1); /* client_inc is always 1 */
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363
364 /* oloc */
365 ceph_encode_8(&p, 4);
366 ceph_encode_8(&p, 4);
367 ceph_encode_32(&p, 8 + 4 + 4);
368 req->r_request_pool = p;
369 p += 8;
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372
373 ceph_encode_8(&p, 1);
374 req->r_request_pgid = p;
375 p += 8 + 4;
376 ceph_encode_32(&p, -1); /* preferred */
377
378 /* oid */
379 ceph_encode_32(&p, req->r_oid_len);
380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382 p += req->r_oid_len;
383
384 /* ops */
385 ceph_encode_16(&p, num_ops);
404 src_op = src_ops; 386 src_op = src_ops;
405 while (src_op->op) { 387 req->r_request_ops = p;
406 osd_req_encode_op(req, op, src_op); 388 for (i = 0; i < num_ops; i++, src_op++) {
407 src_op++; 389 osd_req_encode_op(req, p, src_op);
408 op++; 390 p += sizeof(struct ceph_osd_op);
409 } 391 }
410 392
411 if (req->r_trail) 393 /* snaps */
412 data_len += req->r_trail->length; 394 ceph_encode_64(&p, req->r_snapid);
413 395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
414 if (snapc) { 396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
415 head->snap_seq = cpu_to_le64(snapc->seq); 397 if (req->r_snapc) {
416 head->num_snaps = cpu_to_le32(snapc->num_snaps);
417 for (i = 0; i < snapc->num_snaps; i++) { 398 for (i = 0; i < snapc->num_snaps; i++) {
418 put_unaligned_le64(snapc->snaps[i], p); 399 ceph_encode_64(&p, req->r_snapc->snaps[i]);
419 p += sizeof(u64);
420 } 400 }
421 } 401 }
422 402
403 req->r_request_attempts = p;
404 p += 4;
405
406 data_len = req->r_trail.length;
423 if (flags & CEPH_OSD_FLAG_WRITE) { 407 if (flags & CEPH_OSD_FLAG_WRITE) {
424 req->r_request->hdr.data_off = cpu_to_le16(off); 408 req->r_request->hdr.data_off = cpu_to_le16(off);
425 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 409 data_len += len;
426 } else if (data_len) {
427 req->r_request->hdr.data_off = 0;
428 req->r_request->hdr.data_len = cpu_to_le32(data_len);
429 } 410 }
430 411 req->r_request->hdr.data_len = cpu_to_le32(data_len);
431 req->r_request->page_alignment = req->r_page_alignment; 412 req->r_request->page_alignment = req->r_page_alignment;
432 413
433 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 414 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
434 msg_size = p - msg->front.iov_base; 415 msg_size = p - msg->front.iov_base;
435 msg->front.iov_len = msg_size; 416 msg->front.iov_len = msg_size;
436 msg->hdr.front_len = cpu_to_le32(msg_size); 417 msg->hdr.front_len = cpu_to_le32(msg_size);
418
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 num_ops);
437 return; 421 return;
438} 422}
439EXPORT_SYMBOL(ceph_osdc_build_request); 423EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
459 u32 truncate_seq, 443 u32 truncate_seq,
460 u64 truncate_size, 444 u64 truncate_size,
461 struct timespec *mtime, 445 struct timespec *mtime,
462 bool use_mempool, int num_reply, 446 bool use_mempool,
463 int page_align) 447 int page_align)
464{ 448{
465 struct ceph_osd_req_op ops[3]; 449 struct ceph_osd_req_op ops[2];
466 struct ceph_osd_request *req; 450 struct ceph_osd_request *req;
451 unsigned int num_op = 1;
467 int r; 452 int r;
468 453
454 memset(&ops, 0, sizeof ops);
455
469 ops[0].op = opcode; 456 ops[0].op = opcode;
470 ops[0].extent.truncate_seq = truncate_seq; 457 ops[0].extent.truncate_seq = truncate_seq;
471 ops[0].extent.truncate_size = truncate_size; 458 ops[0].extent.truncate_size = truncate_size;
472 ops[0].payload_len = 0;
473 459
474 if (do_sync) { 460 if (do_sync) {
475 ops[1].op = CEPH_OSD_OP_STARTSYNC; 461 ops[1].op = CEPH_OSD_OP_STARTSYNC;
476 ops[1].payload_len = 0; 462 num_op++;
477 ops[2].op = 0; 463 }
478 } else 464
479 ops[1].op = 0; 465 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
480 466 GFP_NOFS);
481 req = ceph_osdc_alloc_request(osdc, flags,
482 snapc, ops,
483 use_mempool,
484 GFP_NOFS, NULL, NULL);
485 if (!req) 467 if (!req)
486 return ERR_PTR(-ENOMEM); 468 return ERR_PTR(-ENOMEM);
469 req->r_flags = flags;
487 470
488 /* calculate max write size */ 471 /* calculate max write size */
489 r = calc_layout(osdc, vino, layout, off, plen, req, ops); 472 r = calc_layout(vino, layout, off, plen, req, ops);
490 if (r < 0) 473 if (r < 0)
491 return ERR_PTR(r); 474 return ERR_PTR(r);
492 req->r_file_layout = *layout; /* keep a copy */ 475 req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
496 req->r_num_pages = calc_pages_for(page_align, *plen); 479 req->r_num_pages = calc_pages_for(page_align, *plen);
497 req->r_page_alignment = page_align; 480 req->r_page_alignment = page_align;
498 481
499 ceph_osdc_build_request(req, off, plen, ops, 482 ceph_osdc_build_request(req, off, *plen, num_op, ops,
500 snapc, 483 snapc, vino.snap, mtime);
501 mtime,
502 req->r_oid, req->r_oid_len);
503 484
504 return req; 485 return req;
505} 486}
@@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con)
623 down_read(&osdc->map_sem); 604 down_read(&osdc->map_sem);
624 mutex_lock(&osdc->request_mutex); 605 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd); 606 __kick_osd_requests(osdc, osd);
607 __send_queued(osdc);
626 mutex_unlock(&osdc->request_mutex); 608 mutex_unlock(&osdc->request_mutex);
627 send_queued(osdc);
628 up_read(&osdc->map_sem); 609 up_read(&osdc->map_sem);
629} 610}
630 611
@@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
739 */ 720 */
740static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 721static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
741{ 722{
742 struct ceph_osd_request *req; 723 struct ceph_entity_addr *peer_addr;
743 int ret = 0;
744 724
745 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 725 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
746 if (list_empty(&osd->o_requests) && 726 if (list_empty(&osd->o_requests) &&
747 list_empty(&osd->o_linger_requests)) { 727 list_empty(&osd->o_linger_requests)) {
748 __remove_osd(osdc, osd); 728 __remove_osd(osdc, osd);
749 ret = -ENODEV; 729
750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 730 return -ENODEV;
751 &osd->o_con.peer_addr, 731 }
752 sizeof(osd->o_con.peer_addr)) == 0 && 732
753 !ceph_con_opened(&osd->o_con)) { 733 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
734 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
735 !ceph_con_opened(&osd->o_con)) {
736 struct ceph_osd_request *req;
737
754 dout(" osd addr hasn't changed and connection never opened," 738 dout(" osd addr hasn't changed and connection never opened,"
755 " letting msgr retry"); 739 " letting msgr retry");
756 /* touch each r_stamp for handle_timeout()'s benfit */ 740 /* touch each r_stamp for handle_timeout()'s benfit */
757 list_for_each_entry(req, &osd->o_requests, r_osd_item) 741 list_for_each_entry(req, &osd->o_requests, r_osd_item)
758 req->r_stamp = jiffies; 742 req->r_stamp = jiffies;
759 ret = -EAGAIN; 743
760 } else { 744 return -EAGAIN;
761 ceph_con_close(&osd->o_con);
762 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
763 &osdc->osdmap->osd_addr[osd->o_osd]);
764 osd->o_incarnation++;
765 } 745 }
766 return ret; 746
747 ceph_con_close(&osd->o_con);
748 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
749 osd->o_incarnation++;
750
751 return 0;
767} 752}
768 753
769static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 754static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
961static int __map_request(struct ceph_osd_client *osdc, 946static int __map_request(struct ceph_osd_client *osdc,
962 struct ceph_osd_request *req, int force_resend) 947 struct ceph_osd_request *req, int force_resend)
963{ 948{
964 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
965 struct ceph_pg pgid; 949 struct ceph_pg pgid;
966 int acting[CEPH_PG_MAX_SIZE]; 950 int acting[CEPH_PG_MAX_SIZE];
967 int o = -1, num = 0; 951 int o = -1, num = 0;
968 int err; 952 int err;
969 953
970 dout("map_request %p tid %lld\n", req, req->r_tid); 954 dout("map_request %p tid %lld\n", req, req->r_tid);
971 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 955 err = ceph_calc_object_layout(&pgid, req->r_oid,
972 &req->r_file_layout, osdc->osdmap); 956 &req->r_file_layout, osdc->osdmap);
973 if (err) { 957 if (err) {
974 list_move(&req->r_req_lru_item, &osdc->req_notarget); 958 list_move(&req->r_req_lru_item, &osdc->req_notarget);
975 return err; 959 return err;
976 } 960 }
977 pgid = reqhead->layout.ol_pgid;
978 req->r_pgid = pgid; 961 req->r_pgid = pgid;
979 962
980 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 963 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
@@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc,
991 (req->r_osd == NULL && o == -1)) 974 (req->r_osd == NULL && o == -1))
992 return 0; /* no change */ 975 return 0; /* no change */
993 976
994 dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", 977 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
995 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 978 req->r_tid, pgid.pool, pgid.seed, o,
996 req->r_osd ? req->r_osd->o_osd : -1); 979 req->r_osd ? req->r_osd->o_osd : -1);
997 980
998 /* record full pg acting set */ 981 /* record full pg acting set */
@@ -1041,15 +1024,22 @@ out:
1041static void __send_request(struct ceph_osd_client *osdc, 1024static void __send_request(struct ceph_osd_client *osdc,
1042 struct ceph_osd_request *req) 1025 struct ceph_osd_request *req)
1043{ 1026{
1044 struct ceph_osd_request_head *reqhead; 1027 void *p;
1045
1046 dout("send_request %p tid %llu to osd%d flags %d\n",
1047 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
1048 1028
1049 reqhead = req->r_request->front.iov_base; 1029 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1050 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); 1030 req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1051 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 1031 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1052 reqhead->reassert_version = req->r_reassert_version; 1032
1033 /* fill in message content that changes each time we send it */
1034 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 p = req->r_request_pgid;
1038 ceph_encode_64(&p, req->r_pgid.pool);
1039 ceph_encode_32(&p, req->r_pgid.seed);
1040 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
1041 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 sizeof(req->r_reassert_version));
1053 1043
1054 req->r_stamp = jiffies; 1044 req->r_stamp = jiffies;
1055 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc,
1062/* 1052/*
1063 * Send any requests in the queue (req_unsent). 1053 * Send any requests in the queue (req_unsent).
1064 */ 1054 */
1065static void send_queued(struct ceph_osd_client *osdc) 1055static void __send_queued(struct ceph_osd_client *osdc)
1066{ 1056{
1067 struct ceph_osd_request *req, *tmp; 1057 struct ceph_osd_request *req, *tmp;
1068 1058
1069 dout("send_queued\n"); 1059 dout("__send_queued\n");
1070 mutex_lock(&osdc->request_mutex); 1060 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1071 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1072 __send_request(osdc, req); 1061 __send_request(osdc, req);
1073 }
1074 mutex_unlock(&osdc->request_mutex);
1075} 1062}
1076 1063
1077/* 1064/*
@@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work)
1123 } 1110 }
1124 1111
1125 __schedule_osd_timeout(osdc); 1112 __schedule_osd_timeout(osdc);
1113 __send_queued(osdc);
1126 mutex_unlock(&osdc->request_mutex); 1114 mutex_unlock(&osdc->request_mutex);
1127 send_queued(osdc);
1128 up_read(&osdc->map_sem); 1115 up_read(&osdc->map_sem);
1129} 1116}
1130 1117
@@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
1152 complete_all(&req->r_safe_completion); /* fsync waiter */ 1139 complete_all(&req->r_safe_completion); /* fsync waiter */
1153} 1140}
1154 1141
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1155/* 1162/*
1156 * handle osd op reply. either call the callback if it is specified, 1163 * handle osd op reply. either call the callback if it is specified,
1157 * or do the completion to wake up the waiting thread. 1164 * or do the completion to wake up the waiting thread.
@@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
1159static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1166static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1160 struct ceph_connection *con) 1167 struct ceph_connection *con)
1161{ 1168{
1162 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1169 void *p, *end;
1163 struct ceph_osd_request *req; 1170 struct ceph_osd_request *req;
1164 u64 tid; 1171 u64 tid;
1165 int numops, object_len, flags; 1172 int object_len;
1173 int numops, payload_len, flags;
1166 s32 result; 1174 s32 result;
1175 s32 retry_attempt;
1176 struct ceph_pg pg;
1177 int err;
1178 u32 reassert_epoch;
1179 u64 reassert_version;
1180 u32 osdmap_epoch;
1181 int i;
1167 1182
1168 tid = le64_to_cpu(msg->hdr.tid); 1183 tid = le64_to_cpu(msg->hdr.tid);
1169 if (msg->front.iov_len < sizeof(*rhead)) 1184 dout("handle_reply %p tid %llu\n", msg, tid);
1170 goto bad; 1185
1171 numops = le32_to_cpu(rhead->num_ops); 1186 p = msg->front.iov_base;
1172 object_len = le32_to_cpu(rhead->object_len); 1187 end = p + msg->front.iov_len;
1173 result = le32_to_cpu(rhead->result); 1188
1174 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1189 ceph_decode_need(&p, end, 4, bad);
1175 numops * sizeof(struct ceph_osd_op)) 1190 object_len = ceph_decode_32(&p);
1191 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len;
1193
1194 err = __decode_pgid(&p, end, &pg);
1195 if (err)
1176 goto bad; 1196 goto bad;
1177 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1197
1198 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 flags = ceph_decode_64(&p);
1200 result = ceph_decode_32(&p);
1201 reassert_epoch = ceph_decode_32(&p);
1202 reassert_version = ceph_decode_64(&p);
1203 osdmap_epoch = ceph_decode_32(&p);
1204
1178 /* lookup */ 1205 /* lookup */
1179 mutex_lock(&osdc->request_mutex); 1206 mutex_lock(&osdc->request_mutex);
1180 req = __lookup_request(osdc, tid); 1207 req = __lookup_request(osdc, tid);
@@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1184 return; 1211 return;
1185 } 1212 }
1186 ceph_osdc_get_request(req); 1213 ceph_osdc_get_request(req);
1187 flags = le32_to_cpu(rhead->flags); 1214
1215 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 req, result);
1217
1218 ceph_decode_need(&p, end, 4, bad);
1219 numops = ceph_decode_32(&p);
1220 if (numops > CEPH_OSD_MAX_OP)
1221 goto bad_put;
1222 if (numops != req->r_num_ops)
1223 goto bad_put;
1224 payload_len = 0;
1225 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 for (i = 0; i < numops; i++) {
1227 struct ceph_osd_op *op = p;
1228 int len;
1229
1230 len = le32_to_cpu(op->payload_len);
1231 req->r_reply_op_len[i] = len;
1232 dout(" op %d has %d bytes\n", i, len);
1233 payload_len += len;
1234 p += sizeof(*op);
1235 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len));
1239 goto bad_put;
1240 }
1241
1242 ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 retry_attempt = ceph_decode_32(&p);
1244 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p);
1188 1246
1189 /* 1247 /*
1190 * if this connection filled our message, drop our reference now, to 1248 * if this connection filled our message, drop our reference now, to
@@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1199 if (!req->r_got_reply) { 1257 if (!req->r_got_reply) {
1200 unsigned int bytes; 1258 unsigned int bytes;
1201 1259
1202 req->r_result = le32_to_cpu(rhead->result); 1260 req->r_result = result;
1203 bytes = le32_to_cpu(msg->hdr.data_len); 1261 bytes = le32_to_cpu(msg->hdr.data_len);
1204 dout("handle_reply result %d bytes %d\n", req->r_result, 1262 dout("handle_reply result %d bytes %d\n", req->r_result,
1205 bytes); 1263 bytes);
@@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1207 req->r_result = bytes; 1265 req->r_result = bytes;
1208 1266
1209 /* in case this is a write and we need to replay, */ 1267 /* in case this is a write and we need to replay, */
1210 req->r_reassert_version = rhead->reassert_version; 1268 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1211 1270
1212 req->r_got_reply = 1; 1271 req->r_got_reply = 1;
1213 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1272 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1242,10 +1301,11 @@ done:
1242 ceph_osdc_put_request(req); 1301 ceph_osdc_put_request(req);
1243 return; 1302 return;
1244 1303
1304bad_put:
1305 ceph_osdc_put_request(req);
1245bad: 1306bad:
1246 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1307 pr_err("corrupt osd_op_reply got %d %d\n",
1247 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1248 (int)sizeof(*rhead));
1249 ceph_msg_dump(msg); 1309 ceph_msg_dump(msg);
1250} 1310}
1251 1311
@@ -1462,7 +1522,9 @@ done:
1462 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1522 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1463 ceph_monc_request_next_osdmap(&osdc->client->monc); 1523 ceph_monc_request_next_osdmap(&osdc->client->monc);
1464 1524
1465 send_queued(osdc); 1525 mutex_lock(&osdc->request_mutex);
1526 __send_queued(osdc);
1527 mutex_unlock(&osdc->request_mutex);
1466 up_read(&osdc->map_sem); 1528 up_read(&osdc->map_sem);
1467 wake_up_all(&osdc->client->auth_wq); 1529 wake_up_all(&osdc->client->auth_wq);
1468 return; 1530 return;
@@ -1556,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event)
1556 1618
1557int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1619int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1558 void (*event_cb)(u64, u64, u8, void *), 1620 void (*event_cb)(u64, u64, u8, void *),
1559 int one_shot, void *data, 1621 void *data, struct ceph_osd_event **pevent)
1560 struct ceph_osd_event **pevent)
1561{ 1622{
1562 struct ceph_osd_event *event; 1623 struct ceph_osd_event *event;
1563 1624
@@ -1567,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1567 1628
1568 dout("create_event %p\n", event); 1629 dout("create_event %p\n", event);
1569 event->cb = event_cb; 1630 event->cb = event_cb;
1570 event->one_shot = one_shot; 1631 event->one_shot = 0;
1571 event->data = data; 1632 event->data = data;
1572 event->osdc = osdc; 1633 event->osdc = osdc;
1573 INIT_LIST_HEAD(&event->osd_node); 1634 INIT_LIST_HEAD(&event->osd_node);
1574 RB_CLEAR_NODE(&event->node); 1635 RB_CLEAR_NODE(&event->node);
1575 kref_init(&event->kref); /* one ref for us */ 1636 kref_init(&event->kref); /* one ref for us */
1576 kref_get(&event->kref); /* one ref for the caller */ 1637 kref_get(&event->kref); /* one ref for the caller */
1577 init_completion(&event->completion);
1578 1638
1579 spin_lock(&osdc->event_lock); 1639 spin_lock(&osdc->event_lock);
1580 event->cookie = ++osdc->event_count; 1640 event->cookie = ++osdc->event_count;
@@ -1610,7 +1670,6 @@ static void do_event_work(struct work_struct *work)
1610 1670
1611 dout("do_event_work completing %p\n", event); 1671 dout("do_event_work completing %p\n", event);
1612 event->cb(ver, notify_id, opcode, event->data); 1672 event->cb(ver, notify_id, opcode, event->data);
1613 complete(&event->completion);
1614 dout("do_event_work completed %p\n", event); 1673 dout("do_event_work completed %p\n", event);
1615 ceph_osdc_put_event(event); 1674 ceph_osdc_put_event(event);
1616 kfree(event_work); 1675 kfree(event_work);
@@ -1620,7 +1679,8 @@ static void do_event_work(struct work_struct *work)
1620/* 1679/*
1621 * Process osd watch notifications 1680 * Process osd watch notifications
1622 */ 1681 */
1623void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1682static void handle_watch_notify(struct ceph_osd_client *osdc,
1683 struct ceph_msg *msg)
1624{ 1684{
1625 void *p, *end; 1685 void *p, *end;
1626 u8 proto_ver; 1686 u8 proto_ver;
@@ -1641,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1641 spin_lock(&osdc->event_lock); 1701 spin_lock(&osdc->event_lock);
1642 event = __find_event(osdc, cookie); 1702 event = __find_event(osdc, cookie);
1643 if (event) { 1703 if (event) {
1704 BUG_ON(event->one_shot);
1644 get_event(event); 1705 get_event(event);
1645 if (event->one_shot)
1646 __remove_event(event);
1647 } 1706 }
1648 spin_unlock(&osdc->event_lock); 1707 spin_unlock(&osdc->event_lock);
1649 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1708 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1668 return; 1727 return;
1669 1728
1670done_err: 1729done_err:
1671 complete(&event->completion);
1672 ceph_osdc_put_event(event); 1730 ceph_osdc_put_event(event);
1673 return; 1731 return;
1674 1732
@@ -1677,21 +1735,6 @@ bad:
1677 return; 1735 return;
1678} 1736}
1679 1737
1680int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1681{
1682 int err;
1683
1684 dout("wait_event %p\n", event);
1685 err = wait_for_completion_interruptible_timeout(&event->completion,
1686 timeout * HZ);
1687 ceph_osdc_put_event(event);
1688 if (err > 0)
1689 err = 0;
1690 dout("wait_event %p returns %d\n", event, err);
1691 return err;
1692}
1693EXPORT_SYMBOL(ceph_osdc_wait_event);
1694
1695/* 1738/*
1696 * Register request, send initial attempt. 1739 * Register request, send initial attempt.
1697 */ 1740 */
@@ -1706,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1706#ifdef CONFIG_BLOCK 1749#ifdef CONFIG_BLOCK
1707 req->r_request->bio = req->r_bio; 1750 req->r_request->bio = req->r_bio;
1708#endif 1751#endif
1709 req->r_request->trail = req->r_trail; 1752 req->r_request->trail = &req->r_trail;
1710 1753
1711 register_request(osdc, req); 1754 register_request(osdc, req);
1712 1755
@@ -1865,7 +1908,6 @@ out_mempool:
1865out: 1908out:
1866 return err; 1909 return err;
1867} 1910}
1868EXPORT_SYMBOL(ceph_osdc_init);
1869 1911
1870void ceph_osdc_stop(struct ceph_osd_client *osdc) 1912void ceph_osdc_stop(struct ceph_osd_client *osdc)
1871{ 1913{
@@ -1882,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1882 ceph_msgpool_destroy(&osdc->msgpool_op); 1924 ceph_msgpool_destroy(&osdc->msgpool_op);
1883 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1925 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1884} 1926}
1885EXPORT_SYMBOL(ceph_osdc_stop);
1886 1927
1887/* 1928/*
1888 * Read some contiguous pages. If we cross a stripe boundary, shorten 1929 * Read some contiguous pages. If we cross a stripe boundary, shorten
@@ -1902,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1902 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1943 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1903 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1944 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1904 NULL, 0, truncate_seq, truncate_size, NULL, 1945 NULL, 0, truncate_seq, truncate_size, NULL,
1905 false, 1, page_align); 1946 false, page_align);
1906 if (IS_ERR(req)) 1947 if (IS_ERR(req))
1907 return PTR_ERR(req); 1948 return PTR_ERR(req);
1908 1949
@@ -1931,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1931 u64 off, u64 len, 1972 u64 off, u64 len,
1932 u32 truncate_seq, u64 truncate_size, 1973 u32 truncate_seq, u64 truncate_size,
1933 struct timespec *mtime, 1974 struct timespec *mtime,
1934 struct page **pages, int num_pages, 1975 struct page **pages, int num_pages)
1935 int flags, int do_sync, bool nofail)
1936{ 1976{
1937 struct ceph_osd_request *req; 1977 struct ceph_osd_request *req;
1938 int rc = 0; 1978 int rc = 0;
@@ -1941,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1941 BUG_ON(vino.snap != CEPH_NOSNAP); 1981 BUG_ON(vino.snap != CEPH_NOSNAP);
1942 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1982 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1943 CEPH_OSD_OP_WRITE, 1983 CEPH_OSD_OP_WRITE,
1944 flags | CEPH_OSD_FLAG_ONDISK | 1984 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1945 CEPH_OSD_FLAG_WRITE, 1985 snapc, 0,
1946 snapc, do_sync,
1947 truncate_seq, truncate_size, mtime, 1986 truncate_seq, truncate_size, mtime,
1948 nofail, 1, page_align); 1987 true, page_align);
1949 if (IS_ERR(req)) 1988 if (IS_ERR(req))
1950 return PTR_ERR(req); 1989 return PTR_ERR(req);
1951 1990
@@ -1954,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1954 dout("writepages %llu~%llu (%d pages)\n", off, len, 1993 dout("writepages %llu~%llu (%d pages)\n", off, len,
1955 req->r_num_pages); 1994 req->r_num_pages);
1956 1995
1957 rc = ceph_osdc_start_request(osdc, req, nofail); 1996 rc = ceph_osdc_start_request(osdc, req, true);
1958 if (!rc) 1997 if (!rc)
1959 rc = ceph_osdc_wait_request(osdc, req); 1998 rc = ceph_osdc_wait_request(osdc, req);
1960 1999
@@ -2047,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2047 if (data_len > 0) { 2086 if (data_len > 0) {
2048 int want = calc_pages_for(req->r_page_alignment, data_len); 2087 int want = calc_pages_for(req->r_page_alignment, data_len);
2049 2088
2050 if (unlikely(req->r_num_pages < want)) { 2089 if (req->r_pages && unlikely(req->r_num_pages < want)) {
2051 pr_warning("tid %lld reply has %d bytes %d pages, we" 2090 pr_warning("tid %lld reply has %d bytes %d pages, we"
2052 " had only %d pages ready\n", tid, data_len, 2091 " had only %d pages ready\n", tid, data_len,
2053 want, req->r_num_pages); 2092 want, req->r_num_pages);