author    Sage Weil <sage@newdream.net>  2009-10-06 14:31:09 -0400
committer Sage Weil <sage@newdream.net>  2009-10-06 14:31:09 -0400
commit    1d3576fd10f0d7a104204267b81cf84a07028dad (patch)
tree      4567d205f28255ca7211b82c962ad42fd55e733b /fs/ceph/addr.c
parent    124e68e74099090e28da5518f73fda878e7e8232 (diff)
ceph: address space operations
The ceph address space methods are concerned primarily with managing the dirty page accounting in the inode, which (among other things) must keep track of which snapshot context each page was dirtied in, and ensure that dirty data is written out to the OSDs in snapshot order. A writepage() on a page that is not currently writeable due to snapshot writeback ordering constraints is ignored (it was presumably called from kswapd).

Signed-off-by: Sage Weil <sage@newdream.net>
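As a rough illustration of the ordering rule described above, here is a minimal user-space C sketch; the types and helpers in it (struct page, struct snap_context, oldest_dirty_context(), writepage()) are simplified stand-ins invented for the example, not the kernel structures this patch uses. The idea is that each dirty page remembers the snap context it was dirtied under, and a flush pass only writes pages belonging to the oldest context that still has dirty data, skipping anything newer:

#include <stdio.h>

struct snap_context {
	unsigned long long seq;            /* lower seq means older snapshot */
};

struct page {
	int dirty;
	const struct snap_context *snapc;  /* context the page was dirtied in */
};

/* Return the oldest context that still has dirty pages, or NULL if clean. */
static const struct snap_context *oldest_dirty_context(struct page *pages,
							int n)
{
	const struct snap_context *oldest = NULL;
	int i;

	for (i = 0; i < n; i++)
		if (pages[i].dirty &&
		    (!oldest || pages[i].snapc->seq < oldest->seq))
			oldest = pages[i].snapc;
	return oldest;
}

/* Write one page, or skip it if its context is not the oldest dirty one. */
static void writepage(struct page *pg, const struct snap_context *oldest)
{
	if (!pg->dirty)
		return;
	if (pg->snapc != oldest) {
		printf("skip page in snapc %llu (oldest dirty is %llu)\n",
		       pg->snapc->seq, oldest->seq);
		return;
	}
	printf("write page in snapc %llu\n", pg->snapc->seq);
	pg->dirty = 0;
}

int main(void)
{
	struct snap_context old = { .seq = 1 }, head = { .seq = 2 };
	struct page pages[] = {
		{ 1, &head }, { 1, &old }, { 1, &head },
	};
	int n = sizeof(pages) / sizeof(pages[0]);
	const struct snap_context *oldest;
	int i;

	/* Flush one context at a time, oldest first, until nothing is dirty. */
	while ((oldest = oldest_dirty_context(pages, n)) != NULL)
		for (i = 0; i < n; i++)
			writepage(&pages[i], oldest);
	return 0;
}

The loop in main() flushes the page dirtied in the older context first and only then the pages dirtied in the head context, which mirrors the snap-ordered writeback that ceph_writepages_start() implements in the diff below.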
Diffstat (limited to 'fs/ceph/addr.c')
-rw-r--r--  fs/ceph/addr.c | 1115
1 file changed, 1115 insertions(+), 0 deletions(-)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
new file mode 100644
index 000000000000..c7d673ffe023
--- /dev/null
+++ b/fs/ceph/addr.c
@@ -0,0 +1,1115 @@
1#include "ceph_debug.h"
2
3#include <linux/backing-dev.h>
4#include <linux/fs.h>
5#include <linux/mm.h>
6#include <linux/pagemap.h>
7#include <linux/writeback.h> /* generic_writepages */
8#include <linux/pagevec.h>
9#include <linux/task_io_accounting_ops.h>
10
11#include "super.h"
12#include "osd_client.h"
13
14/*
15 * Ceph address space ops.
16 *
17 * There are a few funny things going on here.
18 *
19 * The page->private field is used to reference a struct
20 * ceph_snap_context for _every_ dirty page. This indicates which
21 * snapshot the page was logically dirtied in, and thus which snap
22 * context needs to be associated with the osd write during writeback.
23 *
24 * Similarly, struct ceph_inode_info maintains a set of counters to
25 * count dirty pages on the inode. In the absence of snapshots,
26 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
27 *
28 * When a snapshot is taken (that is, when the client receives
29 * notification that a snapshot was taken), each inode with caps and
30 * with dirty pages (dirty pages implies there is a cap) gets a new
31 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
32 * order, new snaps go to the tail). The i_wrbuffer_ref_head count is
33 * moved to capsnap->dirty. (Unless a sync write is currently in
34 * progress. In that case, the capsnap is said to be "pending", new
35 * writes cannot start, and the capsnap isn't "finalized" until the
36 * write completes (or fails) and a final size/mtime for the inode for
37 * that snap can be settled upon.) i_wrbuffer_ref_head is reset to 0.
38 *
39 * On writeback, we must submit writes to the osd IN SNAP ORDER. So,
40 * we look for the first capsnap in i_cap_snaps and write out pages in
41 * that snap context _only_. Then we move on to the next capsnap,
42 * eventually reaching the "live" or "head" context (i.e., pages that
43 * are not yet snapped) and are writing the most recently dirtied
44 * pages.
45 *
46 * Invalidate and so forth must take care to ensure the dirty page
47 * accounting is preserved.
48 */
49
50
51/*
52 * Dirty a page. Optimistically adjust accounting, on the assumption
53 * that we won't race with invalidate. If we do, readjust.
54 */
55static int ceph_set_page_dirty(struct page *page)
56{
57 struct address_space *mapping = page->mapping;
58 struct inode *inode;
59 struct ceph_inode_info *ci;
60 int undo = 0;
61 struct ceph_snap_context *snapc;
62
63 if (unlikely(!mapping))
64 return !TestSetPageDirty(page);
65
66 if (TestSetPageDirty(page)) {
67 dout("%p set_page_dirty %p idx %lu -- already dirty\n",
68 mapping->host, page, page->index);
69 return 0;
70 }
71
72 inode = mapping->host;
73 ci = ceph_inode(inode);
74
75 /*
76 * Note that we're grabbing a snapc ref here without holding
77 * any locks!
78 */
79 snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
80
81 /* dirty the head */
82 spin_lock(&inode->i_lock);
83 if (ci->i_wrbuffer_ref_head == 0)
84 ci->i_head_snapc = ceph_get_snap_context(snapc);
85 ++ci->i_wrbuffer_ref_head;
86 if (ci->i_wrbuffer_ref == 0)
87 igrab(inode);
88 ++ci->i_wrbuffer_ref;
89 dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
90 "snapc %p seq %lld (%d snaps)\n",
91 mapping->host, page, page->index,
92 ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
93 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
94 snapc, snapc->seq, snapc->num_snaps);
95 spin_unlock(&inode->i_lock);
96
97 /* now adjust page */
98 spin_lock_irq(&mapping->tree_lock);
99 if (page->mapping) { /* Race with truncate? */
100 WARN_ON_ONCE(!PageUptodate(page));
101
102 if (mapping_cap_account_dirty(mapping)) {
103 __inc_zone_page_state(page, NR_FILE_DIRTY);
104 __inc_bdi_stat(mapping->backing_dev_info,
105 BDI_RECLAIMABLE);
106 task_io_account_write(PAGE_CACHE_SIZE);
107 }
108 radix_tree_tag_set(&mapping->page_tree,
109 page_index(page), PAGECACHE_TAG_DIRTY);
110
111 /*
112 * Reference snap context in page->private. Also set
113 * PagePrivate so that we get invalidatepage callback.
114 */
115 page->private = (unsigned long)snapc;
116 SetPagePrivate(page);
117 } else {
118 dout("ANON set_page_dirty %p (raced truncate?)\n", page);
119 undo = 1;
120 }
121
122 spin_unlock_irq(&mapping->tree_lock);
123
124 if (undo)
125 /* whoops, we failed to dirty the page */
126 ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
127
128 __mark_inode_dirty(mapping->host, I_DIRTY_PAGES);
129
130 BUG_ON(!PageDirty(page));
131 return 1;
132}
133
134/*
135 * If we are truncating the full page (i.e. offset == 0), adjust the
136 * dirty page counters appropriately. Only called if there is private
137 * data on the page.
138 */
139static void ceph_invalidatepage(struct page *page, unsigned long offset)
140{
141 struct inode *inode = page->mapping->host;
142 struct ceph_inode_info *ci;
143 struct ceph_snap_context *snapc = (void *)page->private;
144
145 BUG_ON(!PageLocked(page));
146 BUG_ON(!page->private);
147 BUG_ON(!PagePrivate(page));
148 BUG_ON(!page->mapping);
149
150 /*
151 * We can get non-dirty pages here due to races between
152 * set_page_dirty and truncate_complete_page; just spit out a
153 * warning, in case we end up with accounting problems later.
154 */
155 if (!PageDirty(page))
156 pr_err("%p invalidatepage %p page not dirty\n", inode, page);
157
158 if (offset == 0)
159 ClearPageChecked(page);
160
161 ci = ceph_inode(inode);
162 if (offset == 0) {
163 dout("%p invalidatepage %p idx %lu full dirty page %lu\n",
164 inode, page, page->index, offset);
165 ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
166 ceph_put_snap_context(snapc);
167 page->private = 0;
168 ClearPagePrivate(page);
169 } else {
170 dout("%p invalidatepage %p idx %lu partial dirty page\n",
171 inode, page, page->index);
172 }
173}
174
175/* just a sanity check */
176static int ceph_releasepage(struct page *page, gfp_t g)
177{
178 struct inode *inode = page->mapping ? page->mapping->host : NULL;
179 dout("%p releasepage %p idx %lu\n", inode, page, page->index);
180 WARN_ON(PageDirty(page));
181 WARN_ON(page->private);
182 WARN_ON(PagePrivate(page));
183 return 0;
184}
185
186/*
187 * read a single page, without unlocking it.
188 */
189static int readpage_nounlock(struct file *filp, struct page *page)
190{
191 struct inode *inode = filp->f_dentry->d_inode;
192 struct ceph_inode_info *ci = ceph_inode(inode);
193 struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
194 int err = 0;
195 u64 len = PAGE_CACHE_SIZE;
196
197 dout("readpage inode %p file %p page %p index %lu\n",
198 inode, filp, page, page->index);
199 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
200 page->index << PAGE_CACHE_SHIFT, &len,
201 ci->i_truncate_seq, ci->i_truncate_size,
202 &page, 1);
203 if (err == -ENOENT)
204 err = 0;
205 if (err < 0) {
206 SetPageError(page);
207 goto out;
208 } else if (err < PAGE_CACHE_SIZE) {
209 /* zero fill remainder of page */
210 zero_user_segment(page, err, PAGE_CACHE_SIZE);
211 }
212 SetPageUptodate(page);
213
214out:
215 return err < 0 ? err : 0;
216}
217
218static int ceph_readpage(struct file *filp, struct page *page)
219{
220 int r = readpage_nounlock(filp, page);
221 unlock_page(page);
222 return r;
223}
224
225/*
226 * Build a vector of contiguous pages from the provided page list.
227 */
228static struct page **page_vector_from_list(struct list_head *page_list,
229 unsigned *nr_pages)
230{
231 struct page **pages;
232 struct page *page;
233 int next_index, contig_pages = 0;
234
235 /* build page vector */
236 pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
237 if (!pages)
238 return ERR_PTR(-ENOMEM);
239
240 BUG_ON(list_empty(page_list));
241 next_index = list_entry(page_list->prev, struct page, lru)->index;
242 list_for_each_entry_reverse(page, page_list, lru) {
243 if (page->index == next_index) {
244 dout("readpages page %d %p\n", contig_pages, page);
245 pages[contig_pages] = page;
246 contig_pages++;
247 next_index++;
248 } else {
249 break;
250 }
251 }
252 *nr_pages = contig_pages;
253 return pages;
254}
255
256/*
257 * Read multiple pages. Leave pages we don't read + unlock in page_list;
258 * the caller (VM) cleans them up.
259 */
260static int ceph_readpages(struct file *file, struct address_space *mapping,
261 struct list_head *page_list, unsigned nr_pages)
262{
263 struct inode *inode = file->f_dentry->d_inode;
264 struct ceph_inode_info *ci = ceph_inode(inode);
265 struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
266 int rc = 0;
267 struct page **pages;
268 struct pagevec pvec;
269 loff_t offset;
270 u64 len;
271
272 dout("readpages %p file %p nr_pages %d\n",
273 inode, file, nr_pages);
274
275 pages = page_vector_from_list(page_list, &nr_pages);
276 if (IS_ERR(pages))
277 return PTR_ERR(pages);
278
279 /* guess read extent */
280 offset = pages[0]->index << PAGE_CACHE_SHIFT;
281 len = nr_pages << PAGE_CACHE_SHIFT;
282 rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
283 offset, &len,
284 ci->i_truncate_seq, ci->i_truncate_size,
285 pages, nr_pages);
286 if (rc == -ENOENT)
287 rc = 0;
288 if (rc < 0)
289 goto out;
290
291 /* set uptodate and add to lru in pagevec-sized chunks */
292 pagevec_init(&pvec, 0);
293 for (; !list_empty(page_list) && len > 0;
294 rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
295 struct page *page =
296 list_entry(page_list->prev, struct page, lru);
297
298 list_del(&page->lru);
299
300 if (rc < (int)PAGE_CACHE_SIZE) {
301 /* zero (remainder of) page */
302 int s = rc < 0 ? 0 : rc;
303 zero_user_segment(page, s, PAGE_CACHE_SIZE);
304 }
305
306 if (add_to_page_cache(page, mapping, page->index, GFP_NOFS)) {
307 page_cache_release(page);
308 dout("readpages %p add_to_page_cache failed %p\n",
309 inode, page);
310 continue;
311 }
312 dout("readpages %p adding %p idx %lu\n", inode, page,
313 page->index);
314 flush_dcache_page(page);
315 SetPageUptodate(page);
316 unlock_page(page);
317 if (pagevec_add(&pvec, page) == 0)
318 pagevec_lru_add_file(&pvec); /* add to lru */
319 }
320 pagevec_lru_add_file(&pvec);
321 rc = 0;
322
323out:
324 kfree(pages);
325 return rc;
326}
327
328/*
329 * Get ref for the oldest snapc for an inode with dirty data... that is, the
330 * only snap context we are allowed to write back.
331 *
332 * Caller holds i_lock.
333 */
334static struct ceph_snap_context *__get_oldest_context(struct inode *inode,
335 u64 *snap_size)
336{
337 struct ceph_inode_info *ci = ceph_inode(inode);
338 struct ceph_snap_context *snapc = NULL;
339 struct ceph_cap_snap *capsnap = NULL;
340
341 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
342 dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
343 capsnap->context, capsnap->dirty_pages);
344 if (capsnap->dirty_pages) {
345 snapc = ceph_get_snap_context(capsnap->context);
346 if (snap_size)
347 *snap_size = capsnap->size;
348 break;
349 }
350 }
351 if (!snapc && ci->i_snap_realm) {
352 snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
353 dout(" head snapc %p has %d dirty pages\n",
354 snapc, ci->i_wrbuffer_ref_head);
355 }
356 return snapc;
357}
358
359static struct ceph_snap_context *get_oldest_context(struct inode *inode,
360 u64 *snap_size)
361{
362 struct ceph_snap_context *snapc = NULL;
363
364 spin_lock(&inode->i_lock);
365 snapc = __get_oldest_context(inode, snap_size);
366 spin_unlock(&inode->i_lock);
367 return snapc;
368}
369
370/*
371 * Write a single page, but leave the page locked.
372 *
373 * If we get a write error, set the page error bit, but still adjust the
374 * dirty page accounting (i.e., page is no longer dirty).
375 */
376static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
377{
378 struct inode *inode;
379 struct ceph_inode_info *ci;
380 struct ceph_osd_client *osdc;
381 loff_t page_off = page->index << PAGE_CACHE_SHIFT;
382 int len = PAGE_CACHE_SIZE;
383 loff_t i_size;
384 int err = 0;
385 struct ceph_snap_context *snapc;
386 u64 snap_size = 0;
387
388 dout("writepage %p idx %lu\n", page, page->index);
389
390 if (!page->mapping || !page->mapping->host) {
391 dout("writepage %p - no mapping\n", page);
392 return -EFAULT;
393 }
394 inode = page->mapping->host;
395 ci = ceph_inode(inode);
396 osdc = &ceph_inode_to_client(inode)->osdc;
397
398 /* verify this is a writeable snap context */
399 snapc = (void *)page->private;
400 if (snapc == NULL) {
401 dout("writepage %p page %p not dirty?\n", inode, page);
402 goto out;
403 }
404 if (snapc != get_oldest_context(inode, &snap_size)) {
405 dout("writepage %p page %p snapc %p not writeable - noop\n",
406 inode, page, (void *)page->private);
407 /* we should only noop if called by kswapd */
408 WARN_ON((current->flags & PF_MEMALLOC) == 0);
409 goto out;
410 }
411
412 /* is this a partial page at end of file? */
413 if (snap_size)
414 i_size = snap_size;
415 else
416 i_size = i_size_read(inode);
417 if (i_size < page_off + len)
418 len = i_size - page_off;
419
420 dout("writepage %p page %p index %lu on %llu~%u\n",
421 inode, page, page->index, page_off, len);
422
423 set_page_writeback(page);
424 err = ceph_osdc_writepages(osdc, ceph_vino(inode),
425 &ci->i_layout, snapc,
426 page_off, len,
427 ci->i_truncate_seq, ci->i_truncate_size,
428 &inode->i_mtime,
429 &page, 1, 0, 0, true);
430 if (err < 0) {
431 dout("writepage setting page/mapping error %d %p\n", err, page);
432 SetPageError(page);
433 mapping_set_error(&inode->i_data, err);
434 if (wbc)
435 wbc->pages_skipped++;
436 } else {
437 dout("writepage cleaned page %p\n", page);
438 err = 0; /* vfs expects us to return 0 */
439 }
440 page->private = 0;
441 ClearPagePrivate(page);
442 end_page_writeback(page);
443 ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
444 ceph_put_snap_context(snapc);
445out:
446 return err;
447}
448
449static int ceph_writepage(struct page *page, struct writeback_control *wbc)
450{
451 int err = writepage_nounlock(page, wbc);
452 unlock_page(page);
453 return err;
454}
455
456
457/*
458 * lame release_pages helper. release_pages() isn't exported to
459 * modules.
460 */
461static void ceph_release_pages(struct page **pages, int num)
462{
463 struct pagevec pvec;
464 int i;
465
466 pagevec_init(&pvec, 0);
467 for (i = 0; i < num; i++) {
468 if (pagevec_add(&pvec, pages[i]) == 0)
469 pagevec_release(&pvec);
470 }
471 pagevec_release(&pvec);
472}
473
474
475/*
476 * async writeback completion handler.
477 *
478 * If we get an error, set the mapping error bit, but not the individual
479 * page error bits.
480 */
481static void writepages_finish(struct ceph_osd_request *req,
482 struct ceph_msg *msg)
483{
484 struct inode *inode = req->r_inode;
485 struct ceph_osd_reply_head *replyhead;
486 struct ceph_osd_op *op;
487 struct ceph_inode_info *ci = ceph_inode(inode);
488 unsigned wrote;
489 loff_t offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
490 struct page *page;
491 int i;
492 struct ceph_snap_context *snapc = req->r_snapc;
493 struct address_space *mapping = inode->i_mapping;
494 struct writeback_control *wbc = req->r_wbc;
495 __s32 rc = -EIO;
496 u64 bytes = 0;
497
498 /* parse reply */
499 replyhead = msg->front.iov_base;
500 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
501 op = (void *)(replyhead + 1);
502 rc = le32_to_cpu(replyhead->result);
503 bytes = le64_to_cpu(op->extent.length);
504
505 if (rc >= 0) {
506 wrote = (bytes + (offset & ~PAGE_CACHE_MASK) + ~PAGE_CACHE_MASK)
507 >> PAGE_CACHE_SHIFT;
508 WARN_ON(wrote != req->r_num_pages);
509 } else {
510 wrote = 0;
511 mapping_set_error(mapping, rc);
512 }
513 dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
514 inode, rc, bytes, wrote);
515
516 /* clean all pages */
517 for (i = 0; i < req->r_num_pages; i++) {
518 page = req->r_pages[i];
519 BUG_ON(!page);
520 WARN_ON(!PageUptodate(page));
521
522 if (i >= wrote) {
523 dout("inode %p skipping page %p\n", inode, page);
524 wbc->pages_skipped++;
525 }
526 page->private = 0;
527 ClearPagePrivate(page);
528 ceph_put_snap_context(snapc);
529 dout("unlocking %d %p\n", i, page);
530 end_page_writeback(page);
531 unlock_page(page);
532 }
533 dout("%p wrote+cleaned %d pages\n", inode, wrote);
534 ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc);
535
536 ceph_release_pages(req->r_pages, req->r_num_pages);
537 if (req->r_pages_from_pool)
538 mempool_free(req->r_pages,
539 ceph_client(inode->i_sb)->wb_pagevec_pool);
540 else
541 kfree(req->r_pages);
542 ceph_osdc_put_request(req);
543}
544
545/*
546 * allocate a page vec, either directly, or if necessary, via the
547 * mempool. we avoid the mempool if we can because req->r_num_pages
548 * may be less than the maximum write size.
549 */
550static void alloc_page_vec(struct ceph_client *client,
551 struct ceph_osd_request *req)
552{
553 req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
554 GFP_NOFS);
555 if (!req->r_pages) {
556 req->r_pages = mempool_alloc(client->wb_pagevec_pool, GFP_NOFS);
557 req->r_pages_from_pool = 1;
558 WARN_ON(!req->r_pages);
559 }
560}
561
562/*
563 * initiate async writeback
564 */
565static int ceph_writepages_start(struct address_space *mapping,
566 struct writeback_control *wbc)
567{
568 struct inode *inode = mapping->host;
569 struct backing_dev_info *bdi = mapping->backing_dev_info;
570 struct ceph_inode_info *ci = ceph_inode(inode);
571 struct ceph_client *client = ceph_inode_to_client(inode);
572 pgoff_t index, start, end;
573 int range_whole = 0;
574 int should_loop = 1;
575 pgoff_t max_pages = 0, max_pages_ever = 0;
576 struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
577 struct pagevec pvec;
578 int done = 0;
579 int rc = 0;
580 unsigned wsize = 1 << inode->i_blkbits;
581 struct ceph_osd_request *req = NULL;
582 int do_sync;
583 u64 snap_size = 0;
584
585 /*
586 * Include a 'sync' in the OSD request if this is a data
587 * integrity write (e.g., O_SYNC write or fsync()), or if our
588 * cap is being revoked.
589 */
590 do_sync = wbc->sync_mode == WB_SYNC_ALL;
591 if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
592 do_sync = 1;
593 dout("writepages_start %p dosync=%d (mode=%s)\n",
594 inode, do_sync,
595 wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
596 (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
597
598 client = ceph_inode_to_client(inode);
599 if (client->mount_state == CEPH_MOUNT_SHUTDOWN) {
600 pr_warning("writepage_start %p on forced umount\n", inode);
601 return -EIO; /* we're in a forced umount, don't write! */
602 }
603 if (client->mount_args.wsize && client->mount_args.wsize < wsize)
604 wsize = client->mount_args.wsize;
605 if (wsize < PAGE_CACHE_SIZE)
606 wsize = PAGE_CACHE_SIZE;
607 max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
608
609 pagevec_init(&pvec, 0);
610
611 /* ?? */
612 if (wbc->nonblocking && bdi_write_congested(bdi)) {
613 dout(" writepages congested\n");
614 wbc->encountered_congestion = 1;
615 goto out_final;
616 }
617
618 /* where to start/end? */
619 if (wbc->range_cyclic) {
620 start = mapping->writeback_index; /* Start from prev offset */
621 end = -1;
622 dout(" cyclic, start at %lu\n", start);
623 } else {
624 start = wbc->range_start >> PAGE_CACHE_SHIFT;
625 end = wbc->range_end >> PAGE_CACHE_SHIFT;
626 if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
627 range_whole = 1;
628 should_loop = 0;
629 dout(" not cyclic, %lu to %lu\n", start, end);
630 }
631 index = start;
632
633retry:
634 /* find oldest snap context with dirty data */
635 ceph_put_snap_context(snapc);
636 snapc = get_oldest_context(inode, &snap_size);
637 if (!snapc) {
638 /* hmm, why does writepages get called when there
639 is no dirty data? */
640 dout(" no snap context with dirty data?\n");
641 goto out;
642 }
643 dout(" oldest snapc is %p seq %lld (%d snaps)\n",
644 snapc, snapc->seq, snapc->num_snaps);
645 if (last_snapc && snapc != last_snapc) {
646 /* if we switched to a newer snapc, restart our scan at the
647 * start of the original file range. */
648 dout(" snapc differs from last pass, restarting at %lu\n",
649 index);
650 index = start;
651 }
652 last_snapc = snapc;
653
654 while (!done && index <= end) {
655 unsigned i;
656 int first;
657 pgoff_t next;
658 int pvec_pages, locked_pages;
659 struct page *page;
660 int want;
661 u64 offset, len;
662 struct ceph_osd_request_head *reqhead;
663 struct ceph_osd_op *op;
664
665 next = 0;
666 locked_pages = 0;
667 max_pages = max_pages_ever;
668
669get_more_pages:
670 first = -1;
671 want = min(end - index,
672 min((pgoff_t)PAGEVEC_SIZE,
673 max_pages - (pgoff_t)locked_pages) - 1)
674 + 1;
675 pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
676 PAGECACHE_TAG_DIRTY,
677 want);
678 dout("pagevec_lookup_tag got %d\n", pvec_pages);
679 if (!pvec_pages && !locked_pages)
680 break;
681 for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
682 page = pvec.pages[i];
683 dout("? %p idx %lu\n", page, page->index);
684 if (locked_pages == 0)
685 lock_page(page); /* first page */
686 else if (!trylock_page(page))
687 break;
688
689 /* only dirty pages, or our accounting breaks */
690 if (unlikely(!PageDirty(page)) ||
691 unlikely(page->mapping != mapping)) {
692 dout("!dirty or !mapping %p\n", page);
693 unlock_page(page);
694 break;
695 }
696 if (!wbc->range_cyclic && page->index > end) {
697 dout("end of range %p\n", page);
698 done = 1;
699 unlock_page(page);
700 break;
701 }
702 if (next && (page->index != next)) {
703 dout("not consecutive %p\n", page);
704 unlock_page(page);
705 break;
706 }
707 if (wbc->sync_mode != WB_SYNC_NONE) {
708 dout("waiting on writeback %p\n", page);
709 wait_on_page_writeback(page);
710 }
711 if ((snap_size && page_offset(page) > snap_size) ||
712 (!snap_size &&
713 page_offset(page) > i_size_read(inode))) {
714 dout("%p page eof %llu\n", page, snap_size ?
715 snap_size : i_size_read(inode));
716 done = 1;
717 unlock_page(page);
718 break;
719 }
720 if (PageWriteback(page)) {
721 dout("%p under writeback\n", page);
722 unlock_page(page);
723 break;
724 }
725
726 /* only if matching snap context */
727 if (snapc != (void *)page->private) {
728 dout("page snapc %p != oldest %p\n",
729 (void *)page->private, snapc);
730 unlock_page(page);
731 if (!locked_pages)
732 continue; /* keep looking for snap */
733 break;
734 }
735
736 if (!clear_page_dirty_for_io(page)) {
737 dout("%p !clear_page_dirty_for_io\n", page);
738 unlock_page(page);
739 break;
740 }
741
742 /* ok */
743 if (locked_pages == 0) {
744 /* prepare async write request */
745 offset = page->index << PAGE_CACHE_SHIFT;
746 len = wsize;
747 req = ceph_osdc_new_request(&client->osdc,
748 &ci->i_layout,
749 ceph_vino(inode),
750 offset, &len,
751 CEPH_OSD_OP_WRITE,
752 CEPH_OSD_FLAG_WRITE |
753 CEPH_OSD_FLAG_ONDISK,
754 snapc, do_sync,
755 ci->i_truncate_seq,
756 ci->i_truncate_size,
757 &inode->i_mtime, true, 1);
758 max_pages = req->r_num_pages;
759
760 alloc_page_vec(client, req);
761 req->r_callback = writepages_finish;
762 req->r_inode = inode;
763 req->r_wbc = wbc;
764 }
765
766 /* note position of first page in pvec */
767 if (first < 0)
768 first = i;
769 dout("%p will write page %p idx %lu\n",
770 inode, page, page->index);
771 set_page_writeback(page);
772 req->r_pages[locked_pages] = page;
773 locked_pages++;
774 next = page->index + 1;
775 }
776
777 /* did we get anything? */
778 if (!locked_pages)
779 goto release_pvec_pages;
780 if (i) {
781 int j;
782 BUG_ON(!locked_pages || first < 0);
783
784 if (pvec_pages && i == pvec_pages &&
785 locked_pages < max_pages) {
786 dout("reached end pvec, trying for more\n");
787 pagevec_reinit(&pvec);
788 goto get_more_pages;
789 }
790
791 /* shift unused pages over in the pvec... we
792 * will need to release them below. */
793 for (j = i; j < pvec_pages; j++) {
794 dout(" pvec leftover page %p\n",
795 pvec.pages[j]);
796 pvec.pages[j-i+first] = pvec.pages[j];
797 }
798 pvec.nr -= i-first;
799 }
800
801 /* submit the write */
802 offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
803 len = min((snap_size ? snap_size : i_size_read(inode)) - offset,
804 (u64)locked_pages << PAGE_CACHE_SHIFT);
805 dout("writepages got %d pages at %llu~%llu\n",
806 locked_pages, offset, len);
807
808 /* revise final length, page count */
809 req->r_num_pages = locked_pages;
810 reqhead = req->r_request->front.iov_base;
811 op = (void *)(reqhead + 1);
812 op->extent.length = cpu_to_le64(len);
813 op->payload_len = cpu_to_le32(len);
814 req->r_request->hdr.data_len = cpu_to_le32(len);
815
816 ceph_osdc_start_request(&client->osdc, req, true);
817 req = NULL;
818
819 /* continue? */
820 index = next;
821 wbc->nr_to_write -= locked_pages;
822 if (wbc->nr_to_write <= 0)
823 done = 1;
824
825release_pvec_pages:
826 dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
827 pvec.nr ? pvec.pages[0] : NULL);
828 pagevec_release(&pvec);
829
830 if (locked_pages && !done)
831 goto retry;
832 }
833
834 if (should_loop && !done) {
835 /* more to do; loop back to beginning of file */
836 dout("writepages looping back to beginning of file\n");
837 should_loop = 0;
838 index = 0;
839 goto retry;
840 }
841
842 if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
843 mapping->writeback_index = index;
844
845out:
846 if (req)
847 ceph_osdc_put_request(req);
848 if (rc > 0)
849 rc = 0; /* vfs expects us to return 0 */
850 ceph_put_snap_context(snapc);
851 dout("writepages done, rc = %d\n", rc);
852out_final:
853 return rc;
854}
855
856
857
858/*
859 * See if a given @snapc is either writeable, or already written.
860 */
861static int context_is_writeable_or_written(struct inode *inode,
862 struct ceph_snap_context *snapc)
863{
864 struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
865 return !oldest || snapc->seq <= oldest->seq;
866}
867
868/*
869 * We are only allowed to write into/dirty the page if the page is
870 * clean, or already dirty within the same snap context.
871 */
872static int ceph_write_begin(struct file *file, struct address_space *mapping,
873 loff_t pos, unsigned len, unsigned flags,
874 struct page **pagep, void **fsdata)
875{
876 struct inode *inode = file->f_dentry->d_inode;
877 struct ceph_inode_info *ci = ceph_inode(inode);
878 struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
879 struct page *page;
880 pgoff_t index = pos >> PAGE_CACHE_SHIFT;
881 loff_t page_off = pos & PAGE_CACHE_MASK;
882 int pos_in_page = pos & ~PAGE_CACHE_MASK;
883 int end_in_page = pos_in_page + len;
884 loff_t i_size;
885 struct ceph_snap_context *snapc;
886 int r;
887
888 /* get a page */
889retry:
890 page = grab_cache_page_write_begin(mapping, index, 0);
891 if (!page)
892 return -ENOMEM;
893 *pagep = page;
894
895 dout("write_begin file %p inode %p page %p %d~%d\n", file,
896 inode, page, (int)pos, (int)len);
897
898retry_locked:
899 /* writepages currently holds the page lock; wait here in case that changes later */
900 wait_on_page_writeback(page);
901
902 /* check snap context */
903 BUG_ON(!ci->i_snap_realm);
904 down_read(&mdsc->snap_rwsem);
905 BUG_ON(!ci->i_snap_realm->cached_context);
906 if (page->private &&
907 (void *)page->private != ci->i_snap_realm->cached_context) {
908 /*
909 * this page is already dirty in another (older) snap
910 * context! is it writeable now?
911 */
912 snapc = get_oldest_context(inode, NULL);
913 up_read(&mdsc->snap_rwsem);
914
915 if (snapc != (void *)page->private) {
916 dout(" page %p snapc %p not current or oldest\n",
917 page, (void *)page->private);
918 /*
919 * queue for writeback, and wait for snapc to
920 * be writeable or written
921 */
922 snapc = ceph_get_snap_context((void *)page->private);
923 unlock_page(page);
924 if (ceph_queue_writeback(inode))
925 igrab(inode);
926 wait_event_interruptible(ci->i_cap_wq,
927 context_is_writeable_or_written(inode, snapc));
928 ceph_put_snap_context(snapc);
929 goto retry;
930 }
931
932 /* yay, writeable, do it now (without dropping page lock) */
933 dout(" page %p snapc %p not current, but oldest\n",
934 page, snapc);
935 if (!clear_page_dirty_for_io(page))
936 goto retry_locked;
937 r = writepage_nounlock(page, NULL);
938 if (r < 0)
939 goto fail_nosnap;
940 goto retry_locked;
941 }
942
943 if (PageUptodate(page)) {
944 dout(" page %p already uptodate\n", page);
945 return 0;
946 }
947
948 /* full page? */
949 if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
950 return 0;
951
952 /* past end of file? */
953 i_size = inode->i_size; /* caller holds i_mutex */
954
955 if (i_size + len > inode->i_sb->s_maxbytes) {
956 /* file is too big */
957 r = -EINVAL;
958 goto fail;
959 }
960
961 if (page_off >= i_size ||
962 (pos_in_page == 0 && (pos+len) >= i_size &&
963 end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
964 dout(" zeroing %p 0 - %d and %d - %d\n",
965 page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
966 zero_user_segments(page,
967 0, pos_in_page,
968 end_in_page, PAGE_CACHE_SIZE);
969 return 0;
970 }
971
972 /* we need to read it. */
973 up_read(&mdsc->snap_rwsem);
974 r = readpage_nounlock(file, page);
975 if (r < 0)
976 goto fail_nosnap;
977 goto retry_locked;
978
979fail:
980 up_read(&mdsc->snap_rwsem);
981fail_nosnap:
982 unlock_page(page);
983 return r;
984}
985
986/*
987 * we don't do anything in here that simple_write_end doesn't do
988 * except adjust dirty page accounting and drop read lock on
989 * mdsc->snap_rwsem.
990 */
991static int ceph_write_end(struct file *file, struct address_space *mapping,
992 loff_t pos, unsigned len, unsigned copied,
993 struct page *page, void *fsdata)
994{
995 struct inode *inode = file->f_dentry->d_inode;
996 struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
997 unsigned from = pos & (PAGE_CACHE_SIZE - 1);
998 int check_cap = 0;
999
1000 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
1001 inode, page, (int)pos, (int)copied, (int)len);
1002
1003 /* zero the stale part of the page if we did a short copy */
1004 if (copied < len)
1005 zero_user_segment(page, from+copied, len);
1006
1007 /* did file size increase? */
1008 /* (no need for i_size_read(); the caller holds i_mutex) */
1009 if (pos+copied > inode->i_size)
1010 check_cap = ceph_inode_set_size(inode, pos+copied);
1011
1012 if (!PageUptodate(page))
1013 SetPageUptodate(page);
1014
1015 set_page_dirty(page);
1016
1017 unlock_page(page);
1018 up_read(&mdsc->snap_rwsem);
1019 page_cache_release(page);
1020
1021 if (check_cap)
1022 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
1023
1024 return copied;
1025}
1026
1027/*
1028 * we set .direct_IO to indicate direct io is supported, but since we
1029 * intercept O_DIRECT reads and writes early, this function should
1030 * never get called.
1031 */
1032static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
1033 const struct iovec *iov,
1034 loff_t pos, unsigned long nr_segs)
1035{
1036 WARN_ON(1);
1037 return -EINVAL;
1038}
1039
1040const struct address_space_operations ceph_aops = {
1041 .readpage = ceph_readpage,
1042 .readpages = ceph_readpages,
1043 .writepage = ceph_writepage,
1044 .writepages = ceph_writepages_start,
1045 .write_begin = ceph_write_begin,
1046 .write_end = ceph_write_end,
1047 .set_page_dirty = ceph_set_page_dirty,
1048 .invalidatepage = ceph_invalidatepage,
1049 .releasepage = ceph_releasepage,
1050 .direct_IO = ceph_direct_io,
1051};
1052
1053
1054/*
1055 * vm ops
1056 */
1057
1058/*
1059 * Reuse write_begin here for simplicity.
1060 */
1061static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1062{
1063 struct inode *inode = vma->vm_file->f_dentry->d_inode;
1064 struct page *page = vmf->page;
1065 struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
1066 loff_t off = page->index << PAGE_CACHE_SHIFT;
1067 loff_t size, len;
1068 struct page *locked_page = NULL;
1069 void *fsdata = NULL;
1070 int ret;
1071
1072 size = i_size_read(inode);
1073 if (off + PAGE_CACHE_SIZE <= size)
1074 len = PAGE_CACHE_SIZE;
1075 else
1076 len = size & ~PAGE_CACHE_MASK;
1077
1078 dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode,
1079 off, len, page, page->index);
1080 ret = ceph_write_begin(vma->vm_file, inode->i_mapping, off, len, 0,
1081 &locked_page, &fsdata);
1082 WARN_ON(page != locked_page);
1083 if (!ret) {
1084 /*
1085 * We do the following instead of calling
1086 * ceph_write_end(); note that we keep the
1087 * page locked.
1088 */
1089 set_page_dirty(page);
1090 up_read(&mdsc->snap_rwsem);
1091 page_cache_release(page);
1092 ret = VM_FAULT_LOCKED;
1093 } else {
1094 ret = VM_FAULT_SIGBUS;
1095 }
1096 dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret);
1097 return ret;
1098}
1099
1100static struct vm_operations_struct ceph_vmops = {
1101 .fault = filemap_fault,
1102 .page_mkwrite = ceph_page_mkwrite,
1103};
1104
1105int ceph_mmap(struct file *file, struct vm_area_struct *vma)
1106{
1107 struct address_space *mapping = file->f_mapping;
1108
1109 if (!mapping->a_ops->readpage)
1110 return -ENOEXEC;
1111 file_accessed(file);
1112 vma->vm_ops = &ceph_vmops;
1113 vma->vm_flags |= VM_CAN_NONLINEAR;
1114 return 0;
1115}