author    Yehuda Sadeh <yehuda@hq.newdream.net>    2010-04-06 18:01:27 -0400
committer Sage Weil <sage@newdream.net>            2010-10-20 18:37:18 -0400
commit    68b4476b0bc13fef18266b4140309a30e86739d2
tree      47fab5ea2491c7bc75fe14a3b0d3a091eb6244b7  /fs/ceph/osd_client.c
parent    3499e8a5d4dbb083324efd942e2c4fb7eb65f27c
ceph: messenger and osdc changes for rbd

Allow the messenger to send/receive data in a bio.  This is added so that
we don't need to copy the data into pages or some other buffer when doing
I/O for an rbd block device.

We can now have trailing variable-sized data for osd ops, and the osd op
encoding is more modular.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
 -rw-r--r--  fs/ceph/osd_client.c  249
 1 file changed, 195 insertions(+), 54 deletions(-)
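The interface change at the heart of this patch is that callers now describe a
request as a zero-terminated array of struct ceph_osd_req_op instead of a single
opcode plus do_sync/truncate arguments. The sketch below illustrates that calling
pattern; it is modeled on the ceph_osdc_new_request() hunk later in this diff, and
the wrapper function (example_build_write) is hypothetical, not part of the patch.

#include "osd_client.h"	/* struct ceph_osd_req_op, ceph_osdc_alloc_request() */

/*
 * Hypothetical example: describe a write (plus an optional STARTSYNC)
 * as a zero-terminated op array and hand it to the osd client.  The
 * extent offset/length of ops[0] are filled in later by calc_layout(),
 * as ceph_osdc_new_request() does in this patch.
 */
static struct ceph_osd_request *example_build_write(struct ceph_osd_client *osdc,
						    struct ceph_snap_context *snapc,
						    int do_sync)
{
	struct ceph_osd_req_op ops[3];

	ops[0].op = CEPH_OSD_OP_WRITE;
	ops[0].extent.truncate_seq = 0;
	ops[0].extent.truncate_size = 0;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;			/* array terminator */
	} else {
		ops[1].op = 0;			/* array terminator */
	}

	/* pages and bio are both optional; rbd passes a bio instead of pages */
	return ceph_osdc_alloc_request(osdc, CEPH_OSD_FLAG_WRITE, snapc, ops,
				       false, GFP_NOFS, NULL, NULL);
}

The op == 0 terminator is what get_num_ops() walks to size the message front and
to decide whether a trailing ceph_pagelist is needed for variable-sized op data
such as xattr names and values.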
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 2647dafd96f5..c5d818e73add 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
 #include <linux/pagemap.h>
 #include <linux/slab.h>
 #include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
 
 #include "super.h"
 #include "osd_client.h"
 #include "messenger.h"
 #include "decode.h"
 #include "auth.h"
+#include "pagelist.h"
 
 #define OSD_OP_FRONT_LEN	4096
 #define OSD_OPREPLY_FRONT_LEN	512
@@ -22,29 +26,50 @@ static int __kick_requests(struct ceph_osd_client *osdc,
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
+static int op_needs_trail(int op)
+{
+	switch (op) {
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+	case CEPH_OSD_OP_CALL:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+static int op_has_extent(int op)
+{
+	return (op == CEPH_OSD_OP_READ ||
+		op == CEPH_OSD_OP_WRITE);
+}
+
 void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 			struct ceph_file_layout *layout,
 			u64 snapid,
-			u64 off, u64 len, u64 *bno,
-			struct ceph_osd_request *req)
+			u64 off, u64 *plen, u64 *bno,
+			struct ceph_osd_request *req,
+			struct ceph_osd_req_op *op)
 {
 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-	struct ceph_osd_op *op = (void *)(reqhead + 1);
-	u64 orig_len = len;
+	u64 orig_len = *plen;
 	u64 objoff, objlen;    /* extent in object */
 
 	reqhead->snapid = cpu_to_le64(snapid);
 
 	/* object extent? */
-	ceph_calc_file_object_mapping(layout, off, &len, bno,
+	ceph_calc_file_object_mapping(layout, off, plen, bno,
 				      &objoff, &objlen);
-	if (len < orig_len)
+	if (*plen < orig_len)
 		dout(" skipping last %llu, final file extent %llu~%llu\n",
-		     orig_len - len, off, len);
+		     orig_len - *plen, off, *plen);
 
-	op->extent.offset = cpu_to_le64(objoff);
-	op->extent.length = cpu_to_le64(objlen);
-	req->r_num_pages = calc_pages_for(off, len);
+	if (op_has_extent(op->op)) {
+		op->extent.offset = objoff;
+		op->extent.length = objlen;
+	}
+	req->r_num_pages = calc_pages_for(off, *plen);
 
 	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
 	     *bno, objoff, objlen, req->r_num_pages);
@@ -80,11 +105,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
 			struct ceph_vino vino,
 			struct ceph_file_layout *layout,
 			u64 off, u64 *plen,
-			struct ceph_osd_request *req)
+			struct ceph_osd_request *req,
+			struct ceph_osd_req_op *op)
 {
 	u64 bno;
 
-	ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
+	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+			     plen, &bno, req, op);
 
 	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
 	req->r_oid_len = strlen(req->r_oid);
@@ -113,35 +140,64 @@ void ceph_osdc_release_request(struct kref *kref)
 	if (req->r_own_pages)
 		ceph_release_page_vector(req->r_pages,
 					 req->r_num_pages);
+#ifdef CONFIG_BLOCK
+	if (req->r_bio)
+		bio_put(req->r_bio);
+#endif
 	ceph_put_snap_context(req->r_snapc);
+	if (req->r_trail) {
+		ceph_pagelist_release(req->r_trail);
+		kfree(req->r_trail);
+	}
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
 	else
 		kfree(req);
 }
 
+static int op_needs_trail(int op)
+{
+	switch (op) {
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+{
+	int i = 0;
+
+	if (needs_trail)
+		*needs_trail = 0;
+	while (ops[i].op) {
+		if (needs_trail && op_needs_trail(ops[i].op))
+			*needs_trail = 1;
+		i++;
+	}
+
+	return i;
+}
+
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       int flags,
 					       struct ceph_snap_context *snapc,
-					       int do_sync,
+					       struct ceph_osd_req_op *ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags,
-					       struct page **pages)
+					       struct page **pages,
+					       struct bio *bio)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	int num_op = 1 + do_sync;
-	size_t msg_size = sizeof(struct ceph_osd_request_head) +
-			  num_op*sizeof(struct ceph_osd_op);
+	int needs_trail;
+	int num_op = get_num_ops(ops, &needs_trail);
+	size_t msg_size = sizeof(struct ceph_osd_request_head);
 
-	if (use_mempool) {
-		req = mempool_alloc(osdc->req_mempool, gfp_flags);
-		memset(req, 0, sizeof(*req));
-	} else {
-		req = kzalloc(sizeof(*req), gfp_flags);
-	}
-	if (!req)
-		return NULL;
+	msg_size += num_op*sizeof(struct ceph_osd_op);
 
 	if (use_mempool) {
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 
 	req->r_osdc = osdc;
 	req->r_mempool = use_mempool;
+
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
@@ -174,6 +231,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	}
 	req->r_reply = msg;
 
+	/* allocate space for the trailing data */
+	if (needs_trail) {
+		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
+		if (!req->r_trail) {
+			ceph_osdc_put_request(req);
+			return NULL;
+		}
+		ceph_pagelist_init(req->r_trail);
+	}
 	/* create request message; allow space for oid */
 	msg_size += 40;
 	if (snapc)
@@ -186,38 +252,87 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 		ceph_osdc_put_request(req);
 		return NULL;
 	}
+
 	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
 
 	req->r_request = msg;
 	req->r_pages = pages;
+#ifdef CONFIG_BLOCK
+	if (bio) {
+		req->r_bio = bio;
+		bio_get(req->r_bio);
+	}
+#endif
 
 	return req;
 }
 
+static void osd_req_encode_op(struct ceph_osd_request *req,
+			      struct ceph_osd_op *dst,
+			      struct ceph_osd_req_op *src)
+{
+	dst->op = cpu_to_le16(src->op);
+
+	switch (dst->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_WRITE:
+		dst->extent.offset =
+			cpu_to_le64(src->extent.offset);
+		dst->extent.length =
+			cpu_to_le64(src->extent.length);
+		dst->extent.truncate_size =
+			cpu_to_le64(src->extent.truncate_size);
+		dst->extent.truncate_seq =
+			cpu_to_le32(src->extent.truncate_seq);
+		break;
+
+	case CEPH_OSD_OP_GETXATTR:
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		BUG_ON(!req->r_trail);
+
+		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+		dst->xattr.cmp_op = src->xattr.cmp_op;
+		dst->xattr.cmp_mode = src->xattr.cmp_mode;
+		ceph_pagelist_append(req->r_trail, src->xattr.name,
+				     src->xattr.name_len);
+		ceph_pagelist_append(req->r_trail, src->xattr.val,
+				     src->xattr.value_len);
+		break;
+	case CEPH_OSD_OP_STARTSYNC:
+		break;
+	default:
+		pr_err("unrecognized osd opcode %d\n", dst->op);
+		WARN_ON(1);
+		break;
+	}
+	dst->payload_len = cpu_to_le32(src->payload_len);
+}
+
 /*
  * build new request AND message
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
 			     u64 off, u64 *plen,
-			     int opcode,
+			     struct ceph_osd_req_op *src_ops,
 			     struct ceph_snap_context *snapc,
-			     int do_sync,
-			     u32 truncate_seq,
-			     u64 truncate_size,
-			     struct timespec *mtime,
-			     const char *oid,
-			     int oid_len)
+			     struct timespec *mtime,
+			     const char *oid,
+			     int oid_len)
 {
 	struct ceph_msg *msg = req->r_request;
 	struct ceph_osd_request_head *head;
+	struct ceph_osd_req_op *src_op;
 	struct ceph_osd_op *op;
 	void *p;
-	int num_op = 1 + do_sync;
+	int num_op = get_num_ops(src_ops, NULL);
 	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-	int i;
 	int flags = req->r_flags;
+	u64 data_len = 0;
+	int i;
 
 	head = msg->front.iov_base;
 	op = (void *)(head + 1);
@@ -230,25 +345,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		ceph_encode_timespec(&head->mtime, mtime);
 	head->num_ops = cpu_to_le16(num_op);
-	op->op = cpu_to_le16(opcode);
 
-	if (flags & CEPH_OSD_FLAG_WRITE) {
-		req->r_request->hdr.data_off = cpu_to_le16(off);
-		req->r_request->hdr.data_len = cpu_to_le32(*plen);
-		op->payload_len = cpu_to_le32(*plen);
-	}
-	op->extent.truncate_size = cpu_to_le64(truncate_size);
-	op->extent.truncate_seq = cpu_to_le32(truncate_seq);
 
 	/* fill in oid */
 	head->object_len = cpu_to_le32(oid_len);
 	memcpy(p, oid, oid_len);
 	p += oid_len;
 
-	if (do_sync) {
+	src_op = src_ops;
+	while (src_op->op) {
+		osd_req_encode_op(req, op, src_op);
+		src_op++;
 		op++;
-		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
 	}
+
+	if (req->r_trail)
+		data_len += req->r_trail->length;
+
 	if (snapc) {
 		head->snap_seq = cpu_to_le64(snapc->seq);
 		head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +371,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 		}
 	}
 
+	if (flags & CEPH_OSD_FLAG_WRITE) {
+		req->r_request->hdr.data_off = cpu_to_le16(off);
+		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+	} else if (data_len) {
+		req->r_request->hdr.data_off = 0;
+		req->r_request->hdr.data_len = cpu_to_le32(data_len);
+	}
+
 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 	msg_size = p - msg->front.iov_base;
 	msg->front.iov_len = msg_size;
@@ -288,21 +409,34 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					       struct timespec *mtime,
 					       bool use_mempool, int num_reply)
 {
-	struct ceph_osd_request *req =
-		ceph_osdc_alloc_request(osdc, flags,
-					 snapc, do_sync,
+	struct ceph_osd_req_op ops[3];
+	struct ceph_osd_request *req;
+
+	ops[0].op = opcode;
+	ops[0].extent.truncate_seq = truncate_seq;
+	ops[0].extent.truncate_size = truncate_size;
+	ops[0].payload_len = 0;
+
+	if (do_sync) {
+		ops[1].op = CEPH_OSD_OP_STARTSYNC;
+		ops[1].payload_len = 0;
+		ops[2].op = 0;
+	} else
+		ops[1].op = 0;
+
+	req = ceph_osdc_alloc_request(osdc, flags,
+					 snapc, ops,
 					 use_mempool,
-					 GFP_NOFS, NULL);
+					 GFP_NOFS, NULL, NULL);
 	if (IS_ERR(req))
 		return req;
 
 	/* calculate max write size */
-	calc_layout(osdc, vino, layout, off, plen, req);
+	calc_layout(osdc, vino, layout, off, plen, req, ops);
 	req->r_file_layout = *layout;  /* keep a copy */
 
-	ceph_osdc_build_request(req, off, plen, opcode,
-				snapc, do_sync,
-				truncate_seq, truncate_size,
+	ceph_osdc_build_request(req, off, plen, ops,
+				snapc,
 				mtime,
 				req->r_oid, req->r_oid_len);
 
@@ -1177,6 +1311,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 
 	req->r_request->pages = req->r_pages;
 	req->r_request->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+	req->r_request->bio = req->r_bio;
+#endif
+	req->r_request->trail = req->r_trail;
 
 	register_request(osdc, req);
 
@@ -1493,6 +1631,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		}
 		m->pages = req->r_pages;
 		m->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+		m->bio = req->r_bio;
+#endif
 	}
 	*skip = 0;
 	req->r_con_filling_msg = ceph_con_get(con);