author     Sage Weil <sage@newdream.net>  2011-08-03 12:58:09 -0400
committer  Sage Weil <sage@newdream.net>  2011-10-25 19:10:14 -0400
commit     7c272194e66e91830b90f6202e61c69f8590f1eb (patch)
tree       9a899f357cbf005235fd80ab3b4a240e42498b54 /fs/ceph
parent     c3b92c8787367a8bb53d57d9789b558f1295cc96 (diff)
ceph: make readpages fully async
When we get a ->readpages() aop, submit async reads for all page ranges
in the provided page list.  Lock the pages immediately, so that VFS/MM
will block until the reads complete.

Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph')
-rw-r--r--  fs/ceph/addr.c  185
1 file changed, 115 insertions(+), 70 deletions(-)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5a3953db8118..5bb39a50f904 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -228,102 +228,147 @@ static int ceph_readpage(struct file *filp, struct page *page)
 }
 
 /*
- * Build a vector of contiguous pages from the provided page list.
+ * Finish an async read(ahead) op.
  */
-static struct page **page_vector_from_list(struct list_head *page_list,
-                                           unsigned *nr_pages)
+static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
-        struct page **pages;
-        struct page *page;
-        int next_index, contig_pages = 0;
+        struct inode *inode = req->r_inode;
+        struct ceph_osd_reply_head *replyhead;
+        int rc, bytes;
+        int i;
 
-        /* build page vector */
-        pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
-        if (!pages)
-                return ERR_PTR(-ENOMEM);
+        /* parse reply */
+        replyhead = msg->front.iov_base;
+        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+        rc = le32_to_cpu(replyhead->result);
+        bytes = le32_to_cpu(msg->hdr.data_len);
 
-        BUG_ON(list_empty(page_list));
-        next_index = list_entry(page_list->prev, struct page, lru)->index;
-        list_for_each_entry_reverse(page, page_list, lru) {
-                if (page->index == next_index) {
-                        dout("readpages page %d %p\n", contig_pages, page);
-                        pages[contig_pages] = page;
-                        contig_pages++;
-                        next_index++;
-                } else {
-                        break;
+        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+
+        /* unlock all pages, zeroing any data we didn't read */
+        for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
+                struct page *page = req->r_pages[i];
+
+                if (bytes < (int)PAGE_CACHE_SIZE) {
+                        /* zero (remainder of) page */
+                        int s = bytes < 0 ? 0 : bytes;
+                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
                 }
+                dout("finish_read %p uptodate %p idx %lu\n", inode, page,
+                     page->index);
+                flush_dcache_page(page);
+                SetPageUptodate(page);
+                unlock_page(page);
+                page_cache_release(page);
         }
-        *nr_pages = contig_pages;
-        return pages;
+        kfree(req->r_pages);
 }
 
 /*
- * Read multiple pages.  Leave pages we don't read + unlock in page_list;
- * the caller (VM) cleans them up.
+ * start an async read(ahead) operation.  return nr_pages we submitted
+ * a read for on success, or negative error code.
  */
-static int ceph_readpages(struct file *file, struct address_space *mapping,
-                          struct list_head *page_list, unsigned nr_pages)
+static int start_read(struct inode *inode, struct list_head *page_list)
 {
-        struct inode *inode = file->f_dentry->d_inode;
-        struct ceph_inode_info *ci = ceph_inode(inode);
         struct ceph_osd_client *osdc =
                 &ceph_inode_to_client(inode)->client->osdc;
-        int rc = 0;
-        struct page **pages;
-        loff_t offset;
+        struct ceph_inode_info *ci = ceph_inode(inode);
+        struct page *page = list_entry(page_list->prev, struct page, lru);
+        struct ceph_osd_request *req;
+        u64 off;
         u64 len;
+        int i;
+        struct page **pages;
+        pgoff_t next_index;
+        int nr_pages = 0;
+        int ret;
 
-        dout("readpages %p file %p nr_pages %d\n",
-             inode, file, nr_pages);
-
-        pages = page_vector_from_list(page_list, &nr_pages);
-        if (IS_ERR(pages))
-                return PTR_ERR(pages);
+        off = page->index << PAGE_CACHE_SHIFT;
 
-        /* guess read extent */
-        offset = pages[0]->index << PAGE_CACHE_SHIFT;
+        /* count pages */
+        next_index = page->index;
+        list_for_each_entry_reverse(page, page_list, lru) {
+                if (page->index != next_index)
+                        break;
+                nr_pages++;
+                next_index++;
+        }
         len = nr_pages << PAGE_CACHE_SHIFT;
-        rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-                                 offset, &len,
-                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                 pages, nr_pages, 0);
-        if (rc == -ENOENT)
-                rc = 0;
-        if (rc < 0)
-                goto out;
-
-        for (; !list_empty(page_list) && len > 0;
-             rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
-                struct page *page =
-                        list_entry(page_list->prev, struct page, lru);
+        dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
+             off, len);
+
+        req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
+                                    off, &len,
+                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+                                    NULL, 0,
+                                    ci->i_truncate_seq, ci->i_truncate_size,
+                                    NULL, false, 1, 0);
+        if (!req)
+                return -ENOMEM;
 
+        /* build page vector */
+        nr_pages = len >> PAGE_CACHE_SHIFT;
+        pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+        ret = -ENOMEM;
+        if (!pages)
+                goto out;
+        for (i = 0; i < nr_pages; ++i) {
+                page = list_entry(page_list->prev, struct page, lru);
+                BUG_ON(PageLocked(page));
                 list_del(&page->lru);
 
-                if (rc < (int)PAGE_CACHE_SIZE) {
-                        /* zero (remainder of) page */
-                        int s = rc < 0 ? 0 : rc;
-                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
-                }
-
-                if (add_to_page_cache_lru(page, mapping, page->index,
+                dout("start_read %p adding %p idx %lu\n", inode, page,
+                     page->index);
+                if (add_to_page_cache_lru(page, &inode->i_data, page->index,
                                           GFP_NOFS)) {
                         page_cache_release(page);
-                        dout("readpages %p add_to_page_cache failed %p\n",
+                        dout("start_read %p add_to_page_cache failed %p\n",
                              inode, page);
-                        continue;
+                        nr_pages = i;
+                        goto out_pages;
                 }
-                dout("readpages %p adding %p idx %lu\n", inode, page,
-                     page->index);
-                flush_dcache_page(page);
-                SetPageUptodate(page);
-                unlock_page(page);
-                page_cache_release(page);
+                pages[i] = page;
         }
-        rc = 0;
+        req->r_pages = pages;
+        req->r_num_pages = nr_pages;
+        req->r_callback = finish_read;
+        req->r_inode = inode;
+
+        dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
+        ret = ceph_osdc_start_request(osdc, req, false);
+        if (ret < 0)
+                goto out_pages;
+        ceph_osdc_put_request(req);
+        return nr_pages;
 
-out:
+out_pages:
+        ceph_release_page_vector(pages, nr_pages);
         kfree(pages);
+out:
+        ceph_osdc_put_request(req);
+        return ret;
+}
+
+
+/*
+ * Read multiple pages.  Leave pages we don't read + unlock in page_list;
+ * the caller (VM) cleans them up.
+ */
+static int ceph_readpages(struct file *file, struct address_space *mapping,
+                          struct list_head *page_list, unsigned nr_pages)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        int rc = 0;
+
+        dout("readpages %p file %p nr_pages %d\n", inode, file, nr_pages);
+        while (!list_empty(page_list)) {
+                rc = start_read(inode, page_list);
+                if (rc < 0)
+                        goto out;
+                BUG_ON(rc == 0);
+        }
+out:
+        dout("readpages %p file %p ret %d\n", inode, file, rc);
         return rc;
 }
 
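Editor's note: the commit message describes the new flow in prose; as a rough aid to reading the diff, below is a minimal, self-contained userspace sketch of the same pattern -- count the contiguous run of page indexes at the front of the readahead list, move that run into one request, submit it, and mark the pages done in a completion callback.  Every name in it (fake_page, fake_read_req, fake_start_read, fake_finish_read) is a made-up stand-in for illustration; this is not kernel code and does not use the real ceph_osd_request API, and the "async" completion is simulated by a direct call.

#include <stdio.h>
#include <stdlib.h>

struct fake_page {
        unsigned long index;            /* page index in the "file" */
        int uptodate;
        struct fake_page *next;         /* stand-in for the lru list linkage */
};

struct fake_read_req {
        struct fake_page **pages;       /* page vector covered by this read */
        int nr_pages;
        void (*callback)(struct fake_read_req *req);
};

/* analogue of finish_read(): mark every page in the request done */
static void fake_finish_read(struct fake_read_req *req)
{
        for (int i = 0; i < req->nr_pages; i++) {
                req->pages[i]->uptodate = 1;
                printf("  finished page %lu\n", req->pages[i]->index);
                free(req->pages[i]);
        }
        free(req->pages);
        free(req);
}

/*
 * Analogue of start_read(): count the contiguous index run at the front
 * of the list, move those pages into a request, and "submit" it.
 * Returns the number of pages submitted.
 */
static int fake_start_read(struct fake_page **list)
{
        struct fake_page *p = *list;
        unsigned long next_index = p->index;
        int nr = 0;

        for (struct fake_page *q = p; q && q->index == next_index; q = q->next) {
                nr++;
                next_index++;
        }

        struct fake_read_req *req = malloc(sizeof(*req));
        req->pages = malloc(sizeof(*req->pages) * nr);
        req->nr_pages = nr;
        req->callback = fake_finish_read;
        for (int i = 0; i < nr; i++) {
                req->pages[i] = *list;
                *list = (*list)->next;  /* analogue of list_del() */
        }

        printf("submitting read for %d contiguous page(s)\n", nr);
        req->callback(req);             /* pretend the async read completed */
        return nr;
}

int main(void)
{
        /* indexes 3,4,5 then a gap, then 9,10: two separate runs */
        unsigned long idx[] = { 3, 4, 5, 9, 10 };
        struct fake_page *list = NULL, **tail = &list;

        for (size_t i = 0; i < sizeof(idx) / sizeof(idx[0]); i++) {
                struct fake_page *p = calloc(1, sizeof(*p));
                p->index = idx[i];
                *tail = p;
                tail = &p->next;
        }

        /* analogue of the new ceph_readpages() loop: one request per run */
        while (list) {
                if (fake_start_read(&list) <= 0)
                        break;
        }
        return 0;
}

In the actual patch the run is taken from the tail of the list (readahead lists arrive in reverse order) and completion happens later, in finish_read(), when the OSD reply comes back; the sketch collapses that into a direct callback purely to stay short.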