aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-04-06 17:51:47 -0400
committerSage Weil <sage@newdream.net>2010-10-20 18:36:01 -0400
commit3499e8a5d4dbb083324efd942e2c4fb7eb65f27c (patch)
tree7c3914991f81bcf2a9153047e7f1dad2ae0da74b
parent7669a2c95e502a77f93f27e5449fc93a00d588b6 (diff)
ceph: refactor osdc requests creation functions
The osd requests creation are being decoupled from the vino parameter, allowing clients using the osd to use other arbitrary object names that are not necessarily vino based. Also, calc_raw_layout now takes a snap id. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/osd_client.c187
-rw-r--r--fs/ceph/osd_client.h25
2 files changed, 155 insertions, 57 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 3b5571b8ce22..2647dafd96f5 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -22,6 +22,35 @@ static int __kick_requests(struct ceph_osd_client *osdc,
22 22
23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
24 24
25void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
26 struct ceph_file_layout *layout,
27 u64 snapid,
28 u64 off, u64 len, u64 *bno,
29 struct ceph_osd_request *req)
30{
31 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
32 struct ceph_osd_op *op = (void *)(reqhead + 1);
33 u64 orig_len = len;
34 u64 objoff, objlen; /* extent in object */
35
36 reqhead->snapid = cpu_to_le64(snapid);
37
38 /* object extent? */
39 ceph_calc_file_object_mapping(layout, off, &len, bno,
40 &objoff, &objlen);
41 if (len < orig_len)
42 dout(" skipping last %llu, final file extent %llu~%llu\n",
43 orig_len - len, off, len);
44
45 op->extent.offset = cpu_to_le64(objoff);
46 op->extent.length = cpu_to_le64(objlen);
47 req->r_num_pages = calc_pages_for(off, len);
48
49 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
50 *bno, objoff, objlen, req->r_num_pages);
51
52}
53
25/* 54/*
26 * Implement client access to distributed object storage cluster. 55 * Implement client access to distributed object storage cluster.
27 * 56 *
@@ -48,34 +77,17 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
48 * fill osd op in request message. 77 * fill osd op in request message.
49 */ 78 */
50static void calc_layout(struct ceph_osd_client *osdc, 79static void calc_layout(struct ceph_osd_client *osdc,
51 struct ceph_vino vino, struct ceph_file_layout *layout, 80 struct ceph_vino vino,
81 struct ceph_file_layout *layout,
52 u64 off, u64 *plen, 82 u64 off, u64 *plen,
53 struct ceph_osd_request *req) 83 struct ceph_osd_request *req)
54{ 84{
55 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
56 struct ceph_osd_op *op = (void *)(reqhead + 1);
57 u64 orig_len = *plen;
58 u64 objoff, objlen; /* extent in object */
59 u64 bno; 85 u64 bno;
60 86
61 reqhead->snapid = cpu_to_le64(vino.snap); 87 ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
62
63 /* object extent? */
64 ceph_calc_file_object_mapping(layout, off, plen, &bno,
65 &objoff, &objlen);
66 if (*plen < orig_len)
67 dout(" skipping last %llu, final file extent %llu~%llu\n",
68 orig_len - *plen, off, *plen);
69 88
70 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); 89 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
71 req->r_oid_len = strlen(req->r_oid); 90 req->r_oid_len = strlen(req->r_oid);
72
73 op->extent.offset = cpu_to_le64(objoff);
74 op->extent.length = cpu_to_le64(objlen);
75 req->r_num_pages = calc_pages_for(off, *plen);
76
77 dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
78 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
79} 91}
80 92
81/* 93/*
@@ -108,43 +120,34 @@ void ceph_osdc_release_request(struct kref *kref)
108 kfree(req); 120 kfree(req);
109} 121}
110 122
111/* 123struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
112 * build new request AND message, calculate layout, and adjust file 124 int flags,
113 * extent as needed.
114 *
115 * if the file was recently truncated, we include information about its
116 * old and new size so that the object can be updated appropriately. (we
117 * avoid synchronously deleting truncated objects because it's slow.)
118 *
119 * if @do_sync, include a 'startsync' command so that the osd will flush
120 * data quickly.
121 */
122struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
123 struct ceph_file_layout *layout,
124 struct ceph_vino vino,
125 u64 off, u64 *plen,
126 int opcode, int flags,
127 struct ceph_snap_context *snapc, 125 struct ceph_snap_context *snapc,
128 int do_sync, 126 int do_sync,
129 u32 truncate_seq, 127 bool use_mempool,
130 u64 truncate_size, 128 gfp_t gfp_flags,
131 struct timespec *mtime, 129 struct page **pages)
132 bool use_mempool, int num_reply)
133{ 130{
134 struct ceph_osd_request *req; 131 struct ceph_osd_request *req;
135 struct ceph_msg *msg; 132 struct ceph_msg *msg;
136 struct ceph_osd_request_head *head;
137 struct ceph_osd_op *op;
138 void *p;
139 int num_op = 1 + do_sync; 133 int num_op = 1 + do_sync;
140 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 134 size_t msg_size = sizeof(struct ceph_osd_request_head) +
141 int i; 135 num_op*sizeof(struct ceph_osd_op);
136
137 if (use_mempool) {
138 req = mempool_alloc(osdc->req_mempool, gfp_flags);
139 memset(req, 0, sizeof(*req));
140 } else {
141 req = kzalloc(sizeof(*req), gfp_flags);
142 }
143 if (!req)
144 return NULL;
142 145
143 if (use_mempool) { 146 if (use_mempool) {
144 req = mempool_alloc(osdc->req_mempool, GFP_NOFS); 147 req = mempool_alloc(osdc->req_mempool, gfp_flags);
145 memset(req, 0, sizeof(*req)); 148 memset(req, 0, sizeof(*req));
146 } else { 149 } else {
147 req = kzalloc(sizeof(*req), GFP_NOFS); 150 req = kzalloc(sizeof(*req), gfp_flags);
148 } 151 }
149 if (req == NULL) 152 if (req == NULL)
150 return NULL; 153 return NULL;
@@ -164,7 +167,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 167 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
165 else 168 else
166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 169 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
167 OSD_OPREPLY_FRONT_LEN, GFP_NOFS); 170 OSD_OPREPLY_FRONT_LEN, gfp_flags);
168 if (!msg) { 171 if (!msg) {
169 ceph_osdc_put_request(req); 172 ceph_osdc_put_request(req);
170 return NULL; 173 return NULL;
@@ -178,18 +181,48 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
178 if (use_mempool) 181 if (use_mempool)
179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 182 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
180 else 183 else
181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS); 184 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
182 if (!msg) { 185 if (!msg) {
183 ceph_osdc_put_request(req); 186 ceph_osdc_put_request(req);
184 return NULL; 187 return NULL;
185 } 188 }
186 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 189 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
187 memset(msg->front.iov_base, 0, msg->front.iov_len); 190 memset(msg->front.iov_base, 0, msg->front.iov_len);
191
192 req->r_request = msg;
193 req->r_pages = pages;
194
195 return req;
196}
197
198/*
199 * build new request AND message
200 *
201 */
202void ceph_osdc_build_request(struct ceph_osd_request *req,
203 u64 off, u64 *plen,
204 int opcode,
205 struct ceph_snap_context *snapc,
206 int do_sync,
207 u32 truncate_seq,
208 u64 truncate_size,
209 struct timespec *mtime,
210 const char *oid,
211 int oid_len)
212{
213 struct ceph_msg *msg = req->r_request;
214 struct ceph_osd_request_head *head;
215 struct ceph_osd_op *op;
216 void *p;
217 int num_op = 1 + do_sync;
218 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
219 int i;
220 int flags = req->r_flags;
221
188 head = msg->front.iov_base; 222 head = msg->front.iov_base;
189 op = (void *)(head + 1); 223 op = (void *)(head + 1);
190 p = (void *)(op + num_op); 224 p = (void *)(op + num_op);
191 225
192 req->r_request = msg;
193 req->r_snapc = ceph_get_snap_context(snapc); 226 req->r_snapc = ceph_get_snap_context(snapc);
194 227
195 head->client_inc = cpu_to_le32(1); /* always, for now. */ 228 head->client_inc = cpu_to_le32(1); /* always, for now. */
@@ -199,10 +232,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
199 head->num_ops = cpu_to_le16(num_op); 232 head->num_ops = cpu_to_le16(num_op);
200 op->op = cpu_to_le16(opcode); 233 op->op = cpu_to_le16(opcode);
201 234
202 /* calculate max write size */
203 calc_layout(osdc, vino, layout, off, plen, req);
204 req->r_file_layout = *layout; /* keep a copy */
205
206 if (flags & CEPH_OSD_FLAG_WRITE) { 235 if (flags & CEPH_OSD_FLAG_WRITE) {
207 req->r_request->hdr.data_off = cpu_to_le16(off); 236 req->r_request->hdr.data_off = cpu_to_le16(off);
208 req->r_request->hdr.data_len = cpu_to_le32(*plen); 237 req->r_request->hdr.data_len = cpu_to_le32(*plen);
@@ -212,9 +241,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
212 op->extent.truncate_seq = cpu_to_le32(truncate_seq); 241 op->extent.truncate_seq = cpu_to_le32(truncate_seq);
213 242
214 /* fill in oid */ 243 /* fill in oid */
215 head->object_len = cpu_to_le32(req->r_oid_len); 244 head->object_len = cpu_to_le32(oid_len);
216 memcpy(p, req->r_oid, req->r_oid_len); 245 memcpy(p, oid, oid_len);
217 p += req->r_oid_len; 246 p += oid_len;
218 247
219 if (do_sync) { 248 if (do_sync) {
220 op++; 249 op++;
@@ -233,6 +262,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
233 msg_size = p - msg->front.iov_base; 262 msg_size = p - msg->front.iov_base;
234 msg->front.iov_len = msg_size; 263 msg->front.iov_len = msg_size;
235 msg->hdr.front_len = cpu_to_le32(msg_size); 264 msg->hdr.front_len = cpu_to_le32(msg_size);
265 return;
266}
267
268/*
269 * build new request AND message, calculate layout, and adjust file
270 * extent as needed.
271 *
272 * if the file was recently truncated, we include information about its
273 * old and new size so that the object can be updated appropriately. (we
274 * avoid synchronously deleting truncated objects because it's slow.)
275 *
276 * if @do_sync, include a 'startsync' command so that the osd will flush
277 * data quickly.
278 */
279struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
280 struct ceph_file_layout *layout,
281 struct ceph_vino vino,
282 u64 off, u64 *plen,
283 int opcode, int flags,
284 struct ceph_snap_context *snapc,
285 int do_sync,
286 u32 truncate_seq,
287 u64 truncate_size,
288 struct timespec *mtime,
289 bool use_mempool, int num_reply)
290{
291 struct ceph_osd_request *req =
292 ceph_osdc_alloc_request(osdc, flags,
293 snapc, do_sync,
294 use_mempool,
295 GFP_NOFS, NULL);
296 if (IS_ERR(req))
297 return req;
298
299 /* calculate max write size */
300 calc_layout(osdc, vino, layout, off, plen, req);
301 req->r_file_layout = *layout; /* keep a copy */
302
303 ceph_osdc_build_request(req, off, plen, opcode,
304 snapc, do_sync,
305 truncate_seq, truncate_size,
306 mtime,
307 req->r_oid, req->r_oid_len);
308
236 return req; 309 return req;
237} 310}
238 311
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index ce776989ef6a..b687c2ea72e6 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -119,6 +119,31 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
119extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, 119extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
120 struct ceph_msg *msg); 120 struct ceph_msg *msg);
121 121
122extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
123 struct ceph_file_layout *layout,
124 u64 snapid,
125 u64 off, u64 len, u64 *bno,
126 struct ceph_osd_request *req);
127
128extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
129 int flags,
130 struct ceph_snap_context *snapc,
131 int do_sync,
132 bool use_mempool,
133 gfp_t gfp_flags,
134 struct page **pages);
135
136extern void ceph_osdc_build_request(struct ceph_osd_request *req,
137 u64 off, u64 *plen,
138 int opcode,
139 struct ceph_snap_context *snapc,
140 int do_sync,
141 u32 truncate_seq,
142 u64 truncate_size,
143 struct timespec *mtime,
144 const char *oid,
145 int oid_len);
146
122extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, 147extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
123 struct ceph_file_layout *layout, 148 struct ceph_file_layout *layout,
124 struct ceph_vino vino, 149 struct ceph_vino vino,