aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ceph/osd_client.c187
-rw-r--r--fs/ceph/osd_client.h25
2 files changed, 155 insertions, 57 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 3b5571b8ce22..2647dafd96f5 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -22,6 +22,35 @@ static int __kick_requests(struct ceph_osd_client *osdc,
22 22
23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
24 24
25void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
26 struct ceph_file_layout *layout,
27 u64 snapid,
28 u64 off, u64 len, u64 *bno,
29 struct ceph_osd_request *req)
30{
31 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
32 struct ceph_osd_op *op = (void *)(reqhead + 1);
33 u64 orig_len = len;
34 u64 objoff, objlen; /* extent in object */
35
36 reqhead->snapid = cpu_to_le64(snapid);
37
38 /* object extent? */
39 ceph_calc_file_object_mapping(layout, off, &len, bno,
40 &objoff, &objlen);
41 if (len < orig_len)
42 dout(" skipping last %llu, final file extent %llu~%llu\n",
43 orig_len - len, off, len);
44
45 op->extent.offset = cpu_to_le64(objoff);
46 op->extent.length = cpu_to_le64(objlen);
47 req->r_num_pages = calc_pages_for(off, len);
48
49 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
50 *bno, objoff, objlen, req->r_num_pages);
51
52}
53
25/* 54/*
26 * Implement client access to distributed object storage cluster. 55 * Implement client access to distributed object storage cluster.
27 * 56 *
@@ -48,34 +77,17 @@ static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
48 * fill osd op in request message. 77 * fill osd op in request message.
49 */ 78 */
50static void calc_layout(struct ceph_osd_client *osdc, 79static void calc_layout(struct ceph_osd_client *osdc,
51 struct ceph_vino vino, struct ceph_file_layout *layout, 80 struct ceph_vino vino,
81 struct ceph_file_layout *layout,
52 u64 off, u64 *plen, 82 u64 off, u64 *plen,
53 struct ceph_osd_request *req) 83 struct ceph_osd_request *req)
54{ 84{
55 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
56 struct ceph_osd_op *op = (void *)(reqhead + 1);
57 u64 orig_len = *plen;
58 u64 objoff, objlen; /* extent in object */
59 u64 bno; 85 u64 bno;
60 86
61 reqhead->snapid = cpu_to_le64(vino.snap); 87 ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
62
63 /* object extent? */
64 ceph_calc_file_object_mapping(layout, off, plen, &bno,
65 &objoff, &objlen);
66 if (*plen < orig_len)
67 dout(" skipping last %llu, final file extent %llu~%llu\n",
68 orig_len - *plen, off, *plen);
69 88
70 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); 89 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
71 req->r_oid_len = strlen(req->r_oid); 90 req->r_oid_len = strlen(req->r_oid);
72
73 op->extent.offset = cpu_to_le64(objoff);
74 op->extent.length = cpu_to_le64(objlen);
75 req->r_num_pages = calc_pages_for(off, *plen);
76
77 dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
78 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
79} 91}
80 92
81/* 93/*
@@ -108,43 +120,34 @@ void ceph_osdc_release_request(struct kref *kref)
108 kfree(req); 120 kfree(req);
109} 121}
110 122
111/* 123struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
112 * build new request AND message, calculate layout, and adjust file 124 int flags,
113 * extent as needed.
114 *
115 * if the file was recently truncated, we include information about its
116 * old and new size so that the object can be updated appropriately. (we
117 * avoid synchronously deleting truncated objects because it's slow.)
118 *
119 * if @do_sync, include a 'startsync' command so that the osd will flush
120 * data quickly.
121 */
122struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
123 struct ceph_file_layout *layout,
124 struct ceph_vino vino,
125 u64 off, u64 *plen,
126 int opcode, int flags,
127 struct ceph_snap_context *snapc, 125 struct ceph_snap_context *snapc,
128 int do_sync, 126 int do_sync,
129 u32 truncate_seq, 127 bool use_mempool,
130 u64 truncate_size, 128 gfp_t gfp_flags,
131 struct timespec *mtime, 129 struct page **pages)
132 bool use_mempool, int num_reply)
133{ 130{
134 struct ceph_osd_request *req; 131 struct ceph_osd_request *req;
135 struct ceph_msg *msg; 132 struct ceph_msg *msg;
136 struct ceph_osd_request_head *head;
137 struct ceph_osd_op *op;
138 void *p;
139 int num_op = 1 + do_sync; 133 int num_op = 1 + do_sync;
140 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 134 size_t msg_size = sizeof(struct ceph_osd_request_head) +
141 int i; 135 num_op*sizeof(struct ceph_osd_op);
136
137 if (use_mempool) {
138 req = mempool_alloc(osdc->req_mempool, gfp_flags);
139 memset(req, 0, sizeof(*req));
140 } else {
141 req = kzalloc(sizeof(*req), gfp_flags);
142 }
143 if (!req)
144 return NULL;
142 145
143 if (use_mempool) { 146 if (use_mempool) {
144 req = mempool_alloc(osdc->req_mempool, GFP_NOFS); 147 req = mempool_alloc(osdc->req_mempool, gfp_flags);
145 memset(req, 0, sizeof(*req)); 148 memset(req, 0, sizeof(*req));
146 } else { 149 } else {
147 req = kzalloc(sizeof(*req), GFP_NOFS); 150 req = kzalloc(sizeof(*req), gfp_flags);
148 } 151 }
149 if (req == NULL) 152 if (req == NULL)
150 return NULL; 153 return NULL;
@@ -164,7 +167,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 167 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
165 else 168 else
166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 169 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
167 OSD_OPREPLY_FRONT_LEN, GFP_NOFS); 170 OSD_OPREPLY_FRONT_LEN, gfp_flags);
168 if (!msg) { 171 if (!msg) {
169 ceph_osdc_put_request(req); 172 ceph_osdc_put_request(req);
170 return NULL; 173 return NULL;
@@ -178,18 +181,48 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
178 if (use_mempool) 181 if (use_mempool)
179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 182 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
180 else 183 else
181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS); 184 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
182 if (!msg) { 185 if (!msg) {
183 ceph_osdc_put_request(req); 186 ceph_osdc_put_request(req);
184 return NULL; 187 return NULL;
185 } 188 }
186 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 189 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
187 memset(msg->front.iov_base, 0, msg->front.iov_len); 190 memset(msg->front.iov_base, 0, msg->front.iov_len);
191
192 req->r_request = msg;
193 req->r_pages = pages;
194
195 return req;
196}
197
198/*
199 * build new request AND message
200 *
201 */
202void ceph_osdc_build_request(struct ceph_osd_request *req,
203 u64 off, u64 *plen,
204 int opcode,
205 struct ceph_snap_context *snapc,
206 int do_sync,
207 u32 truncate_seq,
208 u64 truncate_size,
209 struct timespec *mtime,
210 const char *oid,
211 int oid_len)
212{
213 struct ceph_msg *msg = req->r_request;
214 struct ceph_osd_request_head *head;
215 struct ceph_osd_op *op;
216 void *p;
217 int num_op = 1 + do_sync;
218 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
219 int i;
220 int flags = req->r_flags;
221
188 head = msg->front.iov_base; 222 head = msg->front.iov_base;
189 op = (void *)(head + 1); 223 op = (void *)(head + 1);
190 p = (void *)(op + num_op); 224 p = (void *)(op + num_op);
191 225
192 req->r_request = msg;
193 req->r_snapc = ceph_get_snap_context(snapc); 226 req->r_snapc = ceph_get_snap_context(snapc);
194 227
195 head->client_inc = cpu_to_le32(1); /* always, for now. */ 228 head->client_inc = cpu_to_le32(1); /* always, for now. */
@@ -199,10 +232,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
199 head->num_ops = cpu_to_le16(num_op); 232 head->num_ops = cpu_to_le16(num_op);
200 op->op = cpu_to_le16(opcode); 233 op->op = cpu_to_le16(opcode);
201 234
202 /* calculate max write size */
203 calc_layout(osdc, vino, layout, off, plen, req);
204 req->r_file_layout = *layout; /* keep a copy */
205
206 if (flags & CEPH_OSD_FLAG_WRITE) { 235 if (flags & CEPH_OSD_FLAG_WRITE) {
207 req->r_request->hdr.data_off = cpu_to_le16(off); 236 req->r_request->hdr.data_off = cpu_to_le16(off);
208 req->r_request->hdr.data_len = cpu_to_le32(*plen); 237 req->r_request->hdr.data_len = cpu_to_le32(*plen);
@@ -212,9 +241,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
212 op->extent.truncate_seq = cpu_to_le32(truncate_seq); 241 op->extent.truncate_seq = cpu_to_le32(truncate_seq);
213 242
214 /* fill in oid */ 243 /* fill in oid */
215 head->object_len = cpu_to_le32(req->r_oid_len); 244 head->object_len = cpu_to_le32(oid_len);
216 memcpy(p, req->r_oid, req->r_oid_len); 245 memcpy(p, oid, oid_len);
217 p += req->r_oid_len; 246 p += oid_len;
218 247
219 if (do_sync) { 248 if (do_sync) {
220 op++; 249 op++;
@@ -233,6 +262,50 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
233 msg_size = p - msg->front.iov_base; 262 msg_size = p - msg->front.iov_base;
234 msg->front.iov_len = msg_size; 263 msg->front.iov_len = msg_size;
235 msg->hdr.front_len = cpu_to_le32(msg_size); 264 msg->hdr.front_len = cpu_to_le32(msg_size);
265 return;
266}
267
268/*
269 * build new request AND message, calculate layout, and adjust file
270 * extent as needed.
271 *
272 * if the file was recently truncated, we include information about its
273 * old and new size so that the object can be updated appropriately. (we
274 * avoid synchronously deleting truncated objects because it's slow.)
275 *
276 * if @do_sync, include a 'startsync' command so that the osd will flush
277 * data quickly.
278 */
279struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
280 struct ceph_file_layout *layout,
281 struct ceph_vino vino,
282 u64 off, u64 *plen,
283 int opcode, int flags,
284 struct ceph_snap_context *snapc,
285 int do_sync,
286 u32 truncate_seq,
287 u64 truncate_size,
288 struct timespec *mtime,
289 bool use_mempool, int num_reply)
290{
291 struct ceph_osd_request *req =
292 ceph_osdc_alloc_request(osdc, flags,
293 snapc, do_sync,
294 use_mempool,
295 GFP_NOFS, NULL);
296 if (IS_ERR(req))
297 return req;
298
299 /* calculate max write size */
300 calc_layout(osdc, vino, layout, off, plen, req);
301 req->r_file_layout = *layout; /* keep a copy */
302
303 ceph_osdc_build_request(req, off, plen, opcode,
304 snapc, do_sync,
305 truncate_seq, truncate_size,
306 mtime,
307 req->r_oid, req->r_oid_len);
308
236 return req; 309 return req;
237} 310}
238 311
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index ce776989ef6a..b687c2ea72e6 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -119,6 +119,31 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
119extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, 119extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
120 struct ceph_msg *msg); 120 struct ceph_msg *msg);
121 121
122extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
123 struct ceph_file_layout *layout,
124 u64 snapid,
125 u64 off, u64 len, u64 *bno,
126 struct ceph_osd_request *req);
127
128extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
129 int flags,
130 struct ceph_snap_context *snapc,
131 int do_sync,
132 bool use_mempool,
133 gfp_t gfp_flags,
134 struct page **pages);
135
136extern void ceph_osdc_build_request(struct ceph_osd_request *req,
137 u64 off, u64 *plen,
138 int opcode,
139 struct ceph_snap_context *snapc,
140 int do_sync,
141 u32 truncate_seq,
142 u64 truncate_size,
143 struct timespec *mtime,
144 const char *oid,
145 int oid_len);
146
122extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, 147extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
123 struct ceph_file_layout *layout, 148 struct ceph_file_layout *layout,
124 struct ceph_vino vino, 149 struct ceph_vino vino,