author		Sage Weil <sage@inktank.com>	2013-02-25 19:11:12 -0500
committer	Sage Weil <sage@inktank.com>	2013-02-26 18:02:50 -0500
commit		1b83bef24c6746a146d39915a18fb5425f2facb0
tree		a765aeb136f4c7e354c01314e5fdfb776d503fb7
parent		2169aea649c08374bec7d220a3b8f64712275356
libceph: update osd request/reply encoding
Use the new version of the encoding for osd requests and replies. In the
process, update the way we are tracking request ops and reply lengths and
results in the struct ceph_osd_request. Update the rbd and fs/ceph users
appropriately.

The main changes are:
 - we keep pointers into the request memory for fields we need to update
   each time the request is sent out over the wire
 - we keep information about the result in an array in the request struct
   where the users can easily get at it.

Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
-rw-r--r--	drivers/block/rbd.c             |  52
-rw-r--r--	fs/ceph/addr.c                  |  31
-rw-r--r--	include/linux/ceph/osd_client.h |  19
-rw-r--r--	include/linux/ceph/rados.h      |  38
-rw-r--r--	net/ceph/debugfs.c              |  18
-rw-r--r--	net/ceph/osd_client.c           | 233
6 files changed, 222 insertions, 169 deletions
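The diff below removes the old ceph_osd_request_head/ceph_osd_reply_head parsing and leaves callers reading results straight out of struct ceph_osd_request. As a rough, hypothetical sketch of that usage model (not code from this patch), a completion callback could consume the new fields like this:

#include <linux/ceph/osd_client.h>

/*
 * Hypothetical completion callback: with this patch, handle_reply()
 * has already decoded the overall result and the per-op lengths and
 * results into the request struct, so nothing here touches the raw
 * reply message.
 */
static void example_osd_callback(struct ceph_osd_request *osd_req,
				 struct ceph_msg *msg)
{
	int i;

	if (osd_req->r_result < 0) {
		pr_err("osd request tid %llu failed: %d\n",
		       osd_req->r_tid, osd_req->r_result);
		return;
	}

	/* one entry per op, bounded by CEPH_OSD_MAX_OP (10) */
	for (i = 0; i < osd_req->r_num_ops; i++) {
		u16 opcode = le16_to_cpu(osd_req->r_request_ops[i].op);

		pr_info("op %d (%s): result %d, %d payload bytes\n",
			i, ceph_osd_op_name(opcode),
			osd_req->r_reply_op_result[i],
			osd_req->r_reply_op_len[i]);
	}
}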
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 22085e86a409..6c81a4c040b9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -196,7 +196,7 @@ struct rbd_obj_request {

 	u64			xferred;	/* bytes transferred */
 	u64			version;
-	s32			result;
+	int			result;
 	atomic_t		done;

 	rbd_obj_callback_t	callback;
@@ -1282,12 +1282,19 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)

 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
-
 	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
 		obj_request->result, obj_request->xferred, obj_request->length);
-	if (obj_request->result == (s32) -ENOENT) {
+	/*
+	 * ENOENT means a hole in the object. We zero-fill the
+	 * entire length of the request. A short read also implies
+	 * zero-fill to the end of the request. Either way we
+	 * update the xferred count to indicate the whole request
+	 * was satisfied.
+	 */
+	if (obj_request->result == -ENOENT) {
 		zero_bio_chain(obj_request->bio_list, 0);
 		obj_request->result = 0;
+		obj_request->xferred = obj_request->length;
 	} else if (obj_request->xferred < obj_request->length &&
 			!obj_request->result) {
 		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
@@ -1298,20 +1305,14 @@ static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)

 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 {
-	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
-		obj_request->result, obj_request->xferred, obj_request->length);
-
-	/* A short write really shouldn't occur. Warn if we see one */
-
-	if (obj_request->xferred != obj_request->length) {
-		struct rbd_img_request *img_request = obj_request->img_request;
-		struct rbd_device *rbd_dev;
-
-		rbd_dev = img_request ? img_request->rbd_dev : NULL;
-		rbd_warn(rbd_dev, "wrote %llu want %llu\n",
-			obj_request->xferred, obj_request->length);
-	}
-
+	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+		obj_request->result, obj_request->length);
+	/*
+	 * There is no such thing as a successful short write.
+	 * Our xferred value is the number of bytes transferred
+	 * back. Set it to our originally-requested length.
+	 */
+	obj_request->xferred = obj_request->length;
 	obj_request_done_set(obj_request);
 }

@@ -1329,9 +1330,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 				struct ceph_msg *msg)
 {
 	struct rbd_obj_request *obj_request = osd_req->r_priv;
-	struct ceph_osd_reply_head *reply_head;
-	struct ceph_osd_op *op;
-	u32 num_ops;
 	u16 opcode;

 	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
@@ -1339,22 +1337,19 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 	rbd_assert(!!obj_request->img_request ^
 				(obj_request->which == BAD_WHICH));

-	reply_head = msg->front.iov_base;
-	obj_request->result = (s32) le32_to_cpu(reply_head->result);
+	if (osd_req->r_result < 0)
+		obj_request->result = osd_req->r_result;
 	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

-	num_ops = le32_to_cpu(reply_head->num_ops);
-	WARN_ON(num_ops != 1); /* For now */
+	WARN_ON(osd_req->r_num_ops != 1); /* For now */

 	/*
 	 * We support a 64-bit length, but ultimately it has to be
 	 * passed to blk_end_request(), which takes an unsigned int.
 	 */
-	op = &reply_head->ops[0];
-	obj_request->xferred = le64_to_cpu(op->extent.length);
+	obj_request->xferred = osd_req->r_reply_op_len[0];
 	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
-
-	opcode = le16_to_cpu(op->op);
+	opcode = osd_req->r_request_ops[0].op;
 	switch (opcode) {
 	case CEPH_OSD_OP_READ:
 		rbd_osd_read_callback(obj_request);
@@ -1719,6 +1714,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
 		more = blk_end_request(img_request->rq, result, xferred);
 		which++;
 	}
+
 	rbd_assert(more ^ (which == img_request->obj_request_count));
 	img_request->next_completion = which;
 out:
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index fc613715af46..cfef3e01a9b3 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -236,16 +236,10 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
 	struct inode *inode = req->r_inode;
-	struct ceph_osd_reply_head *replyhead;
-	int rc, bytes;
+	int rc = req->r_result;
+	int bytes = le32_to_cpu(msg->hdr.data_len);
 	int i;

-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	rc = le32_to_cpu(replyhead->result);
-	bytes = le32_to_cpu(msg->hdr.data_len);
-
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

 	/* unlock all pages, zeroing any data we didn't read */
@@ -553,27 +547,18 @@ static void writepages_finish(struct ceph_osd_request *req,
 			      struct ceph_msg *msg)
 {
 	struct inode *inode = req->r_inode;
-	struct ceph_osd_reply_head *replyhead;
-	struct ceph_osd_op *op;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	unsigned wrote;
 	struct page *page;
 	int i;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
-	__s32 rc = -EIO;
-	u64 bytes = 0;
+	int rc = req->r_result;
+	u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	long writeback_stat;
 	unsigned issued = ceph_caps_issued(ci);

-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	op = (void *)(replyhead + 1);
-	rc = le32_to_cpu(replyhead->result);
-	bytes = le64_to_cpu(op->extent.length);
-
 	if (rc >= 0) {
 		/*
 		 * Assume we wrote the pages we originally sent. The
@@ -740,8 +725,6 @@ retry:
 		struct page *page;
 		int want;
 		u64 offset, len;
-		struct ceph_osd_request_head *reqhead;
-		struct ceph_osd_op *op;
 		long writeback_stat;

 		next = 0;
@@ -905,10 +888,8 @@ get_more_pages:

 		/* revise final length, page count */
 		req->r_num_pages = locked_pages;
-		reqhead = req->r_request->front.iov_base;
-		op = (void *)(reqhead + 1);
-		op->extent.length = cpu_to_le64(len);
-		op->payload_len = cpu_to_le32(len);
+		req->r_request_ops[0].extent.length = cpu_to_le64(len);
+		req->r_request_ops[0].payload_len = cpu_to_le32(len);
 		req->r_request->hdr.data_len = cpu_to_le32(len);

 		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index ad8899fc3157..1dd5d466b6f9 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -47,6 +47,9 @@ struct ceph_osd {
 	struct list_head o_keepalive_item;
 };

+
+#define CEPH_OSD_MAX_OP 10
+
 /* an in-flight request */
 struct ceph_osd_request {
 	u64             r_tid;              /* unique for this client */
@@ -63,9 +66,23 @@ struct ceph_osd_request {
 	struct ceph_connection *r_con_filling_msg;

 	struct ceph_msg  *r_request, *r_reply;
-	int               r_result;
 	int               r_flags;     /* any additional flags for the osd */
 	u32               r_sent;      /* >0 if r_request is sending/sent */
+	int               r_num_ops;
+
+	/* encoded message content */
+	struct ceph_osd_op *r_request_ops;
+	/* these are updated on each send */
+	__le32           *r_request_osdmap_epoch;
+	__le32           *r_request_flags;
+	__le64           *r_request_pool;
+	void             *r_request_pgid;
+	__le32           *r_request_attempts;
+	struct ceph_eversion *r_request_reassert_version;
+
+	int               r_result;
+	int               r_reply_op_len[CEPH_OSD_MAX_OP];
+	s32               r_reply_op_result[CEPH_OSD_MAX_OP];
 	int               r_got_reply;
 	int               r_linger;

diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index d784c8dfb09a..68c96a508ac2 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -416,43 +416,5 @@ struct ceph_osd_op {
 	__le32 payload_len;
 } __attribute__ ((packed));

-/*
- * osd request message header.  each request may include multiple
- * ceph_osd_op object operations.
- */
-struct ceph_osd_request_head {
-	__le32 client_inc;                 /* client incarnation */
-	struct ceph_object_layout layout;  /* pgid */
-	__le32 osdmap_epoch;               /* client's osdmap epoch */
-
-	__le32 flags;
-
-	struct ceph_timespec mtime;        /* for mutations only */
-	struct ceph_eversion reassert_version; /* if we are replaying op */
-
-	__le32 object_len;     /* length of object name */
-
-	__le64 snapid;         /* snapid to read */
-	__le64 snap_seq;       /* writer's snap context */
-	__le32 num_snaps;
-
-	__le16 num_ops;
-	struct ceph_osd_op ops[];  /* followed by ops[], obj, ticket, snaps */
-} __attribute__ ((packed));
-
-struct ceph_osd_reply_head {
-	__le32 client_inc;                /* client incarnation */
-	__le32 flags;
-	struct ceph_object_layout layout;
-	__le32 osdmap_epoch;
-	struct ceph_eversion reassert_version; /* for replaying uncommitted */
-
-	__le32 result;                    /* result code */
-
-	__le32 object_len;                /* length of object name */
-	__le32 num_ops;
-	struct ceph_osd_op ops[0];  /* ops[], object */
-} __attribute__ ((packed));
-

 #endif
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index f4d4b27d6026..00d051f4894e 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -123,10 +123,7 @@ static int osdc_show(struct seq_file *s, void *pp)
 	mutex_lock(&osdc->request_mutex);
 	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
 		struct ceph_osd_request *req;
-		struct ceph_osd_request_head *head;
-		struct ceph_osd_op *op;
-		int num_ops;
-		int opcode, olen;
+		int opcode;
 		int i;

 		req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -135,13 +132,7 @@ static int osdc_show(struct seq_file *s, void *pp)
 			   req->r_osd ? req->r_osd->o_osd : -1,
 			   req->r_pgid.pool, req->r_pgid.seed);

-		head = req->r_request->front.iov_base;
-		op = (void *)(head + 1);
-
-		num_ops = le16_to_cpu(head->num_ops);
-		olen = le32_to_cpu(head->object_len);
-		seq_printf(s, "%.*s", olen,
-			   (const char *)(head->ops + num_ops));
+		seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);

 		if (req->r_reassert_version.epoch)
 			seq_printf(s, "\t%u'%llu",
@@ -150,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
 		else
 			seq_printf(s, "\t");

-		for (i = 0; i < num_ops; i++) {
-			opcode = le16_to_cpu(op->op);
+		for (i = 0; i < req->r_num_ops; i++) {
+			opcode = le16_to_cpu(req->r_request_ops[i].op);
 			seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
-			op++;
 		}

 		seq_printf(s, "\n");
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5584f0a08e28..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -146,15 +146,23 @@ EXPORT_SYMBOL(ceph_osdc_release_request);

 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       struct ceph_snap_context *snapc,
-					       unsigned int num_op,
+					       unsigned int num_ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	size_t msg_size = sizeof(struct ceph_osd_request_head);
+	size_t msg_size;

-	msg_size += num_op*sizeof(struct ceph_osd_op);
+	msg_size = 4 + 4 + 8 + 8 + 4+8;
+	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+	msg_size += 1 + 8 + 4 + 4; /* pg_t */
+	msg_size += 4 + MAX_OBJ_NAME_SIZE;
+	msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
+	msg_size += 8; /* snapid */
+	msg_size += 8; /* snap_seq */
+	msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+	msg_size += 4;

 	if (use_mempool) {
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -193,9 +201,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	ceph_pagelist_init(&req->r_trail);

 	/* create request message; allow space for oid */
-	msg_size += MAX_OBJ_NAME_SIZE;
-	if (snapc)
-		msg_size += sizeof(u64) * snapc->num_snaps;
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 	else
@@ -324,55 +329,80 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-			     u64 off, u64 len, unsigned int num_op,
+			     u64 off, u64 len, unsigned int num_ops,
 			     struct ceph_osd_req_op *src_ops,
 			     struct ceph_snap_context *snapc, u64 snap_id,
 			     struct timespec *mtime)
 {
 	struct ceph_msg *msg = req->r_request;
-	struct ceph_osd_request_head *head;
 	struct ceph_osd_req_op *src_op;
-	struct ceph_osd_op *op;
 	void *p;
-	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+	size_t msg_size;
 	int flags = req->r_flags;
 	u64 data_len;
 	int i;

-	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
-	head = msg->front.iov_base;
-	head->snapid = cpu_to_le64(snap_id);
-	op = (void *)(head + 1);
-	p = (void *)(op + num_op);
-
+	req->r_num_ops = num_ops;
+	req->r_snapid = snap_id;
 	req->r_snapc = ceph_get_snap_context(snapc);

-	head->client_inc = cpu_to_le32(1); /* always, for now. */
-	head->flags = cpu_to_le32(flags);
-	if (flags & CEPH_OSD_FLAG_WRITE)
-		ceph_encode_timespec(&head->mtime, mtime);
-	BUG_ON(num_op > (unsigned int) ((u16) -1));
-	head->num_ops = cpu_to_le16(num_op);
+	/* encode request */
+	msg->hdr.version = cpu_to_le16(4);

-	/* fill in oid */
-	head->object_len = cpu_to_le32(req->r_oid_len);
+	p = msg->front.iov_base;
+	ceph_encode_32(&p, 1);   /* client_inc is always 1 */
+	req->r_request_osdmap_epoch = p;
+	p += 4;
+	req->r_request_flags = p;
+	p += 4;
+	if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+		ceph_encode_timespec(p, mtime);
+	p += sizeof(struct ceph_timespec);
+	req->r_request_reassert_version = p;
+	p += sizeof(struct ceph_eversion); /* will get filled in */
+
+	/* oloc */
+	ceph_encode_8(&p, 4);
+	ceph_encode_8(&p, 4);
+	ceph_encode_32(&p, 8 + 4 + 4);
+	req->r_request_pool = p;
+	p += 8;
+	ceph_encode_32(&p, -1);  /* preferred */
+	ceph_encode_32(&p, 0);   /* key len */
+
+	ceph_encode_8(&p, 1);
+	req->r_request_pgid = p;
+	p += 8 + 4;
+	ceph_encode_32(&p, -1);  /* preferred */
+
+	/* oid */
+	ceph_encode_32(&p, req->r_oid_len);
 	memcpy(p, req->r_oid, req->r_oid_len);
+	dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
 	p += req->r_oid_len;

+	/* ops */
+	ceph_encode_16(&p, num_ops);
 	src_op = src_ops;
-	while (num_op--)
-		osd_req_encode_op(req, op++, src_op++);
+	req->r_request_ops = p;
+	for (i = 0; i < num_ops; i++, src_op++) {
+		osd_req_encode_op(req, p, src_op);
+		p += sizeof(struct ceph_osd_op);
+	}

-	if (snapc) {
-		head->snap_seq = cpu_to_le64(snapc->seq);
-		head->num_snaps = cpu_to_le32(snapc->num_snaps);
+	/* snaps */
+	ceph_encode_64(&p, req->r_snapid);
+	ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+	ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+	if (req->r_snapc) {
 		for (i = 0; i < snapc->num_snaps; i++) {
-			put_unaligned_le64(snapc->snaps[i], p);
-			p += sizeof(u64);
+			ceph_encode_64(&p, req->r_snapc->snaps[i]);
 		}
 	}

+	req->r_request_attempts = p;
+	p += 4;
+
 	data_len = req->r_trail.length;
 	if (flags & CEPH_OSD_FLAG_WRITE) {
 		req->r_request->hdr.data_off = cpu_to_le16(off);
@@ -385,6 +415,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
 	msg->hdr.front_len = cpu_to_le32(msg_size);
+
+	dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
+	     num_ops);
 	return;
 }
 EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -991,21 +1024,22 @@ out:
 static void __send_request(struct ceph_osd_client *osdc,
 			   struct ceph_osd_request *req)
 {
-	struct ceph_osd_request_head *reqhead;
-
-	dout("send_request %p tid %llu to osd%d flags %d\n",
-	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);
-
-	reqhead = req->r_request->front.iov_base;
-	reqhead->snapid = cpu_to_le64(req->r_snapid);
-	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
-	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
-	reqhead->reassert_version = req->r_reassert_version;
+	void *p;

-	reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed);
-	reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool);
-	reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1);
-	reqhead->layout.ol_stripe_unit = 0;
+	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
+	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
+	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+
+	/* fill in message content that changes each time we send it */
+	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
+	put_unaligned_le32(req->r_flags, req->r_request_flags);
+	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+	p = req->r_request_pgid;
+	ceph_encode_64(&p, req->r_pgid.pool);
+	ceph_encode_32(&p, req->r_pgid.seed);
+	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
+	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
+	       sizeof(req->r_reassert_version));

 	req->r_stamp = jiffies;
 	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1105,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
 	complete_all(&req->r_safe_completion);  /* fsync waiter */
 }

+static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
+{
+	__u8 v;
+
+	ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
+	v = ceph_decode_8(p);
+	if (v > 1) {
+		pr_warning("do not understand pg encoding %d > 1", v);
+		return -EINVAL;
+	}
+	pgid->pool = ceph_decode_64(p);
+	pgid->seed = ceph_decode_32(p);
+	*p += 4;
+	return 0;
+
+bad:
+	pr_warning("incomplete pg encoding");
+	return -EINVAL;
+}
+
 /*
  * handle osd op reply. either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -1112,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 			 struct ceph_connection *con)
 {
-	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+	void *p, *end;
 	struct ceph_osd_request *req;
 	u64 tid;
-	int numops, object_len, flags;
+	int object_len;
+	int numops, payload_len, flags;
 	s32 result;
+	s32 retry_attempt;
+	struct ceph_pg pg;
+	int err;
+	u32 reassert_epoch;
+	u64 reassert_version;
+	u32 osdmap_epoch;
+	int i;

 	tid = le64_to_cpu(msg->hdr.tid);
-	if (msg->front.iov_len < sizeof(*rhead))
-		goto bad;
-	numops = le32_to_cpu(rhead->num_ops);
-	object_len = le32_to_cpu(rhead->object_len);
-	result = le32_to_cpu(rhead->result);
-	if (msg->front.iov_len != sizeof(*rhead) + object_len +
-	    numops * sizeof(struct ceph_osd_op))
+	dout("handle_reply %p tid %llu\n", msg, tid);
+
+	p = msg->front.iov_base;
+	end = p + msg->front.iov_len;
+
+	ceph_decode_need(&p, end, 4, bad);
+	object_len = ceph_decode_32(&p);
+	ceph_decode_need(&p, end, object_len, bad);
+	p += object_len;
+
+	err = __decode_pgid(&p, end, &pg);
+	if (err)
 		goto bad;
-	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
+
+	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
+	flags = ceph_decode_64(&p);
+	result = ceph_decode_32(&p);
+	reassert_epoch = ceph_decode_32(&p);
+	reassert_version = ceph_decode_64(&p);
+	osdmap_epoch = ceph_decode_32(&p);
+
 	/* lookup */
 	mutex_lock(&osdc->request_mutex);
 	req = __lookup_request(osdc, tid);
@@ -1137,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 		return;
 	}
 	ceph_osdc_get_request(req);
-	flags = le32_to_cpu(rhead->flags);
+
+	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
+	     req, result);
+
+	ceph_decode_need(&p, end, 4, bad);
+	numops = ceph_decode_32(&p);
+	if (numops > CEPH_OSD_MAX_OP)
+		goto bad_put;
+	if (numops != req->r_num_ops)
+		goto bad_put;
+	payload_len = 0;
+	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+	for (i = 0; i < numops; i++) {
+		struct ceph_osd_op *op = p;
+		int len;
+
+		len = le32_to_cpu(op->payload_len);
+		req->r_reply_op_len[i] = len;
+		dout(" op %d has %d bytes\n", i, len);
+		payload_len += len;
+		p += sizeof(*op);
+	}
+	if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+		pr_warning("sum of op payload lens %d != data_len %d",
+			   payload_len, le32_to_cpu(msg->hdr.data_len));
+		goto bad_put;
+	}
+
+	ceph_decode_need(&p, end, 4 + numops * 4, bad);
+	retry_attempt = ceph_decode_32(&p);
+	for (i = 0; i < numops; i++)
+		req->r_reply_op_result[i] = ceph_decode_32(&p);

 	/*
 	 * if this connection filled our message, drop our reference now, to
@@ -1152,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	if (!req->r_got_reply) {
 		unsigned int bytes;

-		req->r_result = le32_to_cpu(rhead->result);
+		req->r_result = result;
 		bytes = le32_to_cpu(msg->hdr.data_len);
 		dout("handle_reply result %d bytes %d\n", req->r_result,
 		     bytes);
@@ -1160,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 			req->r_result = bytes;

 		/* in case this is a write and we need to replay, */
-		req->r_reassert_version = rhead->reassert_version;
+		req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
+		req->r_reassert_version.version = cpu_to_le64(reassert_version);

 		req->r_got_reply = 1;
 	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1195,10 +1301,11 @@ done:
 	ceph_osdc_put_request(req);
 	return;

+bad_put:
+	ceph_osdc_put_request(req);
 bad:
-	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
-	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
-	       (int)sizeof(*rhead));
+	pr_err("corrupt osd_op_reply got %d %d\n",
+	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
 	ceph_msg_dump(msg);
 }

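A side note on the first bullet in the commit message: the request front is now encoded once in ceph_osdc_build_request(), which records raw pointers (r_request_osdmap_epoch, r_request_flags, r_request_attempts, ...) to the few fields that must change on every (re)send, and __send_request() overwrites only those bytes. The same pattern, stripped of all Ceph specifics, looks roughly like the standalone userspace sketch below (illustrative only, with a made-up field layout):

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* store a 32-bit value little-endian, like the on-wire encoding */
static void put_le32(void *dst, uint32_t v)
{
	uint8_t *b = dst;

	b[0] = v & 0xff;
	b[1] = (v >> 8) & 0xff;
	b[2] = (v >> 16) & 0xff;
	b[3] = (v >> 24) & 0xff;
}

struct fake_request {
	uint8_t  front[64];	/* encoded message front */
	size_t   front_len;
	/* pointers into front[], recorded once at build time */
	uint8_t *osdmap_epoch;
	uint8_t *flags;
	uint8_t *attempts;
};

/* encode the front once; remember where the mutable fields live */
static void build_request(struct fake_request *req)
{
	uint8_t *p = req->front;

	memset(req->front, 0, sizeof(req->front));
	put_le32(p, 1);			/* client_inc, never changes */
	p += 4;
	req->osdmap_epoch = p;		/* patched on every send */
	p += 4;
	req->flags = p;			/* patched on every send */
	p += 4;
	req->attempts = p;		/* patched on every send */
	p += 4;
	req->front_len = (size_t)(p - req->front);
}

/* rewrite only the volatile fields, then "send" the same buffer */
static void send_request(struct fake_request *req, uint32_t epoch,
			 uint32_t flags, uint32_t attempt)
{
	put_le32(req->osdmap_epoch, epoch);
	put_le32(req->flags, flags);
	put_le32(req->attempts, attempt);
	printf("sending %zu-byte front: epoch %u, attempt %u\n",
	       req->front_len, epoch, attempt);
}

int main(void)
{
	struct fake_request req;

	build_request(&req);
	send_request(&req, 100, 0x1, 1);
	send_request(&req, 101, 0x1, 2);	/* resend after a map change */
	return 0;
}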