author		Ilya Dryomov <idryomov@gmail.com>	2016-05-25 18:29:52 -0400
committer	Ilya Dryomov <idryomov@gmail.com>	2016-05-25 18:36:27 -0400
commit		bb873b539154ab51893430b4ad6ba4051775276a (patch)
tree		9415938962eddb3d83e89bfa00eaab803b196bb6 /net/ceph
parent		a66dd38309f5d9c66ec9bc7911ff8da8cc37bb9f (diff)
libceph: switch to calc_target(), part 2
The crux of this is getting rid of ceph_osdc_build_request(), so that MOSDOp can be encoded not before, but after calc_target() calculates the actual target.  Encoding now happens within ceph_osdc_start_request().

Also nuked is the accompanying bunch of pointers into the encoded buffer that were used to update fields on each send - instead, the entire front is re-encoded.  If we want to support target->name_len != base->name_len in the future, there is no other way, because the oid is surrounded by other fields in the encoded buffer.

Encoding OSD ops and adding data items to the request message were mixed together in osd_req_encode_op().  While we want to re-encode OSD ops, we don't want to add duplicate data items to the message when resending, so all calls to ceph_osdc_msg_data_add() are factored out into a new setup_request_data().

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
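In short, every (re)send now goes through the send_request()/encode_request() pair added in the osd_client.c hunks below.  A condensed, illustrative restatement of that path follows; it is not the literal kernel code (flag juggling, WARN_ONs and the full MOSDOp v4 front layout are omitted), and it assumes req->r_t has already been filled in by calc_target() from part 1 of this series:

/* condensed sketch of the send path added by this patch */
static void send_request(struct ceph_osd_request *req)
{
	/*
	 * Re-encode the whole MOSDOp front against the current target.
	 * setup_request_data(), called from encode_request(), adds data
	 * items to the message only on the first pass, so resends don't
	 * duplicate them.
	 */
	encode_request(req, req->r_request);

	req->r_stamp = jiffies;
	req->r_attempts++;

	/* hand the message to the messenger; send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, ceph_msg_get(req->r_request));
}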
Diffstat (limited to 'net/ceph')
-rw-r--r--	net/ceph/debugfs.c	 61
-rw-r--r--	net/ceph/osd_client.c	355
2 files changed, 216 insertions, 200 deletions
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 0c11ab5f8c30..6d3ff713edeb 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -145,6 +145,43 @@ static int monc_show(struct seq_file *s, void *p)
 	return 0;
 }
 
+static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
+{
+	int i;
+
+	seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
+	for (i = 0; i < t->up.size; i++)
+		seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
+	seq_printf(s, "]/%d\t[", t->up.primary);
+	for (i = 0; i < t->acting.size; i++)
+		seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
+	seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
+		   t->target_oid.name_len, t->target_oid.name, t->flags);
+	if (t->paused)
+		seq_puts(s, "\tP");
+}
+
+static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
+{
+	int i;
+
+	seq_printf(s, "%llu\t", req->r_tid);
+	dump_target(s, &req->r_t);
+
+	seq_printf(s, "\t%d\t%u'%llu", req->r_attempts,
+		   le32_to_cpu(req->r_replay_version.epoch),
+		   le64_to_cpu(req->r_replay_version.version));
+
+	for (i = 0; i < req->r_num_ops; i++) {
+		struct ceph_osd_req_op *op = &req->r_ops[i];
+
+		seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
+			   ceph_osd_op_name(op->op));
+	}
+
+	seq_putc(s, '\n');
+}
+
 static int osdc_show(struct seq_file *s, void *pp)
 {
 	struct ceph_client *client = s->private;
@@ -154,32 +191,10 @@ static int osdc_show(struct seq_file *s, void *pp)
 	mutex_lock(&osdc->request_mutex);
 	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
 		struct ceph_osd_request *req;
-		unsigned int i;
-		int opcode;
 
 		req = rb_entry(p, struct ceph_osd_request, r_node);
 
-		seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid,
-			   req->r_osd ? req->r_osd->o_osd : -1,
-			   req->r_t.pgid.pool, req->r_t.pgid.seed);
-
-		seq_printf(s, "%*pE", req->r_base_oid.name_len,
-			   req->r_base_oid.name);
-
-		if (req->r_reassert_version.epoch)
-			seq_printf(s, "\t%u'%llu",
-				   (unsigned int)le32_to_cpu(req->r_reassert_version.epoch),
-				   le64_to_cpu(req->r_reassert_version.version));
-		else
-			seq_printf(s, "\t");
-
-		for (i = 0; i < req->r_num_ops; i++) {
-			opcode = req->r_ops[i].op;
-			seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
-				   ceph_osd_op_name(opcode));
-		}
-
-		seq_printf(s, "\n");
+		dump_request(s, req);
 	}
 	mutex_unlock(&osdc->request_mutex);
 	return 0;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 013101598c41..8a008f083283 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -34,8 +34,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req);
 static void __enqueue_request(struct ceph_osd_request *req);
-static void __send_request(struct ceph_osd_client *osdc,
-			   struct ceph_osd_request *req);
 
 /*
  * Implement client access to distributed object storage cluster.
@@ -209,6 +207,8 @@ void osd_req_op_cls_request_data_pagelist(
 
 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 	ceph_osd_data_pagelist_init(osd_data, pagelist);
+	osd_req->r_ops[which].cls.indata_len += pagelist->length;
+	osd_req->r_ops[which].indata_len += pagelist->length;
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
 
@@ -221,6 +221,8 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
 				 pages_from_pool, own_pages);
+	osd_req->r_ops[which].cls.indata_len += length;
+	osd_req->r_ops[which].indata_len += length;
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 
@@ -610,8 +612,6 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 
 	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 
-	op->cls.argc = 0;	/* currently unused */
-
 	op->indata_len = payload_len;
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
@@ -709,16 +709,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 	}
 }
 
-static u64 osd_req_encode_op(struct ceph_osd_request *req,
-			     struct ceph_osd_op *dst, unsigned int which)
+static u32 osd_req_encode_op(struct ceph_osd_op *dst,
+			     const struct ceph_osd_req_op *src)
 {
-	struct ceph_osd_req_op *src;
-	struct ceph_osd_data *osd_data;
-	u64 request_data_len = 0;
-	u64 data_length;
-
-	BUG_ON(which >= req->r_num_ops);
-	src = &req->r_ops[which];
 	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
 		pr_err("unrecognized osd opcode %d\n", src->op);
 
@@ -727,49 +720,23 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 
 	switch (src->op) {
 	case CEPH_OSD_OP_STAT:
-		osd_data = &src->raw_data_in;
-		ceph_osdc_msg_data_add(req->r_reply, osd_data);
 		break;
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
-		if (src->op == CEPH_OSD_OP_WRITE ||
-		    src->op == CEPH_OSD_OP_WRITEFULL)
-			request_data_len = src->extent.length;
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
 		dst->extent.length = cpu_to_le64(src->extent.length);
 		dst->extent.truncate_size =
 			cpu_to_le64(src->extent.truncate_size);
 		dst->extent.truncate_seq =
 			cpu_to_le32(src->extent.truncate_seq);
-		osd_data = &src->extent.osd_data;
-		if (src->op == CEPH_OSD_OP_WRITE ||
-		    src->op == CEPH_OSD_OP_WRITEFULL)
-			ceph_osdc_msg_data_add(req->r_request, osd_data);
-		else
-			ceph_osdc_msg_data_add(req->r_reply, osd_data);
 		break;
 	case CEPH_OSD_OP_CALL:
 		dst->cls.class_len = src->cls.class_len;
 		dst->cls.method_len = src->cls.method_len;
-		osd_data = &src->cls.request_info;
-		ceph_osdc_msg_data_add(req->r_request, osd_data);
-		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
-		request_data_len = osd_data->pagelist->length;
-
-		osd_data = &src->cls.request_data;
-		data_length = ceph_osd_data_length(osd_data);
-		if (data_length) {
-			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
-			dst->cls.indata_len = cpu_to_le32(data_length);
-			ceph_osdc_msg_data_add(req->r_request, osd_data);
-			src->indata_len += data_length;
-			request_data_len += data_length;
-		}
-		osd_data = &src->cls.response_data;
-		ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
@@ -791,9 +758,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
 		dst->xattr.cmp_op = src->xattr.cmp_op;
 		dst->xattr.cmp_mode = src->xattr.cmp_mode;
-		osd_data = &src->xattr.osd_data;
-		ceph_osdc_msg_data_add(req->r_request, osd_data);
-		request_data_len = osd_data->pagelist->length;
 		break;
 	case CEPH_OSD_OP_CREATE:
 	case CEPH_OSD_OP_DELETE:
@@ -810,7 +774,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 	dst->flags = cpu_to_le32(src->flags);
 	dst->payload_len = cpu_to_le32(src->indata_len);
 
-	return request_data_len;
+	return src->indata_len;
 }
 
 /*
@@ -852,8 +816,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 		goto fail;
 	}
 
-	req->r_flags = flags;
-
 	/* calculate max write size */
 	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
 	if (r)
@@ -877,9 +839,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					truncate_size, truncate_seq);
 	}
 
+	req->r_flags = flags;
 	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
+	req->r_snapid = vino.snap;
+	if (flags & CEPH_OSD_FLAG_WRITE)
+		req->r_data_offset = off;
+
 	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
 	if (r)
 		goto fail;
@@ -1509,37 +1476,173 @@ out:
 	return err;
 }
 
-/*
- * caller should hold map_sem (for read) and request_mutex
- */
-static void __send_request(struct ceph_osd_client *osdc,
-			   struct ceph_osd_request *req)
+static void setup_request_data(struct ceph_osd_request *req,
+			       struct ceph_msg *msg)
 {
-	void *p;
+	u32 data_len = 0;
+	int i;
+
+	if (!list_empty(&msg->data))
+		return;
 
-	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
-	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
-	     req->r_t.pgid.pool, req->r_t.pgid.seed);
+	WARN_ON(msg->data_length);
+	for (i = 0; i < req->r_num_ops; i++) {
+		struct ceph_osd_req_op *op = &req->r_ops[i];
+
+		switch (op->op) {
+		/* request */
+		case CEPH_OSD_OP_WRITE:
+		case CEPH_OSD_OP_WRITEFULL:
+			WARN_ON(op->indata_len != op->extent.length);
+			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+			break;
+		case CEPH_OSD_OP_SETXATTR:
+		case CEPH_OSD_OP_CMPXATTR:
+			WARN_ON(op->indata_len != op->xattr.name_len +
+						  op->xattr.value_len);
+			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+			break;
+
+		/* reply */
+		case CEPH_OSD_OP_STAT:
+			ceph_osdc_msg_data_add(req->r_reply,
+					       &op->raw_data_in);
+			break;
+		case CEPH_OSD_OP_READ:
+			ceph_osdc_msg_data_add(req->r_reply,
+					       &op->extent.osd_data);
+			break;
+
+		/* both */
+		case CEPH_OSD_OP_CALL:
+			WARN_ON(op->indata_len != op->cls.class_len +
+						  op->cls.method_len +
+						  op->cls.indata_len);
+			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+			/* optional, can be NONE */
+			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+			/* optional, can be NONE */
+			ceph_osdc_msg_data_add(req->r_reply,
+					       &op->cls.response_data);
+			break;
+		}
+
+		data_len += op->indata_len;
+	}
 
-	/* fill in message content that changes each time we send it */
-	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
-	put_unaligned_le32(req->r_flags, req->r_request_flags);
-	put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool);
-	p = req->r_request_pgid;
+	WARN_ON(data_len != msg->data_length);
+}
+
+static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	u32 data_len = 0;
+	int i;
+
+	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
+		/* snapshots aren't writeable */
+		WARN_ON(req->r_snapid != CEPH_NOSNAP);
+	} else {
+		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
+			req->r_data_offset || req->r_snapc);
+	}
+
+	setup_request_data(req, msg);
+
+	ceph_encode_32(&p, 1); /* client_inc, always 1 */
+	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
+	ceph_encode_32(&p, req->r_flags);
+	ceph_encode_timespec(p, &req->r_mtime);
+	p += sizeof(struct ceph_timespec);
+	/* aka reassert_version */
+	memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
+	p += sizeof(req->r_replay_version);
+
+	/* oloc */
+	ceph_encode_8(&p, 4);
+	ceph_encode_8(&p, 4);
+	ceph_encode_32(&p, 8 + 4 + 4);
+	ceph_encode_64(&p, req->r_t.target_oloc.pool);
+	ceph_encode_32(&p, -1); /* preferred */
+	ceph_encode_32(&p, 0); /* key len */
+
+	/* pgid */
+	ceph_encode_8(&p, 1);
 	ceph_encode_64(&p, req->r_t.pgid.pool);
 	ceph_encode_32(&p, req->r_t.pgid.seed);
-	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
-	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
-	       sizeof(req->r_reassert_version));
+	ceph_encode_32(&p, -1); /* preferred */
 
-	req->r_stamp = jiffies;
-	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
+	/* oid */
+	ceph_encode_32(&p, req->r_t.target_oid.name_len);
+	memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
+	p += req->r_t.target_oid.name_len;
 
-	ceph_msg_get(req->r_request); /* send consumes a ref */
+	/* ops, can imply data */
+	ceph_encode_16(&p, req->r_num_ops);
+	for (i = 0; i < req->r_num_ops; i++) {
+		data_len += osd_req_encode_op(p, &req->r_ops[i]);
+		p += sizeof(struct ceph_osd_op);
+	}
 
-	req->r_sent = req->r_osd->o_incarnation;
+	ceph_encode_64(&p, req->r_snapid); /* snapid */
+	if (req->r_snapc) {
+		ceph_encode_64(&p, req->r_snapc->seq);
+		ceph_encode_32(&p, req->r_snapc->num_snaps);
+		for (i = 0; i < req->r_snapc->num_snaps; i++)
+			ceph_encode_64(&p, req->r_snapc->snaps[i]);
+	} else {
+		ceph_encode_64(&p, 0); /* snap_seq */
+		ceph_encode_32(&p, 0); /* snaps len */
+	}
+
+	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
+
+	BUG_ON(p > end);
+	msg->front.iov_len = p - msg->front.iov_base;
+	msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
+	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+	msg->hdr.data_len = cpu_to_le32(data_len);
+	/*
+	 * The header "data_off" is a hint to the receiver allowing it
+	 * to align received data into its buffers such that there's no
+	 * need to re-copy it before writing it to disk (direct I/O).
+	 */
+	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
 
-	ceph_con_send(&req->r_osd->o_con, req->r_request);
+	dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
+	     req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
+	     req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
+}
+
+/*
+ * @req has to be assigned a tid and registered.
+ */
+static void send_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd *osd = req->r_osd;
+
+	WARN_ON(osd->o_osd != req->r_t.osd);
+
+	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
+	if (req->r_attempts)
+		req->r_flags |= CEPH_OSD_FLAG_RETRY;
+	else
+		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
+
+	encode_request(req, req->r_request);
+
+	dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
+	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
+	     req->r_t.osd, req->r_flags, req->r_attempts);
+
+	req->r_t.paused = false;
+	req->r_stamp = jiffies;
+	req->r_attempts++;
+
+	req->r_sent = osd->o_incarnation;
+	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
 }
 
 /*
@@ -1550,8 +1653,10 @@ static void __send_queued(struct ceph_osd_client *osdc)
 	struct ceph_osd_request *req, *tmp;
 
 	dout("__send_queued\n");
-	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
-		__send_request(osdc, req);
+	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+		list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
+		send_request(req);
+	}
 }
 
 /*
@@ -1915,8 +2020,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		req->r_result = bytes;
 
 		/* in case this is a write and we need to replay, */
-		req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
-		req->r_reassert_version.version = cpu_to_le64(reassert_version);
+		req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
+		req->r_replay_version.version = cpu_to_le64(reassert_version);
 
 		req->r_got_reply = 1;
 	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -2433,105 +2538,6 @@ bad:
 }
 
 /*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
-			     struct ceph_snap_context *snapc, u64 snap_id,
-			     struct timespec *mtime)
-{
-	struct ceph_msg *msg = req->r_request;
-	void *p;
-	size_t msg_size;
-	int flags = req->r_flags;
-	u64 data_len;
-	unsigned int i;
-
-	req->r_snapid = snap_id;
-	WARN_ON(snapc != req->r_snapc);
-
-	/* encode request */
-	msg->hdr.version = cpu_to_le16(4);
-
-	p = msg->front.iov_base;
-	ceph_encode_32(&p, 1);   /* client_inc is always 1 */
-	req->r_request_osdmap_epoch = p;
-	p += 4;
-	req->r_request_flags = p;
-	p += 4;
-	if (req->r_flags & CEPH_OSD_FLAG_WRITE)
-		ceph_encode_timespec(p, mtime);
-	p += sizeof(struct ceph_timespec);
-	req->r_request_reassert_version = p;
-	p += sizeof(struct ceph_eversion); /* will get filled in */
-
-	/* oloc */
-	ceph_encode_8(&p, 4);
-	ceph_encode_8(&p, 4);
-	ceph_encode_32(&p, 8 + 4 + 4);
-	req->r_request_pool = p;
-	p += 8;
-	ceph_encode_32(&p, -1);  /* preferred */
-	ceph_encode_32(&p, 0);   /* key len */
-
-	ceph_encode_8(&p, 1);
-	req->r_request_pgid = p;
-	p += 8 + 4;
-	ceph_encode_32(&p, -1);  /* preferred */
-
-	/* oid */
-	ceph_encode_32(&p, req->r_base_oid.name_len);
-	memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
-	dout("oid %*pE len %d\n", req->r_base_oid.name_len,
-	     req->r_base_oid.name, req->r_base_oid.name_len);
-	p += req->r_base_oid.name_len;
-
-	/* ops--can imply data */
-	ceph_encode_16(&p, (u16)req->r_num_ops);
-	data_len = 0;
-	for (i = 0; i < req->r_num_ops; i++) {
-		data_len += osd_req_encode_op(req, p, i);
-		p += sizeof(struct ceph_osd_op);
-	}
-
-	/* snaps */
-	ceph_encode_64(&p, req->r_snapid);
-	ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
-	ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
-	if (req->r_snapc) {
-		for (i = 0; i < req->r_snapc->num_snaps; i++) {
-			ceph_encode_64(&p, req->r_snapc->snaps[i]);
-		}
-	}
-
-	req->r_request_attempts = p;
-	p += 4;
-
-	/* data */
-	if (flags & CEPH_OSD_FLAG_WRITE) {
-		u16 data_off;
-
-		/*
-		 * The header "data_off" is a hint to the receiver
-		 * allowing it to align received data into its
-		 * buffers such that there's no need to re-copy
-		 * it before writing it to disk (direct I/O).
-		 */
-		data_off = (u16) (off & 0xffff);
-		req->r_request->hdr.data_off = cpu_to_le16(data_off);
-	}
-	req->r_request->hdr.data_len = cpu_to_le32(data_len);
-
-	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
-	msg_size = p - msg->front.iov_base;
-	msg->front.iov_len = msg_size;
-	msg->hdr.front_len = cpu_to_le32(msg_size);
-
-	dout("build_request msg_size was %d\n", (int)msg_size);
-}
-EXPORT_SYMBOL(ceph_osdc_build_request);
-
-/*
  * Register request, send initial attempt.
  */
 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -2749,15 +2755,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 		return PTR_ERR(req);
 
 	/* it may be a short read due to an object boundary */
-
 	osd_req_op_extent_osd_data_pages(req, 0,
 				pages, *plen, page_align, false, false);
 
 	dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
 	     off, *plen, *plen, page_align);
 
-	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
-
 	rc = ceph_osdc_start_request(osdc, req, false);
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);
@@ -2783,7 +2786,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 	int rc = 0;
 	int page_align = off & ~PAGE_MASK;
 
-	BUG_ON(vino.snap != CEPH_NOSNAP);	/* snapshots aren't writeable */
 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 				    CEPH_OSD_OP_WRITE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
@@ -2797,8 +2799,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 				false, false);
 	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
 
-	ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
-
+	req->r_mtime = *mtime;
 	rc = ceph_osdc_start_request(osdc, req, true);
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);
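
For callers, the net effect (visible in the readpages/writepages hunks above) is that ceph_osdc_build_request() drops out of the submission sequence; a write path now looks roughly like the sketch below, mirroring the ceph_osdc_writepages() change:

	/* before: build the message up front, then start the request */
	ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
	rc = ceph_osdc_start_request(osdc, req, true);

	/*
	 * after: stash the mtime on the request; the MOSDOp front is
	 * encoded inside ceph_osdc_start_request(), once the target
	 * has been computed.
	 */
	req->r_mtime = *mtime;
	rc = ceph_osdc_start_request(osdc, req, true);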