aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c52
-rw-r--r--fs/ceph/addr.c31
-rw-r--r--include/linux/ceph/osd_client.h19
-rw-r--r--include/linux/ceph/rados.h38
-rw-r--r--net/ceph/debugfs.c18
-rw-r--r--net/ceph/osd_client.c233
6 files changed, 222 insertions, 169 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 22085e86a409..6c81a4c040b9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -196,7 +196,7 @@ struct rbd_obj_request {
196 196
197 u64 xferred; /* bytes transferred */ 197 u64 xferred; /* bytes transferred */
198 u64 version; 198 u64 version;
199 s32 result; 199 int result;
200 atomic_t done; 200 atomic_t done;
201 201
202 rbd_obj_callback_t callback; 202 rbd_obj_callback_t callback;
@@ -1282,12 +1282,19 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1282 1282
1283static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1283static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1284{ 1284{
1285
1286 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, 1285 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1287 obj_request->result, obj_request->xferred, obj_request->length); 1286 obj_request->result, obj_request->xferred, obj_request->length);
1288 if (obj_request->result == (s32) -ENOENT) { 1287 /*
1288 * ENOENT means a hole in the object. We zero-fill the
1289 * entire length of the request. A short read also implies
1290 * zero-fill to the end of the request. Either way we
1291 * update the xferred count to indicate the whole request
1292 * was satisfied.
1293 */
1294 if (obj_request->result == -ENOENT) {
1289 zero_bio_chain(obj_request->bio_list, 0); 1295 zero_bio_chain(obj_request->bio_list, 0);
1290 obj_request->result = 0; 1296 obj_request->result = 0;
1297 obj_request->xferred = obj_request->length;
1291 } else if (obj_request->xferred < obj_request->length && 1298 } else if (obj_request->xferred < obj_request->length &&
1292 !obj_request->result) { 1299 !obj_request->result) {
1293 zero_bio_chain(obj_request->bio_list, obj_request->xferred); 1300 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
@@ -1298,20 +1305,14 @@ static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1298 1305
1299static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1306static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1300{ 1307{
1301 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, 1308 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1302 obj_request->result, obj_request->xferred, obj_request->length); 1309 obj_request->result, obj_request->length);
1303 1310 /*
1304 /* A short write really shouldn't occur. Warn if we see one */ 1311 * There is no such thing as a successful short write.
1305 1312 * Our xferred value is the number of bytes transferred
1306 if (obj_request->xferred != obj_request->length) { 1313 * back. Set it to our originally-requested length.
1307 struct rbd_img_request *img_request = obj_request->img_request; 1314 */
1308 struct rbd_device *rbd_dev; 1315 obj_request->xferred = obj_request->length;
1309
1310 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1311 rbd_warn(rbd_dev, "wrote %llu want %llu\n",
1312 obj_request->xferred, obj_request->length);
1313 }
1314
1315 obj_request_done_set(obj_request); 1316 obj_request_done_set(obj_request);
1316} 1317}
1317 1318
@@ -1329,9 +1330,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1329 struct ceph_msg *msg) 1330 struct ceph_msg *msg)
1330{ 1331{
1331 struct rbd_obj_request *obj_request = osd_req->r_priv; 1332 struct rbd_obj_request *obj_request = osd_req->r_priv;
1332 struct ceph_osd_reply_head *reply_head;
1333 struct ceph_osd_op *op;
1334 u32 num_ops;
1335 u16 opcode; 1333 u16 opcode;
1336 1334
1337 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); 1335 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
@@ -1339,22 +1337,19 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1339 rbd_assert(!!obj_request->img_request ^ 1337 rbd_assert(!!obj_request->img_request ^
1340 (obj_request->which == BAD_WHICH)); 1338 (obj_request->which == BAD_WHICH));
1341 1339
1342 reply_head = msg->front.iov_base; 1340 if (osd_req->r_result < 0)
1343 obj_request->result = (s32) le32_to_cpu(reply_head->result); 1341 obj_request->result = osd_req->r_result;
1344 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); 1342 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1345 1343
1346 num_ops = le32_to_cpu(reply_head->num_ops); 1344 WARN_ON(osd_req->r_num_ops != 1); /* For now */
1347 WARN_ON(num_ops != 1); /* For now */
1348 1345
1349 /* 1346 /*
1350 * We support a 64-bit length, but ultimately it has to be 1347 * We support a 64-bit length, but ultimately it has to be
1351 * passed to blk_end_request(), which takes an unsigned int. 1348 * passed to blk_end_request(), which takes an unsigned int.
1352 */ 1349 */
1353 op = &reply_head->ops[0]; 1350 obj_request->xferred = osd_req->r_reply_op_len[0];
1354 obj_request->xferred = le64_to_cpu(op->extent.length);
1355 rbd_assert(obj_request->xferred < (u64) UINT_MAX); 1351 rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1356 1352 opcode = osd_req->r_request_ops[0].op;
1357 opcode = le16_to_cpu(op->op);
1358 switch (opcode) { 1353 switch (opcode) {
1359 case CEPH_OSD_OP_READ: 1354 case CEPH_OSD_OP_READ:
1360 rbd_osd_read_callback(obj_request); 1355 rbd_osd_read_callback(obj_request);
@@ -1719,6 +1714,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1719 more = blk_end_request(img_request->rq, result, xferred); 1714 more = blk_end_request(img_request->rq, result, xferred);
1720 which++; 1715 which++;
1721 } 1716 }
1717
1722 rbd_assert(more ^ (which == img_request->obj_request_count)); 1718 rbd_assert(more ^ (which == img_request->obj_request_count));
1723 img_request->next_completion = which; 1719 img_request->next_completion = which;
1724out: 1720out:
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index fc613715af46..cfef3e01a9b3 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -236,16 +236,10 @@ static int ceph_readpage(struct file *filp, struct page *page)
236static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) 236static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
237{ 237{
238 struct inode *inode = req->r_inode; 238 struct inode *inode = req->r_inode;
239 struct ceph_osd_reply_head *replyhead; 239 int rc = req->r_result;
240 int rc, bytes; 240 int bytes = le32_to_cpu(msg->hdr.data_len);
241 int i; 241 int i;
242 242
243 /* parse reply */
244 replyhead = msg->front.iov_base;
245 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
246 rc = le32_to_cpu(replyhead->result);
247 bytes = le32_to_cpu(msg->hdr.data_len);
248
249 dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); 243 dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
250 244
251 /* unlock all pages, zeroing any data we didn't read */ 245 /* unlock all pages, zeroing any data we didn't read */
@@ -553,27 +547,18 @@ static void writepages_finish(struct ceph_osd_request *req,
553 struct ceph_msg *msg) 547 struct ceph_msg *msg)
554{ 548{
555 struct inode *inode = req->r_inode; 549 struct inode *inode = req->r_inode;
556 struct ceph_osd_reply_head *replyhead;
557 struct ceph_osd_op *op;
558 struct ceph_inode_info *ci = ceph_inode(inode); 550 struct ceph_inode_info *ci = ceph_inode(inode);
559 unsigned wrote; 551 unsigned wrote;
560 struct page *page; 552 struct page *page;
561 int i; 553 int i;
562 struct ceph_snap_context *snapc = req->r_snapc; 554 struct ceph_snap_context *snapc = req->r_snapc;
563 struct address_space *mapping = inode->i_mapping; 555 struct address_space *mapping = inode->i_mapping;
564 __s32 rc = -EIO; 556 int rc = req->r_result;
565 u64 bytes = 0; 557 u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
566 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 558 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
567 long writeback_stat; 559 long writeback_stat;
568 unsigned issued = ceph_caps_issued(ci); 560 unsigned issued = ceph_caps_issued(ci);
569 561
570 /* parse reply */
571 replyhead = msg->front.iov_base;
572 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
573 op = (void *)(replyhead + 1);
574 rc = le32_to_cpu(replyhead->result);
575 bytes = le64_to_cpu(op->extent.length);
576
577 if (rc >= 0) { 562 if (rc >= 0) {
578 /* 563 /*
579 * Assume we wrote the pages we originally sent. The 564 * Assume we wrote the pages we originally sent. The
@@ -740,8 +725,6 @@ retry:
740 struct page *page; 725 struct page *page;
741 int want; 726 int want;
742 u64 offset, len; 727 u64 offset, len;
743 struct ceph_osd_request_head *reqhead;
744 struct ceph_osd_op *op;
745 long writeback_stat; 728 long writeback_stat;
746 729
747 next = 0; 730 next = 0;
@@ -905,10 +888,8 @@ get_more_pages:
905 888
906 /* revise final length, page count */ 889 /* revise final length, page count */
907 req->r_num_pages = locked_pages; 890 req->r_num_pages = locked_pages;
908 reqhead = req->r_request->front.iov_base; 891 req->r_request_ops[0].extent.length = cpu_to_le64(len);
909 op = (void *)(reqhead + 1); 892 req->r_request_ops[0].payload_len = cpu_to_le32(len);
910 op->extent.length = cpu_to_le64(len);
911 op->payload_len = cpu_to_le32(len);
912 req->r_request->hdr.data_len = cpu_to_le32(len); 893 req->r_request->hdr.data_len = cpu_to_le32(len);
913 894
914 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); 895 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index ad8899fc3157..1dd5d466b6f9 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -47,6 +47,9 @@ struct ceph_osd {
47 struct list_head o_keepalive_item; 47 struct list_head o_keepalive_item;
48}; 48};
49 49
50
51#define CEPH_OSD_MAX_OP 10
52
50/* an in-flight request */ 53/* an in-flight request */
51struct ceph_osd_request { 54struct ceph_osd_request {
52 u64 r_tid; /* unique for this client */ 55 u64 r_tid; /* unique for this client */
@@ -63,9 +66,23 @@ struct ceph_osd_request {
63 struct ceph_connection *r_con_filling_msg; 66 struct ceph_connection *r_con_filling_msg;
64 67
65 struct ceph_msg *r_request, *r_reply; 68 struct ceph_msg *r_request, *r_reply;
66 int r_result;
67 int r_flags; /* any additional flags for the osd */ 69 int r_flags; /* any additional flags for the osd */
68 u32 r_sent; /* >0 if r_request is sending/sent */ 70 u32 r_sent; /* >0 if r_request is sending/sent */
71 int r_num_ops;
72
73 /* encoded message content */
74 struct ceph_osd_op *r_request_ops;
75 /* these are updated on each send */
76 __le32 *r_request_osdmap_epoch;
77 __le32 *r_request_flags;
78 __le64 *r_request_pool;
79 void *r_request_pgid;
80 __le32 *r_request_attempts;
81 struct ceph_eversion *r_request_reassert_version;
82
83 int r_result;
84 int r_reply_op_len[CEPH_OSD_MAX_OP];
85 s32 r_reply_op_result[CEPH_OSD_MAX_OP];
69 int r_got_reply; 86 int r_got_reply;
70 int r_linger; 87 int r_linger;
71 88
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index d784c8dfb09a..68c96a508ac2 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -416,43 +416,5 @@ struct ceph_osd_op {
416 __le32 payload_len; 416 __le32 payload_len;
417} __attribute__ ((packed)); 417} __attribute__ ((packed));
418 418
419/*
420 * osd request message header. each request may include multiple
421 * ceph_osd_op object operations.
422 */
423struct ceph_osd_request_head {
424 __le32 client_inc; /* client incarnation */
425 struct ceph_object_layout layout; /* pgid */
426 __le32 osdmap_epoch; /* client's osdmap epoch */
427
428 __le32 flags;
429
430 struct ceph_timespec mtime; /* for mutations only */
431 struct ceph_eversion reassert_version; /* if we are replaying op */
432
433 __le32 object_len; /* length of object name */
434
435 __le64 snapid; /* snapid to read */
436 __le64 snap_seq; /* writer's snap context */
437 __le32 num_snaps;
438
439 __le16 num_ops;
440 struct ceph_osd_op ops[]; /* followed by ops[], obj, ticket, snaps */
441} __attribute__ ((packed));
442
443struct ceph_osd_reply_head {
444 __le32 client_inc; /* client incarnation */
445 __le32 flags;
446 struct ceph_object_layout layout;
447 __le32 osdmap_epoch;
448 struct ceph_eversion reassert_version; /* for replaying uncommitted */
449
450 __le32 result; /* result code */
451
452 __le32 object_len; /* length of object name */
453 __le32 num_ops;
454 struct ceph_osd_op ops[0]; /* ops[], object */
455} __attribute__ ((packed));
456
457 419
458#endif 420#endif
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index f4d4b27d6026..00d051f4894e 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -123,10 +123,7 @@ static int osdc_show(struct seq_file *s, void *pp)
123 mutex_lock(&osdc->request_mutex); 123 mutex_lock(&osdc->request_mutex);
124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
125 struct ceph_osd_request *req; 125 struct ceph_osd_request *req;
126 struct ceph_osd_request_head *head; 126 int opcode;
127 struct ceph_osd_op *op;
128 int num_ops;
129 int opcode, olen;
130 int i; 127 int i;
131 128
132 req = rb_entry(p, struct ceph_osd_request, r_node); 129 req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -135,13 +132,7 @@ static int osdc_show(struct seq_file *s, void *pp)
135 req->r_osd ? req->r_osd->o_osd : -1, 132 req->r_osd ? req->r_osd->o_osd : -1,
136 req->r_pgid.pool, req->r_pgid.seed); 133 req->r_pgid.pool, req->r_pgid.seed);
137 134
138 head = req->r_request->front.iov_base; 135 seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
139 op = (void *)(head + 1);
140
141 num_ops = le16_to_cpu(head->num_ops);
142 olen = le32_to_cpu(head->object_len);
143 seq_printf(s, "%.*s", olen,
144 (const char *)(head->ops + num_ops));
145 136
146 if (req->r_reassert_version.epoch) 137 if (req->r_reassert_version.epoch)
147 seq_printf(s, "\t%u'%llu", 138 seq_printf(s, "\t%u'%llu",
@@ -150,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
150 else 141 else
151 seq_printf(s, "\t"); 142 seq_printf(s, "\t");
152 143
153 for (i = 0; i < num_ops; i++) { 144 for (i = 0; i < req->r_num_ops; i++) {
154 opcode = le16_to_cpu(op->op); 145 opcode = le16_to_cpu(req->r_request_ops[i].op);
155 seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); 146 seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
156 op++;
157 } 147 }
158 148
159 seq_printf(s, "\n"); 149 seq_printf(s, "\n");
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5584f0a08e28..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -146,15 +146,23 @@ EXPORT_SYMBOL(ceph_osdc_release_request);
146 146
147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
148 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
149 unsigned int num_op, 149 unsigned int num_ops,
150 bool use_mempool, 150 bool use_mempool,
151 gfp_t gfp_flags) 151 gfp_t gfp_flags)
152{ 152{
153 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
154 struct ceph_msg *msg; 154 struct ceph_msg *msg;
155 size_t msg_size = sizeof(struct ceph_osd_request_head); 155 size_t msg_size;
156 156
157 msg_size += num_op*sizeof(struct ceph_osd_op); 157 msg_size = 4 + 4 + 8 + 8 + 4+8;
158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159 msg_size += 1 + 8 + 4 + 4; /* pg_t */
160 msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 msg_size += 8; /* snapid */
163 msg_size += 8; /* snap_seq */
164 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
165 msg_size += 4;
158 166
159 if (use_mempool) { 167 if (use_mempool) {
160 req = mempool_alloc(osdc->req_mempool, gfp_flags); 168 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -193,9 +201,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 ceph_pagelist_init(&req->r_trail); 201 ceph_pagelist_init(&req->r_trail);
194 202
195 /* create request message; allow space for oid */ 203 /* create request message; allow space for oid */
196 msg_size += MAX_OBJ_NAME_SIZE;
197 if (snapc)
198 msg_size += sizeof(u64) * snapc->num_snaps;
199 if (use_mempool) 204 if (use_mempool)
200 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
201 else 206 else
@@ -324,55 +329,80 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
324 * 329 *
325 */ 330 */
326void ceph_osdc_build_request(struct ceph_osd_request *req, 331void ceph_osdc_build_request(struct ceph_osd_request *req,
327 u64 off, u64 len, unsigned int num_op, 332 u64 off, u64 len, unsigned int num_ops,
328 struct ceph_osd_req_op *src_ops, 333 struct ceph_osd_req_op *src_ops,
329 struct ceph_snap_context *snapc, u64 snap_id, 334 struct ceph_snap_context *snapc, u64 snap_id,
330 struct timespec *mtime) 335 struct timespec *mtime)
331{ 336{
332 struct ceph_msg *msg = req->r_request; 337 struct ceph_msg *msg = req->r_request;
333 struct ceph_osd_request_head *head;
334 struct ceph_osd_req_op *src_op; 338 struct ceph_osd_req_op *src_op;
335 struct ceph_osd_op *op;
336 void *p; 339 void *p;
337 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 340 size_t msg_size;
338 int flags = req->r_flags; 341 int flags = req->r_flags;
339 u64 data_len; 342 u64 data_len;
340 int i; 343 int i;
341 344
342 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 345 req->r_num_ops = num_ops;
343 346 req->r_snapid = snap_id;
344 head = msg->front.iov_base;
345 head->snapid = cpu_to_le64(snap_id);
346 op = (void *)(head + 1);
347 p = (void *)(op + num_op);
348
349 req->r_snapc = ceph_get_snap_context(snapc); 347 req->r_snapc = ceph_get_snap_context(snapc);
350 348
351 head->client_inc = cpu_to_le32(1); /* always, for now. */ 349 /* encode request */
352 head->flags = cpu_to_le32(flags); 350 msg->hdr.version = cpu_to_le16(4);
353 if (flags & CEPH_OSD_FLAG_WRITE)
354 ceph_encode_timespec(&head->mtime, mtime);
355 BUG_ON(num_op > (unsigned int) ((u16) -1));
356 head->num_ops = cpu_to_le16(num_op);
357 351
358 /* fill in oid */ 352 p = msg->front.iov_base;
359 head->object_len = cpu_to_le32(req->r_oid_len); 353 ceph_encode_32(&p, 1); /* client_inc is always 1 */
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363
364 /* oloc */
365 ceph_encode_8(&p, 4);
366 ceph_encode_8(&p, 4);
367 ceph_encode_32(&p, 8 + 4 + 4);
368 req->r_request_pool = p;
369 p += 8;
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372
373 ceph_encode_8(&p, 1);
374 req->r_request_pgid = p;
375 p += 8 + 4;
376 ceph_encode_32(&p, -1); /* preferred */
377
378 /* oid */
379 ceph_encode_32(&p, req->r_oid_len);
360 memcpy(p, req->r_oid, req->r_oid_len); 380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
361 p += req->r_oid_len; 382 p += req->r_oid_len;
362 383
384 /* ops */
385 ceph_encode_16(&p, num_ops);
363 src_op = src_ops; 386 src_op = src_ops;
364 while (num_op--) 387 req->r_request_ops = p;
365 osd_req_encode_op(req, op++, src_op++); 388 for (i = 0; i < num_ops; i++, src_op++) {
389 osd_req_encode_op(req, p, src_op);
390 p += sizeof(struct ceph_osd_op);
391 }
366 392
367 if (snapc) { 393 /* snaps */
368 head->snap_seq = cpu_to_le64(snapc->seq); 394 ceph_encode_64(&p, req->r_snapid);
369 head->num_snaps = cpu_to_le32(snapc->num_snaps); 395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
397 if (req->r_snapc) {
370 for (i = 0; i < snapc->num_snaps; i++) { 398 for (i = 0; i < snapc->num_snaps; i++) {
371 put_unaligned_le64(snapc->snaps[i], p); 399 ceph_encode_64(&p, req->r_snapc->snaps[i]);
372 p += sizeof(u64);
373 } 400 }
374 } 401 }
375 402
403 req->r_request_attempts = p;
404 p += 4;
405
376 data_len = req->r_trail.length; 406 data_len = req->r_trail.length;
377 if (flags & CEPH_OSD_FLAG_WRITE) { 407 if (flags & CEPH_OSD_FLAG_WRITE) {
378 req->r_request->hdr.data_off = cpu_to_le16(off); 408 req->r_request->hdr.data_off = cpu_to_le16(off);
@@ -385,6 +415,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
385 msg_size = p - msg->front.iov_base; 415 msg_size = p - msg->front.iov_base;
386 msg->front.iov_len = msg_size; 416 msg->front.iov_len = msg_size;
387 msg->hdr.front_len = cpu_to_le32(msg_size); 417 msg->hdr.front_len = cpu_to_le32(msg_size);
418
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 num_ops);
388 return; 421 return;
389} 422}
390EXPORT_SYMBOL(ceph_osdc_build_request); 423EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -991,21 +1024,22 @@ out:
991static void __send_request(struct ceph_osd_client *osdc, 1024static void __send_request(struct ceph_osd_client *osdc,
992 struct ceph_osd_request *req) 1025 struct ceph_osd_request *req)
993{ 1026{
994 struct ceph_osd_request_head *reqhead; 1027 void *p;
995
996 dout("send_request %p tid %llu to osd%d flags %d\n",
997 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
998
999 reqhead = req->r_request->front.iov_base;
1000 reqhead->snapid = cpu_to_le64(req->r_snapid);
1001 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
1002 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
1003 reqhead->reassert_version = req->r_reassert_version;
1004 1028
1005 reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed); 1029 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1006 reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool); 1030 req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1007 reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1); 1031 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1008 reqhead->layout.ol_stripe_unit = 0; 1032
1033 /* fill in message content that changes each time we send it */
1034 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 p = req->r_request_pgid;
1038 ceph_encode_64(&p, req->r_pgid.pool);
1039 ceph_encode_32(&p, req->r_pgid.seed);
1040 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
1041 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 sizeof(req->r_reassert_version));
1009 1043
1010 req->r_stamp = jiffies; 1044 req->r_stamp = jiffies;
1011 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1105,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
1105 complete_all(&req->r_safe_completion); /* fsync waiter */ 1139 complete_all(&req->r_safe_completion); /* fsync waiter */
1106} 1140}
1107 1141
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1108/* 1162/*
1109 * handle osd op reply. either call the callback if it is specified, 1163 * handle osd op reply. either call the callback if it is specified,
1110 * or do the completion to wake up the waiting thread. 1164 * or do the completion to wake up the waiting thread.
@@ -1112,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
1112static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1166static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1113 struct ceph_connection *con) 1167 struct ceph_connection *con)
1114{ 1168{
1115 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1169 void *p, *end;
1116 struct ceph_osd_request *req; 1170 struct ceph_osd_request *req;
1117 u64 tid; 1171 u64 tid;
1118 int numops, object_len, flags; 1172 int object_len;
1173 int numops, payload_len, flags;
1119 s32 result; 1174 s32 result;
1175 s32 retry_attempt;
1176 struct ceph_pg pg;
1177 int err;
1178 u32 reassert_epoch;
1179 u64 reassert_version;
1180 u32 osdmap_epoch;
1181 int i;
1120 1182
1121 tid = le64_to_cpu(msg->hdr.tid); 1183 tid = le64_to_cpu(msg->hdr.tid);
1122 if (msg->front.iov_len < sizeof(*rhead)) 1184 dout("handle_reply %p tid %llu\n", msg, tid);
1123 goto bad; 1185
1124 numops = le32_to_cpu(rhead->num_ops); 1186 p = msg->front.iov_base;
1125 object_len = le32_to_cpu(rhead->object_len); 1187 end = p + msg->front.iov_len;
1126 result = le32_to_cpu(rhead->result); 1188
1127 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1189 ceph_decode_need(&p, end, 4, bad);
1128 numops * sizeof(struct ceph_osd_op)) 1190 object_len = ceph_decode_32(&p);
1191 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len;
1193
1194 err = __decode_pgid(&p, end, &pg);
1195 if (err)
1129 goto bad; 1196 goto bad;
1130 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1197
1198 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 flags = ceph_decode_64(&p);
1200 result = ceph_decode_32(&p);
1201 reassert_epoch = ceph_decode_32(&p);
1202 reassert_version = ceph_decode_64(&p);
1203 osdmap_epoch = ceph_decode_32(&p);
1204
1131 /* lookup */ 1205 /* lookup */
1132 mutex_lock(&osdc->request_mutex); 1206 mutex_lock(&osdc->request_mutex);
1133 req = __lookup_request(osdc, tid); 1207 req = __lookup_request(osdc, tid);
@@ -1137,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1137 return; 1211 return;
1138 } 1212 }
1139 ceph_osdc_get_request(req); 1213 ceph_osdc_get_request(req);
1140 flags = le32_to_cpu(rhead->flags); 1214
1215 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 req, result);
1217
1218 ceph_decode_need(&p, end, 4, bad);
1219 numops = ceph_decode_32(&p);
1220 if (numops > CEPH_OSD_MAX_OP)
1221 goto bad_put;
1222 if (numops != req->r_num_ops)
1223 goto bad_put;
1224 payload_len = 0;
1225 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 for (i = 0; i < numops; i++) {
1227 struct ceph_osd_op *op = p;
1228 int len;
1229
1230 len = le32_to_cpu(op->payload_len);
1231 req->r_reply_op_len[i] = len;
1232 dout(" op %d has %d bytes\n", i, len);
1233 payload_len += len;
1234 p += sizeof(*op);
1235 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len));
1239 goto bad_put;
1240 }
1241
1242 ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 retry_attempt = ceph_decode_32(&p);
1244 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p);
1141 1246
1142 /* 1247 /*
1143 * if this connection filled our message, drop our reference now, to 1248 * if this connection filled our message, drop our reference now, to
@@ -1152,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1152 if (!req->r_got_reply) { 1257 if (!req->r_got_reply) {
1153 unsigned int bytes; 1258 unsigned int bytes;
1154 1259
1155 req->r_result = le32_to_cpu(rhead->result); 1260 req->r_result = result;
1156 bytes = le32_to_cpu(msg->hdr.data_len); 1261 bytes = le32_to_cpu(msg->hdr.data_len);
1157 dout("handle_reply result %d bytes %d\n", req->r_result, 1262 dout("handle_reply result %d bytes %d\n", req->r_result,
1158 bytes); 1263 bytes);
@@ -1160,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1160 req->r_result = bytes; 1265 req->r_result = bytes;
1161 1266
1162 /* in case this is a write and we need to replay, */ 1267 /* in case this is a write and we need to replay, */
1163 req->r_reassert_version = rhead->reassert_version; 1268 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1164 1270
1165 req->r_got_reply = 1; 1271 req->r_got_reply = 1;
1166 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1272 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1195,10 +1301,11 @@ done:
1195 ceph_osdc_put_request(req); 1301 ceph_osdc_put_request(req);
1196 return; 1302 return;
1197 1303
1304bad_put:
1305 ceph_osdc_put_request(req);
1198bad: 1306bad:
1199 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1307 pr_err("corrupt osd_op_reply got %d %d\n",
1200 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1201 (int)sizeof(*rhead));
1202 ceph_msg_dump(msg); 1309 ceph_msg_dump(msg);
1203} 1310}
1204 1311