aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Ilya Dryomov <idryomov@gmail.com> 2016-05-25 18:29:52 -0400
committer Ilya Dryomov <idryomov@gmail.com> 2016-05-25 18:36:27 -0400
commit bb873b539154ab51893430b4ad6ba4051775276a (patch)
tree 9415938962eddb3d83e89bfa00eaab803b196bb6
parent a66dd38309f5d9c66ec9bc7911ff8da8cc37bb9f (diff)
libceph: switch to calc_target(), part 2
The crux of this is getting rid of ceph_osdc_build_request(), so that MOSDOp can be encoded not before but after calc_target() calculates the actual target. Encoding now happens within ceph_osdc_start_request(). Also nuked is the accompanying bunch of pointers into the encoded buffer that was used to update fields on each send - instead, the entire front is re-encoded. If we want to support target->name_len != base->name_len in the future, there is no other way, because oid is surrounded by other fields in the encoded buffer. Encoding OSD ops and adding data items to the request message were mixed together in osd_req_encode_op(). While we want to re-encode OSD ops, we don't want to add duplicate data items to the message when resending, so all calls to ceph_osdc_msg_data_add() are factored out into a new setup_request_data(). Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
-rw-r--r-- drivers/block/rbd.c 18
-rw-r--r-- fs/ceph/addr.c 16
-rw-r--r-- fs/ceph/file.c 16
-rw-r--r-- include/linux/ceph/osd_client.h 29
-rw-r--r-- include/linux/ceph/rados.h 7
-rw-r--r-- net/ceph/debugfs.c 61
-rw-r--r-- net/ceph/osd_client.c 355
7 files changed, 247 insertions, 255 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index f3ea927f93de..0e598916e048 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1896,27 +1896,17 @@ static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1896{ 1896{
1897 struct rbd_img_request *img_request = obj_request->img_request; 1897 struct rbd_img_request *img_request = obj_request->img_request;
1898 struct ceph_osd_request *osd_req = obj_request->osd_req; 1898 struct ceph_osd_request *osd_req = obj_request->osd_req;
1899 u64 snap_id;
1900
1901 rbd_assert(osd_req != NULL);
1902 1899
1903 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; 1900 if (img_request)
1904 ceph_osdc_build_request(osd_req, obj_request->offset, 1901 osd_req->r_snapid = img_request->snap_id;
1905 NULL, snap_id, NULL);
1906} 1902}
1907 1903
1908static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1904static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1909{ 1905{
1910 struct rbd_img_request *img_request = obj_request->img_request;
1911 struct ceph_osd_request *osd_req = obj_request->osd_req; 1906 struct ceph_osd_request *osd_req = obj_request->osd_req;
1912 struct ceph_snap_context *snapc;
1913 struct timespec mtime = CURRENT_TIME;
1914
1915 rbd_assert(osd_req != NULL);
1916 1907
1917 snapc = img_request ? img_request->snapc : NULL; 1908 osd_req->r_mtime = CURRENT_TIME;
1918 ceph_osdc_build_request(osd_req, obj_request->offset, 1909 osd_req->r_data_offset = obj_request->offset;
1919 snapc, CEPH_NOSNAP, &mtime);
1920} 1910}
1921 1911
1922/* 1912/*
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index c5d75486823b..59b3c3fbd3bd 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -376,8 +376,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
376 req->r_callback = finish_read; 376 req->r_callback = finish_read;
377 req->r_inode = inode; 377 req->r_inode = inode;
378 378
379 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
380
381 dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); 379 dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
382 ret = ceph_osdc_start_request(osdc, req, false); 380 ret = ceph_osdc_start_request(osdc, req, false);
383 if (ret < 0) 381 if (ret < 0)
@@ -1063,10 +1061,7 @@ new_request:
1063 pages = NULL; 1061 pages = NULL;
1064 } 1062 }
1065 1063
1066 vino = ceph_vino(inode); 1064 req->r_mtime = inode->i_mtime;
1067 ceph_osdc_build_request(req, offset, snapc, vino.snap,
1068 &inode->i_mtime);
1069
1070 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); 1065 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
1071 BUG_ON(rc); 1066 BUG_ON(rc);
1072 req = NULL; 1067 req = NULL;
@@ -1614,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1614 goto out; 1609 goto out;
1615 } 1610 }
1616 1611
1617 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); 1612 req->r_mtime = inode->i_mtime;
1618 err = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1613 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1619 if (!err) 1614 if (!err)
1620 err = ceph_osdc_wait_request(&fsc->client->osdc, req); 1615 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1657,7 +1652,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1657 goto out_put; 1652 goto out_put;
1658 } 1653 }
1659 1654
1660 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); 1655 req->r_mtime = inode->i_mtime;
1661 err = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1656 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1662 if (!err) 1657 if (!err)
1663 err = ceph_osdc_wait_request(&fsc->client->osdc, req); 1658 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1790,12 +1785,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1790 1785
1791 osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, 1786 osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
1792 0, false, true); 1787 0, false, true);
1793 ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
1794 &ci->vfs_inode.i_mtime);
1795 err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); 1788 err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
1796 1789
1797 ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, 1790 wr_req->r_mtime = ci->vfs_inode.i_mtime;
1798 &ci->vfs_inode.i_mtime);
1799 err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); 1791 err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
1800 1792
1801 if (!err) 1793 if (!err)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 36b4a41dfa67..52e4b72dd5de 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -727,8 +727,8 @@ static void ceph_aio_retry_work(struct work_struct *work)
727 req->r_ops[0] = orig_req->r_ops[0]; 727 req->r_ops[0] = orig_req->r_ops[0];
728 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 728 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
729 729
730 ceph_osdc_build_request(req, req->r_ops[0].extent.offset, 730 req->r_mtime = aio_req->mtime;
731 snapc, CEPH_NOSNAP, &aio_req->mtime); 731 req->r_data_offset = req->r_ops[0].extent.offset;
732 732
733 ceph_osdc_put_request(orig_req); 733 ceph_osdc_put_request(orig_req);
734 734
@@ -882,14 +882,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
882 (pos+len) | (PAGE_SIZE - 1)); 882 (pos+len) | (PAGE_SIZE - 1));
883 883
884 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 884 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
885 req->r_mtime = mtime;
885 } 886 }
886 887
887
888 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, 888 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
889 false, false); 889 false, false);
890 890
891 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
892
893 if (aio_req) { 891 if (aio_req) {
894 aio_req->total_len += len; 892 aio_req->total_len += len;
895 aio_req->num_reqs++; 893 aio_req->num_reqs++;
@@ -1074,9 +1072,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1074 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, 1072 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
1075 false, true); 1073 false, true);
1076 1074
1077 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 1075 req->r_mtime = mtime;
1078 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
1079
1080 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1076 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1081 if (!ret) 1077 if (!ret)
1082 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1078 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1532,9 +1528,7 @@ static int ceph_zero_partial_object(struct inode *inode,
1532 goto out; 1528 goto out;
1533 } 1529 }
1534 1530
1535 ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, 1531 req->r_mtime = inode->i_mtime;
1536 &inode->i_mtime);
1537
1538 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1532 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1539 if (!ret) { 1533 if (!ret) {
1540 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1534 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 03bf9d9e1517..67a37d98e0ca 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -104,7 +104,7 @@ struct ceph_osd_req_op {
104 struct ceph_osd_data response_data; 104 struct ceph_osd_data response_data;
105 __u8 class_len; 105 __u8 class_len;
106 __u8 method_len; 106 __u8 method_len;
107 __u8 argc; 107 u32 indata_len;
108 } cls; 108 } cls;
109 struct { 109 struct {
110 u64 cookie; 110 u64 cookie;
@@ -162,14 +162,6 @@ struct ceph_osd_request {
162 /* request osd ops array */ 162 /* request osd ops array */
163 unsigned int r_num_ops; 163 unsigned int r_num_ops;
164 164
165 /* these are updated on each send */
166 __le32 *r_request_osdmap_epoch;
167 __le32 *r_request_flags;
168 __le64 *r_request_pool;
169 void *r_request_pgid;
170 __le32 *r_request_attempts;
171 struct ceph_eversion *r_request_reassert_version;
172
173 int r_result; 165 int r_result;
174 int r_got_reply; 166 int r_got_reply;
175 int r_linger; 167 int r_linger;
@@ -180,16 +172,22 @@ struct ceph_osd_request {
180 struct completion r_completion, r_safe_completion; 172 struct completion r_completion, r_safe_completion;
181 ceph_osdc_callback_t r_callback; 173 ceph_osdc_callback_t r_callback;
182 ceph_osdc_unsafe_callback_t r_unsafe_callback; 174 ceph_osdc_unsafe_callback_t r_unsafe_callback;
183 struct ceph_eversion r_reassert_version;
184 struct list_head r_unsafe_item; 175 struct list_head r_unsafe_item;
185 176
186 struct inode *r_inode; /* for use by callbacks */ 177 struct inode *r_inode; /* for use by callbacks */
187 void *r_priv; /* ditto */ 178 void *r_priv; /* ditto */
188 179
189 u64 r_snapid; 180 /* set by submitter */
190 unsigned long r_stamp; /* send OR check time */ 181 u64 r_snapid; /* for reads, CEPH_NOSNAP o/w */
182 struct ceph_snap_context *r_snapc; /* for writes */
183 struct timespec r_mtime; /* ditto */
184 u64 r_data_offset; /* ditto */
191 185
192 struct ceph_snap_context *r_snapc; /* snap context for writes */ 186 /* internal */
187 unsigned long r_stamp; /* jiffies, send or check time */
188 int r_attempts;
189 struct ceph_eversion r_replay_version; /* aka reassert_version */
190 u32 r_last_force_resend;
193 191
194 struct ceph_osd_req_op r_ops[]; 192 struct ceph_osd_req_op r_ops[];
195}; 193};
@@ -334,11 +332,6 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *
334 gfp_t gfp_flags); 332 gfp_t gfp_flags);
335int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp); 333int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp);
336 334
337extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
338 struct ceph_snap_context *snapc,
339 u64 snap_id,
340 struct timespec *mtime);
341
342extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, 335extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
343 struct ceph_file_layout *layout, 336 struct ceph_file_layout *layout,
344 struct ceph_vino vino, 337 struct ceph_vino vino,
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index f28ed864e682..28740a58f32c 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -394,6 +394,13 @@ enum {
394 CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */ 394 CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */
395 CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */ 395 CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */
396 CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */ 396 CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */
397 CEPH_OSD_FLAG_MAP_SNAP_CLONE = 0x80000, /* map snap direct to clone id */
398 CEPH_OSD_FLAG_ENFORCE_SNAPC = 0x100000, /* use snapc provided even if
399 pool uses pool snaps */
400 CEPH_OSD_FLAG_REDIRECTED = 0x200000, /* op has been redirected */
401 CEPH_OSD_FLAG_KNOWN_REDIR = 0x400000, /* redirect bit is authoritative */
402 CEPH_OSD_FLAG_FULL_TRY = 0x800000, /* try op despite full flag */
403 CEPH_OSD_FLAG_FULL_FORCE = 0x1000000, /* force op despite full flag */
397}; 404};
398 405
399enum { 406enum {
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 0c11ab5f8c30..6d3ff713edeb 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -145,6 +145,43 @@ static int monc_show(struct seq_file *s, void *p)
145 return 0; 145 return 0;
146} 146}
147 147
148static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
149{
150 int i;
151
152 seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
153 for (i = 0; i < t->up.size; i++)
154 seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
155 seq_printf(s, "]/%d\t[", t->up.primary);
156 for (i = 0; i < t->acting.size; i++)
157 seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
158 seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
159 t->target_oid.name_len, t->target_oid.name, t->flags);
160 if (t->paused)
161 seq_puts(s, "\tP");
162}
163
164static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
165{
166 int i;
167
168 seq_printf(s, "%llu\t", req->r_tid);
169 dump_target(s, &req->r_t);
170
171 seq_printf(s, "\t%d\t%u'%llu", req->r_attempts,
172 le32_to_cpu(req->r_replay_version.epoch),
173 le64_to_cpu(req->r_replay_version.version));
174
175 for (i = 0; i < req->r_num_ops; i++) {
176 struct ceph_osd_req_op *op = &req->r_ops[i];
177
178 seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
179 ceph_osd_op_name(op->op));
180 }
181
182 seq_putc(s, '\n');
183}
184
148static int osdc_show(struct seq_file *s, void *pp) 185static int osdc_show(struct seq_file *s, void *pp)
149{ 186{
150 struct ceph_client *client = s->private; 187 struct ceph_client *client = s->private;
@@ -154,32 +191,10 @@ static int osdc_show(struct seq_file *s, void *pp)
154 mutex_lock(&osdc->request_mutex); 191 mutex_lock(&osdc->request_mutex);
155 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 192 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
156 struct ceph_osd_request *req; 193 struct ceph_osd_request *req;
157 unsigned int i;
158 int opcode;
159 194
160 req = rb_entry(p, struct ceph_osd_request, r_node); 195 req = rb_entry(p, struct ceph_osd_request, r_node);
161 196
162 seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid, 197 dump_request(s, req);
163 req->r_osd ? req->r_osd->o_osd : -1,
164 req->r_t.pgid.pool, req->r_t.pgid.seed);
165
166 seq_printf(s, "%*pE", req->r_base_oid.name_len,
167 req->r_base_oid.name);
168
169 if (req->r_reassert_version.epoch)
170 seq_printf(s, "\t%u'%llu",
171 (unsigned int)le32_to_cpu(req->r_reassert_version.epoch),
172 le64_to_cpu(req->r_reassert_version.version));
173 else
174 seq_printf(s, "\t");
175
176 for (i = 0; i < req->r_num_ops; i++) {
177 opcode = req->r_ops[i].op;
178 seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
179 ceph_osd_op_name(opcode));
180 }
181
182 seq_printf(s, "\n");
183 } 198 }
184 mutex_unlock(&osdc->request_mutex); 199 mutex_unlock(&osdc->request_mutex);
185 return 0; 200 return 0;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 013101598c41..8a008f083283 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -34,8 +34,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
34static void __unregister_linger_request(struct ceph_osd_client *osdc, 34static void __unregister_linger_request(struct ceph_osd_client *osdc,
35 struct ceph_osd_request *req); 35 struct ceph_osd_request *req);
36static void __enqueue_request(struct ceph_osd_request *req); 36static void __enqueue_request(struct ceph_osd_request *req);
37static void __send_request(struct ceph_osd_client *osdc,
38 struct ceph_osd_request *req);
39 37
40/* 38/*
41 * Implement client access to distributed object storage cluster. 39 * Implement client access to distributed object storage cluster.
@@ -209,6 +207,8 @@ void osd_req_op_cls_request_data_pagelist(
209 207
210 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 208 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
211 ceph_osd_data_pagelist_init(osd_data, pagelist); 209 ceph_osd_data_pagelist_init(osd_data, pagelist);
210 osd_req->r_ops[which].cls.indata_len += pagelist->length;
211 osd_req->r_ops[which].indata_len += pagelist->length;
212} 212}
213EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); 213EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
214 214
@@ -221,6 +221,8 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
221 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 221 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
222 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 222 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
223 pages_from_pool, own_pages); 223 pages_from_pool, own_pages);
224 osd_req->r_ops[which].cls.indata_len += length;
225 osd_req->r_ops[which].indata_len += length;
224} 226}
225EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); 227EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
226 228
@@ -610,8 +612,6 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
610 612
611 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 613 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
612 614
613 op->cls.argc = 0; /* currently unused */
614
615 op->indata_len = payload_len; 615 op->indata_len = payload_len;
616} 616}
617EXPORT_SYMBOL(osd_req_op_cls_init); 617EXPORT_SYMBOL(osd_req_op_cls_init);
@@ -709,16 +709,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
709 } 709 }
710} 710}
711 711
712static u64 osd_req_encode_op(struct ceph_osd_request *req, 712static u32 osd_req_encode_op(struct ceph_osd_op *dst,
713 struct ceph_osd_op *dst, unsigned int which) 713 const struct ceph_osd_req_op *src)
714{ 714{
715 struct ceph_osd_req_op *src;
716 struct ceph_osd_data *osd_data;
717 u64 request_data_len = 0;
718 u64 data_length;
719
720 BUG_ON(which >= req->r_num_ops);
721 src = &req->r_ops[which];
722 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 715 if (WARN_ON(!osd_req_opcode_valid(src->op))) {
723 pr_err("unrecognized osd opcode %d\n", src->op); 716 pr_err("unrecognized osd opcode %d\n", src->op);
724 717
@@ -727,49 +720,23 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
727 720
728 switch (src->op) { 721 switch (src->op) {
729 case CEPH_OSD_OP_STAT: 722 case CEPH_OSD_OP_STAT:
730 osd_data = &src->raw_data_in;
731 ceph_osdc_msg_data_add(req->r_reply, osd_data);
732 break; 723 break;
733 case CEPH_OSD_OP_READ: 724 case CEPH_OSD_OP_READ:
734 case CEPH_OSD_OP_WRITE: 725 case CEPH_OSD_OP_WRITE:
735 case CEPH_OSD_OP_WRITEFULL: 726 case CEPH_OSD_OP_WRITEFULL:
736 case CEPH_OSD_OP_ZERO: 727 case CEPH_OSD_OP_ZERO:
737 case CEPH_OSD_OP_TRUNCATE: 728 case CEPH_OSD_OP_TRUNCATE:
738 if (src->op == CEPH_OSD_OP_WRITE ||
739 src->op == CEPH_OSD_OP_WRITEFULL)
740 request_data_len = src->extent.length;
741 dst->extent.offset = cpu_to_le64(src->extent.offset); 729 dst->extent.offset = cpu_to_le64(src->extent.offset);
742 dst->extent.length = cpu_to_le64(src->extent.length); 730 dst->extent.length = cpu_to_le64(src->extent.length);
743 dst->extent.truncate_size = 731 dst->extent.truncate_size =
744 cpu_to_le64(src->extent.truncate_size); 732 cpu_to_le64(src->extent.truncate_size);
745 dst->extent.truncate_seq = 733 dst->extent.truncate_seq =
746 cpu_to_le32(src->extent.truncate_seq); 734 cpu_to_le32(src->extent.truncate_seq);
747 osd_data = &src->extent.osd_data;
748 if (src->op == CEPH_OSD_OP_WRITE ||
749 src->op == CEPH_OSD_OP_WRITEFULL)
750 ceph_osdc_msg_data_add(req->r_request, osd_data);
751 else
752 ceph_osdc_msg_data_add(req->r_reply, osd_data);
753 break; 735 break;
754 case CEPH_OSD_OP_CALL: 736 case CEPH_OSD_OP_CALL:
755 dst->cls.class_len = src->cls.class_len; 737 dst->cls.class_len = src->cls.class_len;
756 dst->cls.method_len = src->cls.method_len; 738 dst->cls.method_len = src->cls.method_len;
757 osd_data = &src->cls.request_info; 739 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
758 ceph_osdc_msg_data_add(req->r_request, osd_data);
759 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
760 request_data_len = osd_data->pagelist->length;
761
762 osd_data = &src->cls.request_data;
763 data_length = ceph_osd_data_length(osd_data);
764 if (data_length) {
765 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
766 dst->cls.indata_len = cpu_to_le32(data_length);
767 ceph_osdc_msg_data_add(req->r_request, osd_data);
768 src->indata_len += data_length;
769 request_data_len += data_length;
770 }
771 osd_data = &src->cls.response_data;
772 ceph_osdc_msg_data_add(req->r_reply, osd_data);
773 break; 740 break;
774 case CEPH_OSD_OP_STARTSYNC: 741 case CEPH_OSD_OP_STARTSYNC:
775 break; 742 break;
@@ -791,9 +758,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
791 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 758 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
792 dst->xattr.cmp_op = src->xattr.cmp_op; 759 dst->xattr.cmp_op = src->xattr.cmp_op;
793 dst->xattr.cmp_mode = src->xattr.cmp_mode; 760 dst->xattr.cmp_mode = src->xattr.cmp_mode;
794 osd_data = &src->xattr.osd_data;
795 ceph_osdc_msg_data_add(req->r_request, osd_data);
796 request_data_len = osd_data->pagelist->length;
797 break; 761 break;
798 case CEPH_OSD_OP_CREATE: 762 case CEPH_OSD_OP_CREATE:
799 case CEPH_OSD_OP_DELETE: 763 case CEPH_OSD_OP_DELETE:
@@ -810,7 +774,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
810 dst->flags = cpu_to_le32(src->flags); 774 dst->flags = cpu_to_le32(src->flags);
811 dst->payload_len = cpu_to_le32(src->indata_len); 775 dst->payload_len = cpu_to_le32(src->indata_len);
812 776
813 return request_data_len; 777 return src->indata_len;
814} 778}
815 779
816/* 780/*
@@ -852,8 +816,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
852 goto fail; 816 goto fail;
853 } 817 }
854 818
855 req->r_flags = flags;
856
857 /* calculate max write size */ 819 /* calculate max write size */
858 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 820 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
859 if (r) 821 if (r)
@@ -877,9 +839,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
877 truncate_size, truncate_seq); 839 truncate_size, truncate_seq);
878 } 840 }
879 841
842 req->r_flags = flags;
880 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); 843 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
881 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); 844 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
882 845
846 req->r_snapid = vino.snap;
847 if (flags & CEPH_OSD_FLAG_WRITE)
848 req->r_data_offset = off;
849
883 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 850 r = ceph_osdc_alloc_messages(req, GFP_NOFS);
884 if (r) 851 if (r)
885 goto fail; 852 goto fail;
@@ -1509,37 +1476,173 @@ out:
1509 return err; 1476 return err;
1510} 1477}
1511 1478
1512/* 1479static void setup_request_data(struct ceph_osd_request *req,
1513 * caller should hold map_sem (for read) and request_mutex 1480 struct ceph_msg *msg)
1514 */
1515static void __send_request(struct ceph_osd_client *osdc,
1516 struct ceph_osd_request *req)
1517{ 1481{
1518 void *p; 1482 u32 data_len = 0;
1483 int i;
1484
1485 if (!list_empty(&msg->data))
1486 return;
1519 1487
1520 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1488 WARN_ON(msg->data_length);
1521 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1489 for (i = 0; i < req->r_num_ops; i++) {
1522 req->r_t.pgid.pool, req->r_t.pgid.seed); 1490 struct ceph_osd_req_op *op = &req->r_ops[i];
1491
1492 switch (op->op) {
1493 /* request */
1494 case CEPH_OSD_OP_WRITE:
1495 case CEPH_OSD_OP_WRITEFULL:
1496 WARN_ON(op->indata_len != op->extent.length);
1497 ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
1498 break;
1499 case CEPH_OSD_OP_SETXATTR:
1500 case CEPH_OSD_OP_CMPXATTR:
1501 WARN_ON(op->indata_len != op->xattr.name_len +
1502 op->xattr.value_len);
1503 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
1504 break;
1505
1506 /* reply */
1507 case CEPH_OSD_OP_STAT:
1508 ceph_osdc_msg_data_add(req->r_reply,
1509 &op->raw_data_in);
1510 break;
1511 case CEPH_OSD_OP_READ:
1512 ceph_osdc_msg_data_add(req->r_reply,
1513 &op->extent.osd_data);
1514 break;
1515
1516 /* both */
1517 case CEPH_OSD_OP_CALL:
1518 WARN_ON(op->indata_len != op->cls.class_len +
1519 op->cls.method_len +
1520 op->cls.indata_len);
1521 ceph_osdc_msg_data_add(msg, &op->cls.request_info);
1522 /* optional, can be NONE */
1523 ceph_osdc_msg_data_add(msg, &op->cls.request_data);
1524 /* optional, can be NONE */
1525 ceph_osdc_msg_data_add(req->r_reply,
1526 &op->cls.response_data);
1527 break;
1528 }
1529
1530 data_len += op->indata_len;
1531 }
1523 1532
1524 /* fill in message content that changes each time we send it */ 1533 WARN_ON(data_len != msg->data_length);
1525 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1534}
1526 put_unaligned_le32(req->r_flags, req->r_request_flags); 1535
1527 put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool); 1536static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
1528 p = req->r_request_pgid; 1537{
1538 void *p = msg->front.iov_base;
1539 void *const end = p + msg->front_alloc_len;
1540 u32 data_len = 0;
1541 int i;
1542
1543 if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
1544 /* snapshots aren't writeable */
1545 WARN_ON(req->r_snapid != CEPH_NOSNAP);
1546 } else {
1547 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
1548 req->r_data_offset || req->r_snapc);
1549 }
1550
1551 setup_request_data(req, msg);
1552
1553 ceph_encode_32(&p, 1); /* client_inc, always 1 */
1554 ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
1555 ceph_encode_32(&p, req->r_flags);
1556 ceph_encode_timespec(p, &req->r_mtime);
1557 p += sizeof(struct ceph_timespec);
1558 /* aka reassert_version */
1559 memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
1560 p += sizeof(req->r_replay_version);
1561
1562 /* oloc */
1563 ceph_encode_8(&p, 4);
1564 ceph_encode_8(&p, 4);
1565 ceph_encode_32(&p, 8 + 4 + 4);
1566 ceph_encode_64(&p, req->r_t.target_oloc.pool);
1567 ceph_encode_32(&p, -1); /* preferred */
1568 ceph_encode_32(&p, 0); /* key len */
1569
1570 /* pgid */
1571 ceph_encode_8(&p, 1);
1529 ceph_encode_64(&p, req->r_t.pgid.pool); 1572 ceph_encode_64(&p, req->r_t.pgid.pool);
1530 ceph_encode_32(&p, req->r_t.pgid.seed); 1573 ceph_encode_32(&p, req->r_t.pgid.seed);
1531 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1574 ceph_encode_32(&p, -1); /* preferred */
1532 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1533 sizeof(req->r_reassert_version));
1534 1575
1535 req->r_stamp = jiffies; 1576 /* oid */
1536 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1577 ceph_encode_32(&p, req->r_t.target_oid.name_len);
1578 memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
1579 p += req->r_t.target_oid.name_len;
1537 1580
1538 ceph_msg_get(req->r_request); /* send consumes a ref */ 1581 /* ops, can imply data */
1582 ceph_encode_16(&p, req->r_num_ops);
1583 for (i = 0; i < req->r_num_ops; i++) {
1584 data_len += osd_req_encode_op(p, &req->r_ops[i]);
1585 p += sizeof(struct ceph_osd_op);
1586 }
1539 1587
1540 req->r_sent = req->r_osd->o_incarnation; 1588 ceph_encode_64(&p, req->r_snapid); /* snapid */
1589 if (req->r_snapc) {
1590 ceph_encode_64(&p, req->r_snapc->seq);
1591 ceph_encode_32(&p, req->r_snapc->num_snaps);
1592 for (i = 0; i < req->r_snapc->num_snaps; i++)
1593 ceph_encode_64(&p, req->r_snapc->snaps[i]);
1594 } else {
1595 ceph_encode_64(&p, 0); /* snap_seq */
1596 ceph_encode_32(&p, 0); /* snaps len */
1597 }
1598
1599 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
1600
1601 BUG_ON(p > end);
1602 msg->front.iov_len = p - msg->front.iov_base;
1603 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
1604 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1605 msg->hdr.data_len = cpu_to_le32(data_len);
1606 /*
1607 * The header "data_off" is a hint to the receiver allowing it
1608 * to align received data into its buffers such that there's no
1609 * need to re-copy it before writing it to disk (direct I/O).
1610 */
1611 msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
1541 1612
1542 ceph_con_send(&req->r_osd->o_con, req->r_request); 1613 dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
1614 req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
1615 req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
1616}
1617
1618/*
1619 * @req has to be assigned a tid and registered.
1620 */
1621static void send_request(struct ceph_osd_request *req)
1622{
1623 struct ceph_osd *osd = req->r_osd;
1624
1625 WARN_ON(osd->o_osd != req->r_t.osd);
1626
1627 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
1628 if (req->r_attempts)
1629 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1630 else
1631 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
1632
1633 encode_request(req, req->r_request);
1634
1635 dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
1636 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
1637 req->r_t.osd, req->r_flags, req->r_attempts);
1638
1639 req->r_t.paused = false;
1640 req->r_stamp = jiffies;
1641 req->r_attempts++;
1642
1643 req->r_sent = osd->o_incarnation;
1644 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
1645 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
1543} 1646}
1544 1647
1545/* 1648/*
@@ -1550,8 +1653,10 @@ static void __send_queued(struct ceph_osd_client *osdc)
1550 struct ceph_osd_request *req, *tmp; 1653 struct ceph_osd_request *req, *tmp;
1551 1654
1552 dout("__send_queued\n"); 1655 dout("__send_queued\n");
1553 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1656 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1554 __send_request(osdc, req); 1657 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1658 send_request(req);
1659 }
1555} 1660}
1556 1661
1557/* 1662/*
@@ -1915,8 +2020,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1915 req->r_result = bytes; 2020 req->r_result = bytes;
1916 2021
1917 /* in case this is a write and we need to replay, */ 2022 /* in case this is a write and we need to replay, */
1918 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 2023 req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
1919 req->r_reassert_version.version = cpu_to_le64(reassert_version); 2024 req->r_replay_version.version = cpu_to_le64(reassert_version);
1920 2025
1921 req->r_got_reply = 1; 2026 req->r_got_reply = 1;
1922 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 2027 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -2433,105 +2538,6 @@ bad:
2433} 2538}
2434 2539
2435/* 2540/*
2436 * build new request AND message
2437 *
2438 */
2439void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2440 struct ceph_snap_context *snapc, u64 snap_id,
2441 struct timespec *mtime)
2442{
2443 struct ceph_msg *msg = req->r_request;
2444 void *p;
2445 size_t msg_size;
2446 int flags = req->r_flags;
2447 u64 data_len;
2448 unsigned int i;
2449
2450 req->r_snapid = snap_id;
2451 WARN_ON(snapc != req->r_snapc);
2452
2453 /* encode request */
2454 msg->hdr.version = cpu_to_le16(4);
2455
2456 p = msg->front.iov_base;
2457 ceph_encode_32(&p, 1); /* client_inc is always 1 */
2458 req->r_request_osdmap_epoch = p;
2459 p += 4;
2460 req->r_request_flags = p;
2461 p += 4;
2462 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
2463 ceph_encode_timespec(p, mtime);
2464 p += sizeof(struct ceph_timespec);
2465 req->r_request_reassert_version = p;
2466 p += sizeof(struct ceph_eversion); /* will get filled in */
2467
2468 /* oloc */
2469 ceph_encode_8(&p, 4);
2470 ceph_encode_8(&p, 4);
2471 ceph_encode_32(&p, 8 + 4 + 4);
2472 req->r_request_pool = p;
2473 p += 8;
2474 ceph_encode_32(&p, -1); /* preferred */
2475 ceph_encode_32(&p, 0); /* key len */
2476
2477 ceph_encode_8(&p, 1);
2478 req->r_request_pgid = p;
2479 p += 8 + 4;
2480 ceph_encode_32(&p, -1); /* preferred */
2481
2482 /* oid */
2483 ceph_encode_32(&p, req->r_base_oid.name_len);
2484 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
2485 dout("oid %*pE len %d\n", req->r_base_oid.name_len,
2486 req->r_base_oid.name, req->r_base_oid.name_len);
2487 p += req->r_base_oid.name_len;
2488
2489 /* ops--can imply data */
2490 ceph_encode_16(&p, (u16)req->r_num_ops);
2491 data_len = 0;
2492 for (i = 0; i < req->r_num_ops; i++) {
2493 data_len += osd_req_encode_op(req, p, i);
2494 p += sizeof(struct ceph_osd_op);
2495 }
2496
2497 /* snaps */
2498 ceph_encode_64(&p, req->r_snapid);
2499 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
2500 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
2501 if (req->r_snapc) {
2502 for (i = 0; i < req->r_snapc->num_snaps; i++) {
2503 ceph_encode_64(&p, req->r_snapc->snaps[i]);
2504 }
2505 }
2506
2507 req->r_request_attempts = p;
2508 p += 4;
2509
2510 /* data */
2511 if (flags & CEPH_OSD_FLAG_WRITE) {
2512 u16 data_off;
2513
2514 /*
2515 * The header "data_off" is a hint to the receiver
2516 * allowing it to align received data into its
2517 * buffers such that there's no need to re-copy
2518 * it before writing it to disk (direct I/O).
2519 */
2520 data_off = (u16) (off & 0xffff);
2521 req->r_request->hdr.data_off = cpu_to_le16(data_off);
2522 }
2523 req->r_request->hdr.data_len = cpu_to_le32(data_len);
2524
2525 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
2526 msg_size = p - msg->front.iov_base;
2527 msg->front.iov_len = msg_size;
2528 msg->hdr.front_len = cpu_to_le32(msg_size);
2529
2530 dout("build_request msg_size was %d\n", (int)msg_size);
2531}
2532EXPORT_SYMBOL(ceph_osdc_build_request);
2533
2534/*
2535 * Register request, send initial attempt. 2541 * Register request, send initial attempt.
2536 */ 2542 */
2537int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2543int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -2749,15 +2755,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
2749 return PTR_ERR(req); 2755 return PTR_ERR(req);
2750 2756
2751 /* it may be a short read due to an object boundary */ 2757 /* it may be a short read due to an object boundary */
2752
2753 osd_req_op_extent_osd_data_pages(req, 0, 2758 osd_req_op_extent_osd_data_pages(req, 0,
2754 pages, *plen, page_align, false, false); 2759 pages, *plen, page_align, false, false);
2755 2760
2756 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2761 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
2757 off, *plen, *plen, page_align); 2762 off, *plen, *plen, page_align);
2758 2763
2759 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
2760
2761 rc = ceph_osdc_start_request(osdc, req, false); 2764 rc = ceph_osdc_start_request(osdc, req, false);
2762 if (!rc) 2765 if (!rc)
2763 rc = ceph_osdc_wait_request(osdc, req); 2766 rc = ceph_osdc_wait_request(osdc, req);
@@ -2783,7 +2786,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2783 int rc = 0; 2786 int rc = 0;
2784 int page_align = off & ~PAGE_MASK; 2787 int page_align = off & ~PAGE_MASK;
2785 2788
2786 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
2787 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 2789 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
2788 CEPH_OSD_OP_WRITE, 2790 CEPH_OSD_OP_WRITE,
2789 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2791 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
@@ -2797,8 +2799,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2797 false, false); 2799 false, false);
2798 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 2800 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
2799 2801
2800 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); 2802 req->r_mtime = *mtime;
2801
2802 rc = ceph_osdc_start_request(osdc, req, true); 2803 rc = ceph_osdc_start_request(osdc, req, true);
2803 if (!rc) 2804 if (!rc)
2804 rc = ceph_osdc_wait_request(osdc, req); 2805 rc = ceph_osdc_wait_request(osdc, req);