aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2013-09-09 12:13:22 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2013-09-09 12:13:22 -0400
commit6cccc7d3012344371a897ecdd1a1398286a6ee8a (patch)
tree64d7c301739abb303e15f108df4c00e8da227caf
parent255ae3fbd298f312ce47ff0c7ee9bb6ad002e0f0 (diff)
parenta8d436f015b627a55ec3b1d15f13d6ab92dd892b (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil: "This includes both the first pile of Ceph patches (which I sent to torvalds@vger, sigh) and a few new patches that add support for fscache for Ceph. That includes a few fscache core fixes that David Howells asked go through the Ceph tree. (Thanks go to Milosz Tanski for putting this feature together) This first batch of patches (included here) had (has) several important RBD bug fixes, hole punch support, several different cleanups in the page cache interactions, improvements in the truncate code (new truncate mutex to avoid shenanigans with i_mutex), and a series of fixes in the synchronous striping read/write code. On top of that is a random collection of small fixes all across the tree (error code checks and error path cleanup, obsolete wq flags, etc)" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (43 commits) ceph: use d_invalidate() to invalidate aliases ceph: remove ceph_lookup_inode() ceph: trivial buildbot warnings fix ceph: Do not do invalidate if the filesystem is mounted nofsc ceph: page still marked private_2 ceph: ceph_readpage_to_fscache didn't check if marked ceph: clean PgPrivate2 on returning from readpages ceph: use fscache as a local presisent cache fscache: Netfs function for cleanup post readpages FS-Cache: Fix heading in documentation CacheFiles: Implement interface to check cache consistency FS-Cache: Add interface to check consistency of a cached object rbd: fix null dereference in dout rbd: fix buffer size for writes to images with snapshots libceph: use pg_num_mask instead of pgp_num_mask for pg.seed calc rbd: fix I/O error propagation for reads ceph: use vfs __set_page_dirty_nobuffers interface instead of doing it inside filesystem ceph: allow sync_read/write return partial successed size of read/write. ceph: fix bugs about handling short-read for sync read mode. ceph: remove useless variable revoked_rdcache ...
-rw-r--r--Documentation/filesystems/caching/backend-api.txt9
-rw-r--r--Documentation/filesystems/caching/netfs-api.txt37
-rw-r--r--drivers/block/rbd.c36
-rw-r--r--fs/cachefiles/interface.c26
-rw-r--r--fs/cachefiles/internal.h1
-rw-r--r--fs/cachefiles/xattr.c36
-rw-r--r--fs/ceph/Kconfig9
-rw-r--r--fs/ceph/Makefile1
-rw-r--r--fs/ceph/addr.c116
-rw-r--r--fs/ceph/cache.c398
-rw-r--r--fs/ceph/cache.h159
-rw-r--r--fs/ceph/caps.c87
-rw-r--r--fs/ceph/dir.c2
-rw-r--r--fs/ceph/file.c299
-rw-r--r--fs/ceph/inode.c46
-rw-r--r--fs/ceph/ioctl.c12
-rw-r--r--fs/ceph/mds_client.c34
-rw-r--r--fs/ceph/super.c35
-rw-r--r--fs/ceph/super.h17
-rw-r--r--fs/fscache/cookie.c71
-rw-r--r--fs/fscache/internal.h6
-rw-r--r--fs/fscache/page.c71
-rw-r--r--include/linux/fscache-cache.h4
-rw-r--r--include/linux/fscache.h42
-rw-r--r--net/ceph/messenger.c2
-rw-r--r--net/ceph/osd_client.c27
-rw-r--r--net/ceph/osdmap.c2
27 files changed, 1396 insertions, 189 deletions
diff --git a/Documentation/filesystems/caching/backend-api.txt b/Documentation/filesystems/caching/backend-api.txt
index d78bab9622c6..277d1e810670 100644
--- a/Documentation/filesystems/caching/backend-api.txt
+++ b/Documentation/filesystems/caching/backend-api.txt
@@ -299,6 +299,15 @@ performed on the denizens of the cache. These are held in a structure of type:
299 enough space in the cache to permit this. 299 enough space in the cache to permit this.
300 300
301 301
302 (*) Check coherency state of an object [mandatory]:
303
304 int (*check_consistency)(struct fscache_object *object)
305
306 This method is called to have the cache check the saved auxiliary data of
307 the object against the netfs's idea of the state. 0 should be returned
308 if they're consistent and -ESTALE otherwise. -ENOMEM and -ERESTARTSYS
309 may also be returned.
310
302 (*) Update object [mandatory]: 311 (*) Update object [mandatory]:
303 312
304 int (*update_object)(struct fscache_object *object) 313 int (*update_object)(struct fscache_object *object)
diff --git a/Documentation/filesystems/caching/netfs-api.txt b/Documentation/filesystems/caching/netfs-api.txt
index 97e6c0ecc5ef..11a0a40ce445 100644
--- a/Documentation/filesystems/caching/netfs-api.txt
+++ b/Documentation/filesystems/caching/netfs-api.txt
@@ -32,7 +32,7 @@ This document contains the following sections:
32 (9) Setting the data file size 32 (9) Setting the data file size
33 (10) Page alloc/read/write 33 (10) Page alloc/read/write
34 (11) Page uncaching 34 (11) Page uncaching
35 (12) Index and data file update 35 (12) Index and data file consistency
36 (13) Miscellaneous cookie operations 36 (13) Miscellaneous cookie operations
37 (14) Cookie unregistration 37 (14) Cookie unregistration
38 (15) Index invalidation 38 (15) Index invalidation
@@ -433,7 +433,7 @@ to the caller. The attribute adjustment excludes read and write operations.
433 433
434 434
435===================== 435=====================
436PAGE READ/ALLOC/WRITE 436PAGE ALLOC/READ/WRITE
437===================== 437=====================
438 438
439And the sixth step is to store and retrieve pages in the cache. There are 439And the sixth step is to store and retrieve pages in the cache. There are
@@ -499,7 +499,7 @@ Else if there's a copy of the page resident in the cache:
499 (*) An argument that's 0 on success or negative for an error code. 499 (*) An argument that's 0 on success or negative for an error code.
500 500
501 If an error occurs, it should be assumed that the page contains no usable 501 If an error occurs, it should be assumed that the page contains no usable
502 data. 502 data. fscache_readpages_cancel() may need to be called.
503 503
504 end_io_func() will be called in process context if the read is results in 504 end_io_func() will be called in process context if the read is results in
505 an error, but it might be called in interrupt context if the read is 505 an error, but it might be called in interrupt context if the read is
@@ -623,6 +623,22 @@ some of the pages being read and some being allocated. Those pages will have
623been marked appropriately and will need uncaching. 623been marked appropriately and will need uncaching.
624 624
625 625
626CANCELLATION OF UNREAD PAGES
627----------------------------
628
629If one or more pages are passed to fscache_read_or_alloc_pages() but not then
630read from the cache and also not read from the underlying filesystem then
631those pages will need to have any marks and reservations removed. This can be
632done by calling:
633
634 void fscache_readpages_cancel(struct fscache_cookie *cookie,
635 struct list_head *pages);
636
637prior to returning to the caller. The cookie argument should be as passed to
638fscache_read_or_alloc_pages(). Every page in the pages list will be examined
639and any that have PG_fscache set will be uncached.
640
641
626============== 642==============
627PAGE UNCACHING 643PAGE UNCACHING
628============== 644==============
@@ -690,9 +706,18 @@ written to the cache and for the cache to finish with the page generally. No
690error is returned. 706error is returned.
691 707
692 708
693========================== 709===============================
694INDEX AND DATA FILE UPDATE 710INDEX AND DATA FILE CONSISTENCY
695========================== 711===============================
712
713To find out whether auxiliary data for an object is up to date within the
714cache, the following function can be called:
715
716 int fscache_check_consistency(struct fscache_cookie *cookie)
717
718This will call back to the netfs to check whether the auxiliary data associated
719with a cookie is correct. It returns 0 if it is and -ESTALE if it isn't; it
720may also return -ENOMEM and -ERESTARTSYS.
696 721
697To request an update of the index data for an index or other object, the 722To request an update of the index data for an index or other object, the
698following function should be called: 723following function should be called:
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 191cd177fef2..39c51cc7fabc 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1561,11 +1561,12 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1561 obj_request, obj_request->img_request, obj_request->result, 1561 obj_request, obj_request->img_request, obj_request->result,
1562 xferred, length); 1562 xferred, length);
1563 /* 1563 /*
1564 * ENOENT means a hole in the image. We zero-fill the 1564 * ENOENT means a hole in the image. We zero-fill the entire
1565 * entire length of the request. A short read also implies 1565 * length of the request. A short read also implies zero-fill
1566 * zero-fill to the end of the request. Either way we 1566 * to the end of the request. An error requires the whole
1567 * update the xferred count to indicate the whole request 1567 * length of the request to be reported finished with an error
1568 * was satisfied. 1568 * to the block layer. In each case we update the xferred
1569 * count to indicate the whole request was satisfied.
1569 */ 1570 */
1570 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); 1571 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1571 if (obj_request->result == -ENOENT) { 1572 if (obj_request->result == -ENOENT) {
@@ -1574,14 +1575,13 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1574 else 1575 else
1575 zero_pages(obj_request->pages, 0, length); 1576 zero_pages(obj_request->pages, 0, length);
1576 obj_request->result = 0; 1577 obj_request->result = 0;
1577 obj_request->xferred = length;
1578 } else if (xferred < length && !obj_request->result) { 1578 } else if (xferred < length && !obj_request->result) {
1579 if (obj_request->type == OBJ_REQUEST_BIO) 1579 if (obj_request->type == OBJ_REQUEST_BIO)
1580 zero_bio_chain(obj_request->bio_list, xferred); 1580 zero_bio_chain(obj_request->bio_list, xferred);
1581 else 1581 else
1582 zero_pages(obj_request->pages, xferred, length); 1582 zero_pages(obj_request->pages, xferred, length);
1583 obj_request->xferred = length;
1584 } 1583 }
1584 obj_request->xferred = length;
1585 obj_request_done_set(obj_request); 1585 obj_request_done_set(obj_request);
1586} 1586}
1587 1587
@@ -2167,9 +2167,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2167 struct rbd_obj_request *obj_request = NULL; 2167 struct rbd_obj_request *obj_request = NULL;
2168 struct rbd_obj_request *next_obj_request; 2168 struct rbd_obj_request *next_obj_request;
2169 bool write_request = img_request_write_test(img_request); 2169 bool write_request = img_request_write_test(img_request);
2170 struct bio *bio_list = 0; 2170 struct bio *bio_list = NULL;
2171 unsigned int bio_offset = 0; 2171 unsigned int bio_offset = 0;
2172 struct page **pages = 0; 2172 struct page **pages = NULL;
2173 u64 img_offset; 2173 u64 img_offset;
2174 u64 resid; 2174 u64 resid;
2175 u16 opcode; 2175 u16 opcode;
@@ -2207,6 +2207,11 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2207 rbd_segment_name_free(object_name); 2207 rbd_segment_name_free(object_name);
2208 if (!obj_request) 2208 if (!obj_request)
2209 goto out_unwind; 2209 goto out_unwind;
2210 /*
2211 * set obj_request->img_request before creating the
2212 * osd_request so that it gets the right snapc
2213 */
2214 rbd_img_obj_request_add(img_request, obj_request);
2210 2215
2211 if (type == OBJ_REQUEST_BIO) { 2216 if (type == OBJ_REQUEST_BIO) {
2212 unsigned int clone_size; 2217 unsigned int clone_size;
@@ -2248,11 +2253,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
2248 obj_request->pages, length, 2253 obj_request->pages, length,
2249 offset & ~PAGE_MASK, false, false); 2254 offset & ~PAGE_MASK, false, false);
2250 2255
2251 /*
2252 * set obj_request->img_request before formatting
2253 * the osd_request so that it gets the right snapc
2254 */
2255 rbd_img_obj_request_add(img_request, obj_request);
2256 if (write_request) 2256 if (write_request)
2257 rbd_osd_req_format_write(obj_request); 2257 rbd_osd_req_format_write(obj_request);
2258 else 2258 else
@@ -3706,12 +3706,14 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3706 if (ret < sizeof (size_buf)) 3706 if (ret < sizeof (size_buf))
3707 return -ERANGE; 3707 return -ERANGE;
3708 3708
3709 if (order) 3709 if (order) {
3710 *order = size_buf.order; 3710 *order = size_buf.order;
3711 dout(" order %u", (unsigned int)*order);
3712 }
3711 *snap_size = le64_to_cpu(size_buf.size); 3713 *snap_size = le64_to_cpu(size_buf.size);
3712 3714
3713 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 3715 dout(" snap_id 0x%016llx snap_size = %llu\n",
3714 (unsigned long long)snap_id, (unsigned int)*order, 3716 (unsigned long long)snap_id,
3715 (unsigned long long)*snap_size); 3717 (unsigned long long)*snap_size);
3716 3718
3717 return 0; 3719 return 0;
diff --git a/fs/cachefiles/interface.c b/fs/cachefiles/interface.c
index d4c1206af9fc..43eb5592cdea 100644
--- a/fs/cachefiles/interface.c
+++ b/fs/cachefiles/interface.c
@@ -378,6 +378,31 @@ static void cachefiles_sync_cache(struct fscache_cache *_cache)
378} 378}
379 379
380/* 380/*
381 * check if the backing cache is up to date with respect to the FS-Cache cookie
382 * - called by FS-Cache when it evaluates whether it needs to invalidate the cache
383 */
384static bool cachefiles_check_consistency(struct fscache_operation *op)
385{
386 struct cachefiles_object *object;
387 struct cachefiles_cache *cache;
388 const struct cred *saved_cred;
389 int ret;
390
391 _enter("{OBJ%x}", op->object->debug_id);
392
393 object = container_of(op->object, struct cachefiles_object, fscache);
394 cache = container_of(object->fscache.cache,
395 struct cachefiles_cache, cache);
396
397 cachefiles_begin_secure(cache, &saved_cred);
398 ret = cachefiles_check_auxdata(object);
399 cachefiles_end_secure(cache, saved_cred);
400
401 _leave(" = %d", ret);
402 return ret;
403}
404
405/*
381 * notification the attributes on an object have changed 406 * notification the attributes on an object have changed
382 * - called with reads/writes excluded by FS-Cache 407 * - called with reads/writes excluded by FS-Cache
383 */ 408 */
@@ -522,4 +547,5 @@ const struct fscache_cache_ops cachefiles_cache_ops = {
522 .write_page = cachefiles_write_page, 547 .write_page = cachefiles_write_page,
523 .uncache_page = cachefiles_uncache_page, 548 .uncache_page = cachefiles_uncache_page,
524 .dissociate_pages = cachefiles_dissociate_pages, 549 .dissociate_pages = cachefiles_dissociate_pages,
550 .check_consistency = cachefiles_check_consistency,
525}; 551};
diff --git a/fs/cachefiles/internal.h b/fs/cachefiles/internal.h
index 49382519907a..5349473df1b1 100644
--- a/fs/cachefiles/internal.h
+++ b/fs/cachefiles/internal.h
@@ -235,6 +235,7 @@ extern int cachefiles_set_object_xattr(struct cachefiles_object *object,
235 struct cachefiles_xattr *auxdata); 235 struct cachefiles_xattr *auxdata);
236extern int cachefiles_update_object_xattr(struct cachefiles_object *object, 236extern int cachefiles_update_object_xattr(struct cachefiles_object *object,
237 struct cachefiles_xattr *auxdata); 237 struct cachefiles_xattr *auxdata);
238extern int cachefiles_check_auxdata(struct cachefiles_object *object);
238extern int cachefiles_check_object_xattr(struct cachefiles_object *object, 239extern int cachefiles_check_object_xattr(struct cachefiles_object *object,
239 struct cachefiles_xattr *auxdata); 240 struct cachefiles_xattr *auxdata);
240extern int cachefiles_remove_object_xattr(struct cachefiles_cache *cache, 241extern int cachefiles_remove_object_xattr(struct cachefiles_cache *cache,
diff --git a/fs/cachefiles/xattr.c b/fs/cachefiles/xattr.c
index 2476e5162609..34c88b83e39f 100644
--- a/fs/cachefiles/xattr.c
+++ b/fs/cachefiles/xattr.c
@@ -157,6 +157,42 @@ int cachefiles_update_object_xattr(struct cachefiles_object *object,
157} 157}
158 158
159/* 159/*
160 * check the consistency between the backing cache and the FS-Cache cookie
161 */
162int cachefiles_check_auxdata(struct cachefiles_object *object)
163{
164 struct cachefiles_xattr *auxbuf;
165 struct dentry *dentry = object->dentry;
166 unsigned int dlen;
167 int ret;
168
169 ASSERT(dentry);
170 ASSERT(dentry->d_inode);
171 ASSERT(object->fscache.cookie->def->check_aux);
172
173 auxbuf = kmalloc(sizeof(struct cachefiles_xattr) + 512, GFP_KERNEL);
174 if (!auxbuf)
175 return -ENOMEM;
176
177 auxbuf->len = vfs_getxattr(dentry, cachefiles_xattr_cache,
178 &auxbuf->type, 512 + 1);
179 if (auxbuf->len < 1)
180 return -ESTALE;
181
182 if (auxbuf->type != object->fscache.cookie->def->type)
183 return -ESTALE;
184
185 dlen = auxbuf->len - 1;
186 ret = fscache_check_aux(&object->fscache, &auxbuf->data, dlen);
187
188 kfree(auxbuf);
189 if (ret != FSCACHE_CHECKAUX_OKAY)
190 return -ESTALE;
191
192 return 0;
193}
194
195/*
160 * check the state xattr on a cache file 196 * check the state xattr on a cache file
161 * - return -ESTALE if the object should be deleted 197 * - return -ESTALE if the object should be deleted
162 */ 198 */
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 49bc78243db9..ac9a2ef5bb9b 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -16,3 +16,12 @@ config CEPH_FS
16 16
17 If unsure, say N. 17 If unsure, say N.
18 18
19if CEPH_FS
20config CEPH_FSCACHE
21 bool "Enable Ceph client caching support"
22 depends on CEPH_FS=m && FSCACHE || CEPH_FS=y && FSCACHE=y
23 help
24 Choose Y here to enable persistent, read-only local
25 caching support for Ceph clients using FS-Cache
26
27endif
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index bd352125e829..32e30106a2f0 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -9,3 +9,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
9 mds_client.o mdsmap.o strings.o ceph_frag.o \ 9 mds_client.o mdsmap.o strings.o ceph_frag.o \
10 debugfs.o 10 debugfs.o
11 11
12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5318a3b704f6..6df8bd481425 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -11,6 +11,7 @@
11 11
12#include "super.h" 12#include "super.h"
13#include "mds_client.h" 13#include "mds_client.h"
14#include "cache.h"
14#include <linux/ceph/osd_client.h> 15#include <linux/ceph/osd_client.h>
15 16
16/* 17/*
@@ -70,15 +71,16 @@ static int ceph_set_page_dirty(struct page *page)
70 struct address_space *mapping = page->mapping; 71 struct address_space *mapping = page->mapping;
71 struct inode *inode; 72 struct inode *inode;
72 struct ceph_inode_info *ci; 73 struct ceph_inode_info *ci;
73 int undo = 0;
74 struct ceph_snap_context *snapc; 74 struct ceph_snap_context *snapc;
75 int ret;
75 76
76 if (unlikely(!mapping)) 77 if (unlikely(!mapping))
77 return !TestSetPageDirty(page); 78 return !TestSetPageDirty(page);
78 79
79 if (TestSetPageDirty(page)) { 80 if (PageDirty(page)) {
80 dout("%p set_page_dirty %p idx %lu -- already dirty\n", 81 dout("%p set_page_dirty %p idx %lu -- already dirty\n",
81 mapping->host, page, page->index); 82 mapping->host, page, page->index);
83 BUG_ON(!PagePrivate(page));
82 return 0; 84 return 0;
83 } 85 }
84 86
@@ -107,35 +109,19 @@ static int ceph_set_page_dirty(struct page *page)
107 snapc, snapc->seq, snapc->num_snaps); 109 snapc, snapc->seq, snapc->num_snaps);
108 spin_unlock(&ci->i_ceph_lock); 110 spin_unlock(&ci->i_ceph_lock);
109 111
110 /* now adjust page */ 112 /*
111 spin_lock_irq(&mapping->tree_lock); 113 * Reference snap context in page->private. Also set
112 if (page->mapping) { /* Race with truncate? */ 114 * PagePrivate so that we get invalidatepage callback.
113 WARN_ON_ONCE(!PageUptodate(page)); 115 */
114 account_page_dirtied(page, page->mapping); 116 BUG_ON(PagePrivate(page));
115 radix_tree_tag_set(&mapping->page_tree, 117 page->private = (unsigned long)snapc;
116 page_index(page), PAGECACHE_TAG_DIRTY); 118 SetPagePrivate(page);
117
118 /*
119 * Reference snap context in page->private. Also set
120 * PagePrivate so that we get invalidatepage callback.
121 */
122 page->private = (unsigned long)snapc;
123 SetPagePrivate(page);
124 } else {
125 dout("ANON set_page_dirty %p (raced truncate?)\n", page);
126 undo = 1;
127 }
128
129 spin_unlock_irq(&mapping->tree_lock);
130
131 if (undo)
132 /* whoops, we failed to dirty the page */
133 ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
134 119
135 __mark_inode_dirty(mapping->host, I_DIRTY_PAGES); 120 ret = __set_page_dirty_nobuffers(page);
121 WARN_ON(!PageLocked(page));
122 WARN_ON(!page->mapping);
136 123
137 BUG_ON(!PageDirty(page)); 124 return ret;
138 return 1;
139} 125}
140 126
141/* 127/*
@@ -150,11 +136,19 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
150 struct ceph_inode_info *ci; 136 struct ceph_inode_info *ci;
151 struct ceph_snap_context *snapc = page_snap_context(page); 137 struct ceph_snap_context *snapc = page_snap_context(page);
152 138
153 BUG_ON(!PageLocked(page));
154 BUG_ON(!PagePrivate(page));
155 BUG_ON(!page->mapping);
156
157 inode = page->mapping->host; 139 inode = page->mapping->host;
140 ci = ceph_inode(inode);
141
142 if (offset != 0 || length != PAGE_CACHE_SIZE) {
143 dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
144 inode, page, page->index, offset, length);
145 return;
146 }
147
148 ceph_invalidate_fscache_page(inode, page);
149
150 if (!PagePrivate(page))
151 return;
158 152
159 /* 153 /*
160 * We can get non-dirty pages here due to races between 154 * We can get non-dirty pages here due to races between
@@ -164,31 +158,28 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
164 if (!PageDirty(page)) 158 if (!PageDirty(page))
165 pr_err("%p invalidatepage %p page not dirty\n", inode, page); 159 pr_err("%p invalidatepage %p page not dirty\n", inode, page);
166 160
167 if (offset == 0 && length == PAGE_CACHE_SIZE) 161 ClearPageChecked(page);
168 ClearPageChecked(page);
169 162
170 ci = ceph_inode(inode); 163 dout("%p invalidatepage %p idx %lu full dirty page\n",
171 if (offset == 0 && length == PAGE_CACHE_SIZE) { 164 inode, page, page->index);
172 dout("%p invalidatepage %p idx %lu full dirty page\n", 165
173 inode, page, page->index); 166 ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
174 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 167 ceph_put_snap_context(snapc);
175 ceph_put_snap_context(snapc); 168 page->private = 0;
176 page->private = 0; 169 ClearPagePrivate(page);
177 ClearPagePrivate(page);
178 } else {
179 dout("%p invalidatepage %p idx %lu partial dirty page %u(%u)\n",
180 inode, page, page->index, offset, length);
181 }
182} 170}
183 171
184/* just a sanity check */
185static int ceph_releasepage(struct page *page, gfp_t g) 172static int ceph_releasepage(struct page *page, gfp_t g)
186{ 173{
187 struct inode *inode = page->mapping ? page->mapping->host : NULL; 174 struct inode *inode = page->mapping ? page->mapping->host : NULL;
188 dout("%p releasepage %p idx %lu\n", inode, page, page->index); 175 dout("%p releasepage %p idx %lu\n", inode, page, page->index);
189 WARN_ON(PageDirty(page)); 176 WARN_ON(PageDirty(page));
190 WARN_ON(PagePrivate(page)); 177
191 return 0; 178 /* Can we release the page from the cache? */
179 if (!ceph_release_fscache_page(page, g))
180 return 0;
181
182 return !PagePrivate(page);
192} 183}
193 184
194/* 185/*
@@ -198,11 +189,16 @@ static int readpage_nounlock(struct file *filp, struct page *page)
198{ 189{
199 struct inode *inode = file_inode(filp); 190 struct inode *inode = file_inode(filp);
200 struct ceph_inode_info *ci = ceph_inode(inode); 191 struct ceph_inode_info *ci = ceph_inode(inode);
201 struct ceph_osd_client *osdc = 192 struct ceph_osd_client *osdc =
202 &ceph_inode_to_client(inode)->client->osdc; 193 &ceph_inode_to_client(inode)->client->osdc;
203 int err = 0; 194 int err = 0;
204 u64 len = PAGE_CACHE_SIZE; 195 u64 len = PAGE_CACHE_SIZE;
205 196
197 err = ceph_readpage_from_fscache(inode, page);
198
199 if (err == 0)
200 goto out;
201
206 dout("readpage inode %p file %p page %p index %lu\n", 202 dout("readpage inode %p file %p page %p index %lu\n",
207 inode, filp, page, page->index); 203 inode, filp, page, page->index);
208 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
@@ -220,6 +216,9 @@ static int readpage_nounlock(struct file *filp, struct page *page)
220 } 216 }
221 SetPageUptodate(page); 217 SetPageUptodate(page);
222 218
219 if (err == 0)
220 ceph_readpage_to_fscache(inode, page);
221
223out: 222out:
224 return err < 0 ? err : 0; 223 return err < 0 ? err : 0;
225} 224}
@@ -262,6 +261,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
262 page->index); 261 page->index);
263 flush_dcache_page(page); 262 flush_dcache_page(page);
264 SetPageUptodate(page); 263 SetPageUptodate(page);
264 ceph_readpage_to_fscache(inode, page);
265 unlock_page(page); 265 unlock_page(page);
266 page_cache_release(page); 266 page_cache_release(page);
267 bytes -= PAGE_CACHE_SIZE; 267 bytes -= PAGE_CACHE_SIZE;
@@ -331,11 +331,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
331 page = list_entry(page_list->prev, struct page, lru); 331 page = list_entry(page_list->prev, struct page, lru);
332 BUG_ON(PageLocked(page)); 332 BUG_ON(PageLocked(page));
333 list_del(&page->lru); 333 list_del(&page->lru);
334 334
335 dout("start_read %p adding %p idx %lu\n", inode, page, 335 dout("start_read %p adding %p idx %lu\n", inode, page,
336 page->index); 336 page->index);
337 if (add_to_page_cache_lru(page, &inode->i_data, page->index, 337 if (add_to_page_cache_lru(page, &inode->i_data, page->index,
338 GFP_NOFS)) { 338 GFP_NOFS)) {
339 ceph_fscache_uncache_page(inode, page);
339 page_cache_release(page); 340 page_cache_release(page);
340 dout("start_read %p add_to_page_cache failed %p\n", 341 dout("start_read %p add_to_page_cache failed %p\n",
341 inode, page); 342 inode, page);
@@ -378,6 +379,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
378 int rc = 0; 379 int rc = 0;
379 int max = 0; 380 int max = 0;
380 381
382 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
383 &nr_pages);
384
385 if (rc == 0)
386 goto out;
387
381 if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) 388 if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
382 max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) 389 max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
383 >> PAGE_SHIFT; 390 >> PAGE_SHIFT;
@@ -392,6 +399,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
392 BUG_ON(rc == 0); 399 BUG_ON(rc == 0);
393 } 400 }
394out: 401out:
402 ceph_fscache_readpages_cancel(inode, page_list);
403
395 dout("readpages %p file %p ret %d\n", inode, file, rc); 404 dout("readpages %p file %p ret %d\n", inode, file, rc);
396 return rc; 405 return rc;
397} 406}
@@ -497,6 +506,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
497 CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) 506 CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
498 set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC); 507 set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
499 508
509 ceph_readpage_to_fscache(inode, page);
510
500 set_page_writeback(page); 511 set_page_writeback(page);
501 err = ceph_osdc_writepages(osdc, ceph_vino(inode), 512 err = ceph_osdc_writepages(osdc, ceph_vino(inode),
502 &ci->i_layout, snapc, 513 &ci->i_layout, snapc,
@@ -552,7 +563,6 @@ static void ceph_release_pages(struct page **pages, int num)
552 pagevec_release(&pvec); 563 pagevec_release(&pvec);
553} 564}
554 565
555
556/* 566/*
557 * async writeback completion handler. 567 * async writeback completion handler.
558 * 568 *
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
new file mode 100644
index 000000000000..6bfe65e0b038
--- /dev/null
+++ b/fs/ceph/cache.c
@@ -0,0 +1,398 @@
1/*
2 * Ceph cache definitions.
3 *
4 * Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved.
5 * Written by Milosz Tanski (milosz@adfin.com)
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License version 2
9 * as published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to:
18 * Free Software Foundation
19 * 51 Franklin Street, Fifth Floor
20 * Boston, MA 02111-1301 USA
21 *
22 */
23
24#include "super.h"
25#include "cache.h"
26
27struct ceph_aux_inode {
28 struct timespec mtime;
29 loff_t size;
30};
31
32struct fscache_netfs ceph_cache_netfs = {
33 .name = "ceph",
34 .version = 0,
35};
36
37static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data,
38 void *buffer, uint16_t maxbuf)
39{
40 const struct ceph_fs_client* fsc = cookie_netfs_data;
41 uint16_t klen;
42
43 klen = sizeof(fsc->client->fsid);
44 if (klen > maxbuf)
45 return 0;
46
47 memcpy(buffer, &fsc->client->fsid, klen);
48 return klen;
49}
50
51static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
52 .name = "CEPH.fsid",
53 .type = FSCACHE_COOKIE_TYPE_INDEX,
54 .get_key = ceph_fscache_session_get_key,
55};
56
57int ceph_fscache_register(void)
58{
59 return fscache_register_netfs(&ceph_cache_netfs);
60}
61
62void ceph_fscache_unregister(void)
63{
64 fscache_unregister_netfs(&ceph_cache_netfs);
65}
66
67int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
68{
69 fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
70 &ceph_fscache_fsid_object_def,
71 fsc);
72
73 if (fsc->fscache == NULL) {
74 pr_err("Unable to resgister fsid: %p fscache cookie", fsc);
75 return 0;
76 }
77
78 fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1);
79 if (fsc->revalidate_wq == NULL)
80 return -ENOMEM;
81
82 return 0;
83}
84
85static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
86 void *buffer, uint16_t maxbuf)
87{
88 const struct ceph_inode_info* ci = cookie_netfs_data;
89 uint16_t klen;
90
 91	/* use ceph virtual inode (id + snapshot) */
92 klen = sizeof(ci->i_vino);
93 if (klen > maxbuf)
94 return 0;
95
96 memcpy(buffer, &ci->i_vino, klen);
97 return klen;
98}
99
100static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
101 void *buffer, uint16_t bufmax)
102{
103 struct ceph_aux_inode aux;
104 const struct ceph_inode_info* ci = cookie_netfs_data;
105 const struct inode* inode = &ci->vfs_inode;
106
107 memset(&aux, 0, sizeof(aux));
108 aux.mtime = inode->i_mtime;
109 aux.size = inode->i_size;
110
111 memcpy(buffer, &aux, sizeof(aux));
112
113 return sizeof(aux);
114}
115
116static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
117 uint64_t *size)
118{
119 const struct ceph_inode_info* ci = cookie_netfs_data;
120 const struct inode* inode = &ci->vfs_inode;
121
122 *size = inode->i_size;
123}
124
125static enum fscache_checkaux ceph_fscache_inode_check_aux(
126 void *cookie_netfs_data, const void *data, uint16_t dlen)
127{
128 struct ceph_aux_inode aux;
129 struct ceph_inode_info* ci = cookie_netfs_data;
130 struct inode* inode = &ci->vfs_inode;
131
132 if (dlen != sizeof(aux))
133 return FSCACHE_CHECKAUX_OBSOLETE;
134
135 memset(&aux, 0, sizeof(aux));
136 aux.mtime = inode->i_mtime;
137 aux.size = inode->i_size;
138
139 if (memcmp(data, &aux, sizeof(aux)) != 0)
140 return FSCACHE_CHECKAUX_OBSOLETE;
141
142 dout("ceph inode 0x%p cached okay", ci);
143 return FSCACHE_CHECKAUX_OKAY;
144}
145
146static void ceph_fscache_inode_now_uncached(void* cookie_netfs_data)
147{
148 struct ceph_inode_info* ci = cookie_netfs_data;
149 struct pagevec pvec;
150 pgoff_t first;
151 int loop, nr_pages;
152
153 pagevec_init(&pvec, 0);
154 first = 0;
155
156 dout("ceph inode 0x%p now uncached", ci);
157
158 while (1) {
159 nr_pages = pagevec_lookup(&pvec, ci->vfs_inode.i_mapping, first,
160 PAGEVEC_SIZE - pagevec_count(&pvec));
161
162 if (!nr_pages)
163 break;
164
165 for (loop = 0; loop < nr_pages; loop++)
166 ClearPageFsCache(pvec.pages[loop]);
167
168 first = pvec.pages[nr_pages - 1]->index + 1;
169
170 pvec.nr = nr_pages;
171 pagevec_release(&pvec);
172 cond_resched();
173 }
174}
175
176static const struct fscache_cookie_def ceph_fscache_inode_object_def = {
177 .name = "CEPH.inode",
178 .type = FSCACHE_COOKIE_TYPE_DATAFILE,
179 .get_key = ceph_fscache_inode_get_key,
180 .get_attr = ceph_fscache_inode_get_attr,
181 .get_aux = ceph_fscache_inode_get_aux,
182 .check_aux = ceph_fscache_inode_check_aux,
183 .now_uncached = ceph_fscache_inode_now_uncached,
184};
185
186void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
187 struct ceph_inode_info* ci)
188{
189 struct inode* inode = &ci->vfs_inode;
190
191 /* No caching for filesystem */
192 if (fsc->fscache == NULL)
193 return;
194
195 /* Only cache for regular files that are read only */
196 if ((ci->vfs_inode.i_mode & S_IFREG) == 0)
197 return;
198
199 /* Avoid multiple racing open requests */
200 mutex_lock(&inode->i_mutex);
201
202 if (ci->fscache)
203 goto done;
204
205 ci->fscache = fscache_acquire_cookie(fsc->fscache,
206 &ceph_fscache_inode_object_def,
207 ci);
208done:
209 mutex_unlock(&inode->i_mutex);
210
211}
212
213void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
214{
215 struct fscache_cookie* cookie;
216
217 if ((cookie = ci->fscache) == NULL)
218 return;
219
220 ci->fscache = NULL;
221
222 fscache_uncache_all_inode_pages(cookie, &ci->vfs_inode);
223 fscache_relinquish_cookie(cookie, 0);
224}
225
226static void ceph_vfs_readpage_complete(struct page *page, void *data, int error)
227{
228 if (!error)
229 SetPageUptodate(page);
230}
231
232static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int error)
233{
234 if (!error)
235 SetPageUptodate(page);
236
237 unlock_page(page);
238}
239
240static inline int cache_valid(struct ceph_inode_info *ci)
241{
242 return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&
243 (ci->i_fscache_gen == ci->i_rdcache_gen));
244}
245
246
 247/* Attempt to read from the fscache,
248 *
249 * This function is called from the readpage_nounlock context. DO NOT attempt to
250 * unlock the page here (or in the callback).
251 */
252int ceph_readpage_from_fscache(struct inode *inode, struct page *page)
253{
254 struct ceph_inode_info *ci = ceph_inode(inode);
255 int ret;
256
257 if (!cache_valid(ci))
258 return -ENOBUFS;
259
260 ret = fscache_read_or_alloc_page(ci->fscache, page,
261 ceph_vfs_readpage_complete, NULL,
262 GFP_KERNEL);
263
264 switch (ret) {
265 case 0: /* Page found */
266 dout("page read submitted\n");
267 return 0;
268 case -ENOBUFS: /* Pages were not found, and can't be */
269 case -ENODATA: /* Pages were not found */
270 dout("page/inode not in cache\n");
271 return ret;
272 default:
273 dout("%s: unknown error ret = %i\n", __func__, ret);
274 return ret;
275 }
276}
277
278int ceph_readpages_from_fscache(struct inode *inode,
279 struct address_space *mapping,
280 struct list_head *pages,
281 unsigned *nr_pages)
282{
283 struct ceph_inode_info *ci = ceph_inode(inode);
284 int ret;
285
286 if (!cache_valid(ci))
287 return -ENOBUFS;
288
289 ret = fscache_read_or_alloc_pages(ci->fscache, mapping, pages, nr_pages,
290 ceph_vfs_readpage_complete_unlock,
291 NULL, mapping_gfp_mask(mapping));
292
293 switch (ret) {
294 case 0: /* All pages found */
295 dout("all-page read submitted\n");
296 return 0;
297 case -ENOBUFS: /* Some pages were not found, and can't be */
298 case -ENODATA: /* some pages were not found */
299 dout("page/inode not in cache\n");
300 return ret;
301 default:
302 dout("%s: unknown error ret = %i\n", __func__, ret);
303 return ret;
304 }
305}
306
307void ceph_readpage_to_fscache(struct inode *inode, struct page *page)
308{
309 struct ceph_inode_info *ci = ceph_inode(inode);
310 int ret;
311
312 if (!PageFsCache(page))
313 return;
314
315 if (!cache_valid(ci))
316 return;
317
318 ret = fscache_write_page(ci->fscache, page, GFP_KERNEL);
319 if (ret)
320 fscache_uncache_page(ci->fscache, page);
321}
322
323void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
324{
325 struct ceph_inode_info *ci = ceph_inode(inode);
326
327 fscache_wait_on_page_write(ci->fscache, page);
328 fscache_uncache_page(ci->fscache, page);
329}
330
331void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
332{
333 if (fsc->revalidate_wq)
334 destroy_workqueue(fsc->revalidate_wq);
335
336 fscache_relinquish_cookie(fsc->fscache, 0);
337 fsc->fscache = NULL;
338}
339
340static void ceph_revalidate_work(struct work_struct *work)
341{
342 int issued;
343 u32 orig_gen;
344 struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
345 i_revalidate_work);
346 struct inode *inode = &ci->vfs_inode;
347
348 spin_lock(&ci->i_ceph_lock);
349 issued = __ceph_caps_issued(ci, NULL);
350 orig_gen = ci->i_rdcache_gen;
351 spin_unlock(&ci->i_ceph_lock);
352
353 if (!(issued & CEPH_CAP_FILE_CACHE)) {
354 dout("revalidate_work lost cache before validation %p\n",
355 inode);
356 goto out;
357 }
358
359 if (!fscache_check_consistency(ci->fscache))
360 fscache_invalidate(ci->fscache);
361
362 spin_lock(&ci->i_ceph_lock);
363 /* Update the new valid generation (backwards sanity check too) */
364 if (orig_gen > ci->i_fscache_gen) {
365 ci->i_fscache_gen = orig_gen;
366 }
367 spin_unlock(&ci->i_ceph_lock);
368
369out:
370 iput(&ci->vfs_inode);
371}
372
373void ceph_queue_revalidate(struct inode *inode)
374{
375 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
376 struct ceph_inode_info *ci = ceph_inode(inode);
377
378 if (fsc->revalidate_wq == NULL || ci->fscache == NULL)
379 return;
380
381 ihold(inode);
382
383 if (queue_work(ceph_sb_to_client(inode->i_sb)->revalidate_wq,
384 &ci->i_revalidate_work)) {
385 dout("ceph_queue_revalidate %p\n", inode);
386 } else {
387 dout("ceph_queue_revalidate %p failed\n)", inode);
388 iput(inode);
389 }
390}
391
392void ceph_fscache_inode_init(struct ceph_inode_info *ci)
393{
394 ci->fscache = NULL;
 395	/* The first load is verified cookie open time */
396 ci->i_fscache_gen = 1;
397 INIT_WORK(&ci->i_revalidate_work, ceph_revalidate_work);
398}
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
new file mode 100644
index 000000000000..ba949408a336
--- /dev/null
+++ b/fs/ceph/cache.h
@@ -0,0 +1,159 @@
1/*
2 * Ceph cache definitions.
3 *
4 * Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved.
5 * Written by Milosz Tanski (milosz@adfin.com)
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License version 2
9 * as published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to:
18 * Free Software Foundation
19 * 51 Franklin Street, Fifth Floor
20 * Boston, MA 02111-1301 USA
21 *
22 */
23
24#ifndef _CEPH_CACHE_H
25#define _CEPH_CACHE_H
26
27#ifdef CONFIG_CEPH_FSCACHE
28
29extern struct fscache_netfs ceph_cache_netfs;
30
31int ceph_fscache_register(void);
32void ceph_fscache_unregister(void);
33
34int ceph_fscache_register_fs(struct ceph_fs_client* fsc);
35void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc);
36
37void ceph_fscache_inode_init(struct ceph_inode_info *ci);
38void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
39 struct ceph_inode_info* ci);
40void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci);
41
42int ceph_readpage_from_fscache(struct inode *inode, struct page *page);
43int ceph_readpages_from_fscache(struct inode *inode,
44 struct address_space *mapping,
45 struct list_head *pages,
46 unsigned *nr_pages);
47void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
48void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
49void ceph_queue_revalidate(struct inode *inode);
50
51static inline void ceph_fscache_invalidate(struct inode *inode)
52{
53 fscache_invalidate(ceph_inode(inode)->fscache);
54}
55
56static inline void ceph_fscache_uncache_page(struct inode *inode,
57 struct page *page)
58{
59 struct ceph_inode_info *ci = ceph_inode(inode);
60 return fscache_uncache_page(ci->fscache, page);
61}
62
63static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
64{
65 struct inode* inode = page->mapping->host;
66 struct ceph_inode_info *ci = ceph_inode(inode);
67 return fscache_maybe_release_page(ci->fscache, page, gfp);
68}
69
70static inline void ceph_fscache_readpages_cancel(struct inode *inode,
71 struct list_head *pages)
72{
73 struct ceph_inode_info *ci = ceph_inode(inode);
74 return fscache_readpages_cancel(ci->fscache, pages);
75}
76
77#else
78
79static inline int ceph_fscache_register(void)
80{
81 return 0;
82}
83
84static inline void ceph_fscache_unregister(void)
85{
86}
87
88static inline int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
89{
90 return 0;
91}
92
93static inline void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
94{
95}
96
97static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
98{
99}
100
101static inline void ceph_fscache_register_inode_cookie(struct ceph_fs_client* parent_fsc,
102 struct ceph_inode_info* ci)
103{
104}
105
106static inline void ceph_fscache_uncache_page(struct inode *inode,
107 struct page *pages)
108{
109}
110
111static inline int ceph_readpage_from_fscache(struct inode* inode,
112 struct page *page)
113{
114 return -ENOBUFS;
115}
116
117static inline int ceph_readpages_from_fscache(struct inode *inode,
118 struct address_space *mapping,
119 struct list_head *pages,
120 unsigned *nr_pages)
121{
122 return -ENOBUFS;
123}
124
125static inline void ceph_readpage_to_fscache(struct inode *inode,
126 struct page *page)
127{
128}
129
130static inline void ceph_fscache_invalidate(struct inode *inode)
131{
132}
133
134static inline void ceph_invalidate_fscache_page(struct inode *inode,
135 struct page *page)
136{
137}
138
139static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
140{
141}
142
143static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
144{
145 return 1;
146}
147
148static inline void ceph_fscache_readpages_cancel(struct inode *inode,
149 struct list_head *pages)
150{
151}
152
153static inline void ceph_queue_revalidate(struct inode *inode)
154{
155}
156
157#endif
158
159#endif
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 25442b40c25a..13976c33332e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -10,6 +10,7 @@
10 10
11#include "super.h" 11#include "super.h"
12#include "mds_client.h" 12#include "mds_client.h"
13#include "cache.h"
13#include <linux/ceph/decode.h> 14#include <linux/ceph/decode.h>
14#include <linux/ceph/messenger.h> 15#include <linux/ceph/messenger.h>
15 16
@@ -479,8 +480,9 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
479 * i_rdcache_gen. 480 * i_rdcache_gen.
480 */ 481 */
481 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && 482 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
482 (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) 483 (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
483 ci->i_rdcache_gen++; 484 ci->i_rdcache_gen++;
485 }
484 486
485 /* 487 /*
486 * if we are newly issued FILE_SHARED, mark dir not complete; we 488 * if we are newly issued FILE_SHARED, mark dir not complete; we
@@ -2072,19 +2074,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2072 /* finish pending truncate */ 2074 /* finish pending truncate */
2073 while (ci->i_truncate_pending) { 2075 while (ci->i_truncate_pending) {
2074 spin_unlock(&ci->i_ceph_lock); 2076 spin_unlock(&ci->i_ceph_lock);
2075 if (!(need & CEPH_CAP_FILE_WR))
2076 mutex_lock(&inode->i_mutex);
2077 __ceph_do_pending_vmtruncate(inode); 2077 __ceph_do_pending_vmtruncate(inode);
2078 if (!(need & CEPH_CAP_FILE_WR))
2079 mutex_unlock(&inode->i_mutex);
2080 spin_lock(&ci->i_ceph_lock); 2078 spin_lock(&ci->i_ceph_lock);
2081 } 2079 }
2082 2080
2083 if (need & CEPH_CAP_FILE_WR) { 2081 have = __ceph_caps_issued(ci, &implemented);
2082
2083 if (have & need & CEPH_CAP_FILE_WR) {
2084 if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { 2084 if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
2085 dout("get_cap_refs %p endoff %llu > maxsize %llu\n", 2085 dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
2086 inode, endoff, ci->i_max_size); 2086 inode, endoff, ci->i_max_size);
2087 if (endoff > ci->i_wanted_max_size) { 2087 if (endoff > ci->i_requested_max_size) {
2088 *check_max = 1; 2088 *check_max = 1;
2089 ret = 1; 2089 ret = 1;
2090 } 2090 }
@@ -2099,7 +2099,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2099 goto out; 2099 goto out;
2100 } 2100 }
2101 } 2101 }
2102 have = __ceph_caps_issued(ci, &implemented);
2103 2102
2104 if ((have & need) == need) { 2103 if ((have & need) == need) {
2105 /* 2104 /*
@@ -2141,14 +2140,17 @@ static void check_max_size(struct inode *inode, loff_t endoff)
2141 2140
2142 /* do we need to explicitly request a larger max_size? */ 2141 /* do we need to explicitly request a larger max_size? */
2143 spin_lock(&ci->i_ceph_lock); 2142 spin_lock(&ci->i_ceph_lock);
2144 if ((endoff >= ci->i_max_size || 2143 if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
2145 endoff > (inode->i_size << 1)) &&
2146 endoff > ci->i_wanted_max_size) {
2147 dout("write %p at large endoff %llu, req max_size\n", 2144 dout("write %p at large endoff %llu, req max_size\n",
2148 inode, endoff); 2145 inode, endoff);
2149 ci->i_wanted_max_size = endoff; 2146 ci->i_wanted_max_size = endoff;
2150 check = 1;
2151 } 2147 }
2148 /* duplicate ceph_check_caps()'s logic */
2149 if (ci->i_auth_cap &&
2150 (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
2151 ci->i_wanted_max_size > ci->i_max_size &&
2152 ci->i_wanted_max_size > ci->i_requested_max_size)
2153 check = 1;
2152 spin_unlock(&ci->i_ceph_lock); 2154 spin_unlock(&ci->i_ceph_lock);
2153 if (check) 2155 if (check)
2154 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2156 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
@@ -2334,6 +2336,38 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
2334} 2336}
2335 2337
2336/* 2338/*
2339 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
2340 */
2341static void invalidate_aliases(struct inode *inode)
2342{
2343 struct dentry *dn, *prev = NULL;
2344
2345 dout("invalidate_aliases inode %p\n", inode);
2346 d_prune_aliases(inode);
2347 /*
2348 * For non-directory inode, d_find_alias() only returns
2349 * connected dentry. After calling d_invalidate(), the
2350 * dentry become disconnected.
2351 *
2352 * For directory inode, d_find_alias() can return
2353 * disconnected dentry. But directory inode should have
2354 * one alias at most.
2355 */
2356 while ((dn = d_find_alias(inode))) {
2357 if (dn == prev) {
2358 dput(dn);
2359 break;
2360 }
2361 d_invalidate(dn);
2362 if (prev)
2363 dput(prev);
2364 prev = dn;
2365 }
2366 if (prev)
2367 dput(prev);
2368}
2369
2370/*
2337 * Handle a cap GRANT message from the MDS. (Note that a GRANT may 2371 * Handle a cap GRANT message from the MDS. (Note that a GRANT may
2338 * actually be a revocation if it specifies a smaller cap set.) 2372 * actually be a revocation if it specifies a smaller cap set.)
2339 * 2373 *
@@ -2361,8 +2395,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2361 int check_caps = 0; 2395 int check_caps = 0;
2362 int wake = 0; 2396 int wake = 0;
2363 int writeback = 0; 2397 int writeback = 0;
2364 int revoked_rdcache = 0;
2365 int queue_invalidate = 0; 2398 int queue_invalidate = 0;
2399 int deleted_inode = 0;
2400 int queue_revalidate = 0;
2366 2401
2367 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 2402 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
2368 inode, cap, mds, seq, ceph_cap_string(newcaps)); 2403 inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2377,9 +2412,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2377 if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 2412 if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
2378 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 2413 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2379 !ci->i_wrbuffer_ref) { 2414 !ci->i_wrbuffer_ref) {
2380 if (try_nonblocking_invalidate(inode) == 0) { 2415 if (try_nonblocking_invalidate(inode)) {
2381 revoked_rdcache = 1;
2382 } else {
2383 /* there were locked pages.. invalidate later 2416 /* there were locked pages.. invalidate later
2384 in a separate thread. */ 2417 in a separate thread. */
2385 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 2418 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
@@ -2387,6 +2420,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2387 ci->i_rdcache_revoking = ci->i_rdcache_gen; 2420 ci->i_rdcache_revoking = ci->i_rdcache_gen;
2388 } 2421 }
2389 } 2422 }
2423
2424 ceph_fscache_invalidate(inode);
2390 } 2425 }
2391 2426
2392 /* side effects now are allowed */ 2427 /* side effects now are allowed */
@@ -2407,8 +2442,12 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2407 from_kgid(&init_user_ns, inode->i_gid)); 2442 from_kgid(&init_user_ns, inode->i_gid));
2408 } 2443 }
2409 2444
2410 if ((issued & CEPH_CAP_LINK_EXCL) == 0) 2445 if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
2411 set_nlink(inode, le32_to_cpu(grant->nlink)); 2446 set_nlink(inode, le32_to_cpu(grant->nlink));
2447 if (inode->i_nlink == 0 &&
2448 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
2449 deleted_inode = 1;
2450 }
2412 2451
2413 if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { 2452 if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
2414 int len = le32_to_cpu(grant->xattr_len); 2453 int len = le32_to_cpu(grant->xattr_len);
@@ -2424,6 +2463,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2424 } 2463 }
2425 } 2464 }
2426 2465
2466 /* Do we need to revalidate our fscache cookie. Don't bother on the
2467 * first cache cap as we already validate at cookie creation time. */
2468 if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
2469 queue_revalidate = 1;
2470
2427 /* size/ctime/mtime/atime? */ 2471 /* size/ctime/mtime/atime? */
2428 ceph_fill_file_size(inode, issued, 2472 ceph_fill_file_size(inode, issued,
2429 le32_to_cpu(grant->truncate_seq), 2473 le32_to_cpu(grant->truncate_seq),
@@ -2508,6 +2552,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2508 BUG_ON(cap->issued & ~cap->implemented); 2552 BUG_ON(cap->issued & ~cap->implemented);
2509 2553
2510 spin_unlock(&ci->i_ceph_lock); 2554 spin_unlock(&ci->i_ceph_lock);
2555
2511 if (writeback) 2556 if (writeback)
2512 /* 2557 /*
2513 * queue inode for writeback: we can't actually call 2558 * queue inode for writeback: we can't actually call
@@ -2517,6 +2562,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2517 ceph_queue_writeback(inode); 2562 ceph_queue_writeback(inode);
2518 if (queue_invalidate) 2563 if (queue_invalidate)
2519 ceph_queue_invalidate(inode); 2564 ceph_queue_invalidate(inode);
2565 if (deleted_inode)
2566 invalidate_aliases(inode);
2567 if (queue_revalidate)
2568 ceph_queue_revalidate(inode);
2520 if (wake) 2569 if (wake)
2521 wake_up_all(&ci->i_cap_wq); 2570 wake_up_all(&ci->i_cap_wq);
2522 2571
@@ -2673,8 +2722,10 @@ static void handle_cap_trunc(struct inode *inode,
2673 truncate_seq, truncate_size, size); 2722 truncate_seq, truncate_size, size);
2674 spin_unlock(&ci->i_ceph_lock); 2723 spin_unlock(&ci->i_ceph_lock);
2675 2724
2676 if (queue_trunc) 2725 if (queue_trunc) {
2677 ceph_queue_vmtruncate(inode); 2726 ceph_queue_vmtruncate(inode);
2727 ceph_fscache_invalidate(inode);
2728 }
2678} 2729}
2679 2730
2680/* 2731/*
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index a40ceda47a32..868b61d56cac 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -793,6 +793,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
793 req->r_locked_dir = dir; 793 req->r_locked_dir = dir;
794 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 794 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
795 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 795 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
796 /* release LINK_SHARED on source inode (mds will lock it) */
797 req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
796 err = ceph_mdsc_do_request(mdsc, dir, req); 798 err = ceph_mdsc_do_request(mdsc, dir, req);
797 if (err) { 799 if (err) {
798 d_drop(dentry); 800 d_drop(dentry);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 2ddf061c1c4a..3de89829e2a1 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -8,9 +8,11 @@
8#include <linux/namei.h> 8#include <linux/namei.h>
9#include <linux/writeback.h> 9#include <linux/writeback.h>
10#include <linux/aio.h> 10#include <linux/aio.h>
11#include <linux/falloc.h>
11 12
12#include "super.h" 13#include "super.h"
13#include "mds_client.h" 14#include "mds_client.h"
15#include "cache.h"
14 16
15/* 17/*
16 * Ceph file operations 18 * Ceph file operations
@@ -68,9 +70,23 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
68{ 70{
69 struct ceph_file_info *cf; 71 struct ceph_file_info *cf;
70 int ret = 0; 72 int ret = 0;
73 struct ceph_inode_info *ci = ceph_inode(inode);
74 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
75 struct ceph_mds_client *mdsc = fsc->mdsc;
71 76
72 switch (inode->i_mode & S_IFMT) { 77 switch (inode->i_mode & S_IFMT) {
73 case S_IFREG: 78 case S_IFREG:
79 /* First file open request creates the cookie, we want to keep
 80 * this cookie around for the lifetime of the inode so as not to
81 * have to worry about fscache register / revoke / operation
82 * races.
83 *
84 * Also, if we know the operation is going to invalidate data
85 * (non readonly) just nuke the cache right away.
86 */
87 ceph_fscache_register_inode_cookie(mdsc->fsc, ci);
88 if ((fmode & CEPH_FILE_MODE_WR))
89 ceph_fscache_invalidate(inode);
74 case S_IFDIR: 90 case S_IFDIR:
75 dout("init_file %p %p 0%o (regular)\n", inode, file, 91 dout("init_file %p %p 0%o (regular)\n", inode, file,
76 inode->i_mode); 92 inode->i_mode);
@@ -181,6 +197,7 @@ int ceph_open(struct inode *inode, struct file *file)
181 spin_unlock(&ci->i_ceph_lock); 197 spin_unlock(&ci->i_ceph_lock);
182 return ceph_init_file(inode, file, fmode); 198 return ceph_init_file(inode, file, fmode);
183 } 199 }
200
184 spin_unlock(&ci->i_ceph_lock); 201 spin_unlock(&ci->i_ceph_lock);
185 202
186 dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); 203 dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
@@ -191,6 +208,7 @@ int ceph_open(struct inode *inode, struct file *file)
191 } 208 }
192 req->r_inode = inode; 209 req->r_inode = inode;
193 ihold(inode); 210 ihold(inode);
211
194 req->r_num_caps = 1; 212 req->r_num_caps = 1;
195 if (flags & (O_CREAT|O_TRUNC)) 213 if (flags & (O_CREAT|O_TRUNC))
196 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); 214 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
@@ -313,9 +331,9 @@ static int striped_read(struct inode *inode,
313{ 331{
314 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 332 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
315 struct ceph_inode_info *ci = ceph_inode(inode); 333 struct ceph_inode_info *ci = ceph_inode(inode);
316 u64 pos, this_len; 334 u64 pos, this_len, left;
317 int io_align, page_align; 335 int io_align, page_align;
318 int left, pages_left; 336 int pages_left;
319 int read; 337 int read;
320 struct page **page_pos; 338 struct page **page_pos;
321 int ret; 339 int ret;
@@ -346,47 +364,40 @@ more:
346 ret = 0; 364 ret = 0;
347 hit_stripe = this_len < left; 365 hit_stripe = this_len < left;
348 was_short = ret >= 0 && ret < this_len; 366 was_short = ret >= 0 && ret < this_len;
349 dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read, 367 dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
350 ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); 368 ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
351 369
352 if (ret > 0) { 370 if (ret >= 0) {
353 int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; 371 int didpages;
354 372 if (was_short && (pos + ret < inode->i_size)) {
355 if (read < pos - off) { 373 u64 tmp = min(this_len - ret,
356 dout(" zero gap %llu to %llu\n", off + read, pos); 374 inode->i_size - pos - ret);
357 ceph_zero_page_vector_range(page_align + read, 375 dout(" zero gap %llu to %llu\n",
358 pos - off - read, pages); 376 pos + ret, pos + ret + tmp);
377 ceph_zero_page_vector_range(page_align + read + ret,
378 tmp, pages);
379 ret += tmp;
359 } 380 }
381
382 didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
360 pos += ret; 383 pos += ret;
361 read = pos - off; 384 read = pos - off;
362 left -= ret; 385 left -= ret;
363 page_pos += didpages; 386 page_pos += didpages;
364 pages_left -= didpages; 387 pages_left -= didpages;
365 388
 366 /* hit stripe? */ 389 /* hit stripe and need to continue */
367 if (left && hit_stripe) 390 if (left && hit_stripe && pos < inode->i_size)
368 goto more; 391 goto more;
369 } 392 }
370 393
371 if (was_short) { 394 if (read > 0) {
395 ret = read;
372 /* did we bounce off eof? */ 396 /* did we bounce off eof? */
373 if (pos + left > inode->i_size) 397 if (pos + left > inode->i_size)
374 *checkeof = 1; 398 *checkeof = 1;
375
376 /* zero trailing bytes (inside i_size) */
377 if (left > 0 && pos < inode->i_size) {
378 if (pos + left > inode->i_size)
379 left = inode->i_size - pos;
380
381 dout("zero tail %d\n", left);
382 ceph_zero_page_vector_range(page_align + read, left,
383 pages);
384 read += left;
385 }
386 } 399 }
387 400
388 if (ret >= 0)
389 ret = read;
390 dout("striped_read returns %d\n", ret); 401 dout("striped_read returns %d\n", ret);
391 return ret; 402 return ret;
392} 403}
@@ -618,6 +629,8 @@ out:
618 if (check_caps) 629 if (check_caps)
619 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, 630 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
620 NULL); 631 NULL);
632 } else if (ret != -EOLDSNAPC && written > 0) {
633 ret = written;
621 } 634 }
622 return ret; 635 return ret;
623} 636}
@@ -659,7 +672,6 @@ again:
659 672
660 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 673 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
661 (iocb->ki_filp->f_flags & O_DIRECT) || 674 (iocb->ki_filp->f_flags & O_DIRECT) ||
662 (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
663 (fi->flags & CEPH_F_SYNC)) 675 (fi->flags & CEPH_F_SYNC))
664 /* hmm, this isn't really async... */ 676 /* hmm, this isn't really async... */
665 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 677 ret = ceph_sync_read(filp, base, len, ppos, &checkeof);
@@ -711,13 +723,11 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
711 &ceph_sb_to_client(inode->i_sb)->client->osdc; 723 &ceph_sb_to_client(inode->i_sb)->client->osdc;
712 ssize_t count, written = 0; 724 ssize_t count, written = 0;
713 int err, want, got; 725 int err, want, got;
714 bool hold_mutex;
715 726
716 if (ceph_snap(inode) != CEPH_NOSNAP) 727 if (ceph_snap(inode) != CEPH_NOSNAP)
717 return -EROFS; 728 return -EROFS;
718 729
719 mutex_lock(&inode->i_mutex); 730 mutex_lock(&inode->i_mutex);
720 hold_mutex = true;
721 731
722 err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ); 732 err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ);
723 if (err) 733 if (err)
@@ -763,18 +773,31 @@ retry_snap:
763 773
764 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 774 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
765 (iocb->ki_filp->f_flags & O_DIRECT) || 775 (iocb->ki_filp->f_flags & O_DIRECT) ||
766 (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
767 (fi->flags & CEPH_F_SYNC)) { 776 (fi->flags & CEPH_F_SYNC)) {
768 mutex_unlock(&inode->i_mutex); 777 mutex_unlock(&inode->i_mutex);
769 written = ceph_sync_write(file, iov->iov_base, count, 778 written = ceph_sync_write(file, iov->iov_base, count,
770 pos, &iocb->ki_pos); 779 pos, &iocb->ki_pos);
780 if (written == -EOLDSNAPC) {
781 dout("aio_write %p %llx.%llx %llu~%u"
782 "got EOLDSNAPC, retrying\n",
783 inode, ceph_vinop(inode),
784 pos, (unsigned)iov->iov_len);
785 mutex_lock(&inode->i_mutex);
786 goto retry_snap;
787 }
771 } else { 788 } else {
789 /*
790 * No need to acquire the i_truncate_mutex. Because
791 * the MDS revokes Fwb caps before sending truncate
792 * message to us. We can't get Fwb cap while there
793 * are pending vmtruncate. So write and vmtruncate
794 * can not run at the same time
795 */
772 written = generic_file_buffered_write(iocb, iov, nr_segs, 796 written = generic_file_buffered_write(iocb, iov, nr_segs,
773 pos, &iocb->ki_pos, 797 pos, &iocb->ki_pos,
774 count, 0); 798 count, 0);
775 mutex_unlock(&inode->i_mutex); 799 mutex_unlock(&inode->i_mutex);
776 } 800 }
777 hold_mutex = false;
778 801
779 if (written >= 0) { 802 if (written >= 0) {
780 int dirty; 803 int dirty;
@@ -798,18 +821,12 @@ retry_snap:
798 written = err; 821 written = err;
799 } 822 }
800 823
801 if (written == -EOLDSNAPC) { 824 goto out_unlocked;
802 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", 825
803 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
804 mutex_lock(&inode->i_mutex);
805 hold_mutex = true;
806 goto retry_snap;
807 }
808out: 826out:
809 if (hold_mutex) 827 mutex_unlock(&inode->i_mutex);
810 mutex_unlock(&inode->i_mutex); 828out_unlocked:
811 current->backing_dev_info = NULL; 829 current->backing_dev_info = NULL;
812
813 return written ? written : err; 830 return written ? written : err;
814} 831}
815 832
@@ -822,7 +839,6 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
822 int ret; 839 int ret;
823 840
824 mutex_lock(&inode->i_mutex); 841 mutex_lock(&inode->i_mutex);
825 __ceph_do_pending_vmtruncate(inode);
826 842
827 if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { 843 if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
828 ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); 844 ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
@@ -871,6 +887,204 @@ out:
871 return offset; 887 return offset;
872} 888}
873 889
890static inline void ceph_zero_partial_page(
891 struct inode *inode, loff_t offset, unsigned size)
892{
893 struct page *page;
894 pgoff_t index = offset >> PAGE_CACHE_SHIFT;
895
896 page = find_lock_page(inode->i_mapping, index);
897 if (page) {
898 wait_on_page_writeback(page);
899 zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
900 unlock_page(page);
901 page_cache_release(page);
902 }
903}
904
905static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
906 loff_t length)
907{
908 loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
909 if (offset < nearly) {
910 loff_t size = nearly - offset;
911 if (length < size)
912 size = length;
913 ceph_zero_partial_page(inode, offset, size);
914 offset += size;
915 length -= size;
916 }
917 if (length >= PAGE_CACHE_SIZE) {
918 loff_t size = round_down(length, PAGE_CACHE_SIZE);
919 truncate_pagecache_range(inode, offset, offset + size - 1);
920 offset += size;
921 length -= size;
922 }
923 if (length)
924 ceph_zero_partial_page(inode, offset, length);
925}
926
927static int ceph_zero_partial_object(struct inode *inode,
928 loff_t offset, loff_t *length)
929{
930 struct ceph_inode_info *ci = ceph_inode(inode);
931 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
932 struct ceph_osd_request *req;
933 int ret = 0;
934 loff_t zero = 0;
935 int op;
936
937 if (!length) {
938 op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
939 length = &zero;
940 } else {
941 op = CEPH_OSD_OP_ZERO;
942 }
943
944 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
945 ceph_vino(inode),
946 offset, length,
947 1, op,
948 CEPH_OSD_FLAG_WRITE |
949 CEPH_OSD_FLAG_ONDISK,
950 NULL, 0, 0, false);
951 if (IS_ERR(req)) {
952 ret = PTR_ERR(req);
953 goto out;
954 }
955
956 ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
957 &inode->i_mtime);
958
959 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
960 if (!ret) {
961 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
962 if (ret == -ENOENT)
963 ret = 0;
964 }
965 ceph_osdc_put_request(req);
966
967out:
968 return ret;
969}
970
971static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
972{
973 int ret = 0;
974 struct ceph_inode_info *ci = ceph_inode(inode);
975 s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
976 s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
977 s32 object_size = ceph_file_layout_object_size(ci->i_layout);
978 u64 object_set_size = object_size * stripe_count;
979 u64 nearly, t;
980
981 /* round offset up to next period boundary */
982 nearly = offset + object_set_size - 1;
983 t = nearly;
984 nearly -= do_div(t, object_set_size);
985
986 while (length && offset < nearly) {
987 loff_t size = length;
988 ret = ceph_zero_partial_object(inode, offset, &size);
989 if (ret < 0)
990 return ret;
991 offset += size;
992 length -= size;
993 }
994 while (length >= object_set_size) {
995 int i;
996 loff_t pos = offset;
997 for (i = 0; i < stripe_count; ++i) {
998 ret = ceph_zero_partial_object(inode, pos, NULL);
999 if (ret < 0)
1000 return ret;
1001 pos += stripe_unit;
1002 }
1003 offset += object_set_size;
1004 length -= object_set_size;
1005 }
1006 while (length) {
1007 loff_t size = length;
1008 ret = ceph_zero_partial_object(inode, offset, &size);
1009 if (ret < 0)
1010 return ret;
1011 offset += size;
1012 length -= size;
1013 }
1014 return ret;
1015}
1016
1017static long ceph_fallocate(struct file *file, int mode,
1018 loff_t offset, loff_t length)
1019{
1020 struct ceph_file_info *fi = file->private_data;
1021 struct inode *inode = file->f_dentry->d_inode;
1022 struct ceph_inode_info *ci = ceph_inode(inode);
1023 struct ceph_osd_client *osdc =
1024 &ceph_inode_to_client(inode)->client->osdc;
1025 int want, got = 0;
1026 int dirty;
1027 int ret = 0;
1028 loff_t endoff = 0;
1029 loff_t size;
1030
1031 if (!S_ISREG(inode->i_mode))
1032 return -EOPNOTSUPP;
1033
1034 if (IS_SWAPFILE(inode))
1035 return -ETXTBSY;
1036
1037 mutex_lock(&inode->i_mutex);
1038
1039 if (ceph_snap(inode) != CEPH_NOSNAP) {
1040 ret = -EROFS;
1041 goto unlock;
1042 }
1043
1044 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
1045 !(mode & FALLOC_FL_PUNCH_HOLE)) {
1046 ret = -ENOSPC;
1047 goto unlock;
1048 }
1049
1050 size = i_size_read(inode);
1051 if (!(mode & FALLOC_FL_KEEP_SIZE))
1052 endoff = offset + length;
1053
1054 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1055 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1056 else
1057 want = CEPH_CAP_FILE_BUFFER;
1058
1059 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
1060 if (ret < 0)
1061 goto unlock;
1062
1063 if (mode & FALLOC_FL_PUNCH_HOLE) {
1064 if (offset < size)
1065 ceph_zero_pagecache_range(inode, offset, length);
1066 ret = ceph_zero_objects(inode, offset, length);
1067 } else if (endoff > size) {
1068 truncate_pagecache_range(inode, size, -1);
1069 if (ceph_inode_set_size(inode, endoff))
1070 ceph_check_caps(ceph_inode(inode),
1071 CHECK_CAPS_AUTHONLY, NULL);
1072 }
1073
1074 if (!ret) {
1075 spin_lock(&ci->i_ceph_lock);
1076 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1077 spin_unlock(&ci->i_ceph_lock);
1078 if (dirty)
1079 __mark_inode_dirty(inode, dirty);
1080 }
1081
1082 ceph_put_cap_refs(ci, got);
1083unlock:
1084 mutex_unlock(&inode->i_mutex);
1085 return ret;
1086}
1087
874const struct file_operations ceph_file_fops = { 1088const struct file_operations ceph_file_fops = {
875 .open = ceph_open, 1089 .open = ceph_open,
876 .release = ceph_release, 1090 .release = ceph_release,
@@ -887,5 +1101,6 @@ const struct file_operations ceph_file_fops = {
887 .splice_write = generic_file_splice_write, 1101 .splice_write = generic_file_splice_write,
888 .unlocked_ioctl = ceph_ioctl, 1102 .unlocked_ioctl = ceph_ioctl,
889 .compat_ioctl = ceph_ioctl, 1103 .compat_ioctl = ceph_ioctl,
1104 .fallocate = ceph_fallocate,
890}; 1105};
891 1106
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index f3a2abf28a77..8549a48115f7 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -12,6 +12,7 @@
12 12
13#include "super.h" 13#include "super.h"
14#include "mds_client.h" 14#include "mds_client.h"
15#include "cache.h"
15#include <linux/ceph/decode.h> 16#include <linux/ceph/decode.h>
16 17
17/* 18/*
@@ -344,6 +345,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
344 for (i = 0; i < CEPH_FILE_MODE_NUM; i++) 345 for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
345 ci->i_nr_by_mode[i] = 0; 346 ci->i_nr_by_mode[i] = 0;
346 347
348 mutex_init(&ci->i_truncate_mutex);
347 ci->i_truncate_seq = 0; 349 ci->i_truncate_seq = 0;
348 ci->i_truncate_size = 0; 350 ci->i_truncate_size = 0;
349 ci->i_truncate_pending = 0; 351 ci->i_truncate_pending = 0;
@@ -377,6 +379,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
377 379
378 INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); 380 INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
379 381
382 ceph_fscache_inode_init(ci);
383
380 return &ci->vfs_inode; 384 return &ci->vfs_inode;
381} 385}
382 386
@@ -396,6 +400,8 @@ void ceph_destroy_inode(struct inode *inode)
396 400
397 dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode)); 401 dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
398 402
403 ceph_fscache_unregister_inode_cookie(ci);
404
399 ceph_queue_caps_release(inode); 405 ceph_queue_caps_release(inode);
400 406
401 /* 407 /*
@@ -430,7 +436,6 @@ void ceph_destroy_inode(struct inode *inode)
430 call_rcu(&inode->i_rcu, ceph_i_callback); 436 call_rcu(&inode->i_rcu, ceph_i_callback);
431} 437}
432 438
433
434/* 439/*
435 * Helpers to fill in size, ctime, mtime, and atime. We have to be 440 * Helpers to fill in size, ctime, mtime, and atime. We have to be
436 * careful because either the client or MDS may have more up to date 441 * careful because either the client or MDS may have more up to date
@@ -455,16 +460,20 @@ int ceph_fill_file_size(struct inode *inode, int issued,
455 dout("truncate_seq %u -> %u\n", 460 dout("truncate_seq %u -> %u\n",
456 ci->i_truncate_seq, truncate_seq); 461 ci->i_truncate_seq, truncate_seq);
457 ci->i_truncate_seq = truncate_seq; 462 ci->i_truncate_seq = truncate_seq;
463
464 /* the MDS should have revoked these caps */
465 WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL |
466 CEPH_CAP_FILE_RD |
467 CEPH_CAP_FILE_WR |
468 CEPH_CAP_FILE_LAZYIO));
458 /* 469 /*
459 * If we hold relevant caps, or in the case where we're 470 * If we hold relevant caps, or in the case where we're
460 * not the only client referencing this file and we 471 * not the only client referencing this file and we
461 * don't hold those caps, then we need to check whether 472 * don't hold those caps, then we need to check whether
462 * the file is either opened or mmaped 473 * the file is either opened or mmaped
463 */ 474 */
464 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD| 475 if ((issued & (CEPH_CAP_FILE_CACHE|
465 CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER| 476 CEPH_CAP_FILE_BUFFER)) ||
466 CEPH_CAP_FILE_EXCL|
467 CEPH_CAP_FILE_LAZYIO)) ||
468 mapping_mapped(inode->i_mapping) || 477 mapping_mapped(inode->i_mapping) ||
469 __ceph_caps_file_wanted(ci)) { 478 __ceph_caps_file_wanted(ci)) {
470 ci->i_truncate_pending++; 479 ci->i_truncate_pending++;
@@ -478,6 +487,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,
478 truncate_size); 487 truncate_size);
479 ci->i_truncate_size = truncate_size; 488 ci->i_truncate_size = truncate_size;
480 } 489 }
490
491 if (queue_trunc)
492 ceph_fscache_invalidate(inode);
493
481 return queue_trunc; 494 return queue_trunc;
482} 495}
483 496
@@ -1066,7 +1079,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1066 * complete. 1079 * complete.
1067 */ 1080 */
1068 ceph_set_dentry_offset(req->r_old_dentry); 1081 ceph_set_dentry_offset(req->r_old_dentry);
1069 dout("dn %p gets new offset %lld\n", req->r_old_dentry, 1082 dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1070 ceph_dentry(req->r_old_dentry)->offset); 1083 ceph_dentry(req->r_old_dentry)->offset);
1071 1084
1072 dn = req->r_old_dentry; /* use old_dentry */ 1085 dn = req->r_old_dentry; /* use old_dentry */
@@ -1419,18 +1432,20 @@ static void ceph_invalidate_work(struct work_struct *work)
1419 u32 orig_gen; 1432 u32 orig_gen;
1420 int check = 0; 1433 int check = 0;
1421 1434
1435 mutex_lock(&ci->i_truncate_mutex);
1422 spin_lock(&ci->i_ceph_lock); 1436 spin_lock(&ci->i_ceph_lock);
1423 dout("invalidate_pages %p gen %d revoking %d\n", inode, 1437 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1424 ci->i_rdcache_gen, ci->i_rdcache_revoking); 1438 ci->i_rdcache_gen, ci->i_rdcache_revoking);
1425 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 1439 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1426 /* nevermind! */ 1440 /* nevermind! */
1427 spin_unlock(&ci->i_ceph_lock); 1441 spin_unlock(&ci->i_ceph_lock);
1442 mutex_unlock(&ci->i_truncate_mutex);
1428 goto out; 1443 goto out;
1429 } 1444 }
1430 orig_gen = ci->i_rdcache_gen; 1445 orig_gen = ci->i_rdcache_gen;
1431 spin_unlock(&ci->i_ceph_lock); 1446 spin_unlock(&ci->i_ceph_lock);
1432 1447
1433 truncate_inode_pages(&inode->i_data, 0); 1448 truncate_inode_pages(inode->i_mapping, 0);
1434 1449
1435 spin_lock(&ci->i_ceph_lock); 1450 spin_lock(&ci->i_ceph_lock);
1436 if (orig_gen == ci->i_rdcache_gen && 1451 if (orig_gen == ci->i_rdcache_gen &&
@@ -1445,6 +1460,7 @@ static void ceph_invalidate_work(struct work_struct *work)
1445 ci->i_rdcache_revoking); 1460 ci->i_rdcache_revoking);
1446 } 1461 }
1447 spin_unlock(&ci->i_ceph_lock); 1462 spin_unlock(&ci->i_ceph_lock);
1463 mutex_unlock(&ci->i_truncate_mutex);
1448 1464
1449 if (check) 1465 if (check)
1450 ceph_check_caps(ci, 0, NULL); 1466 ceph_check_caps(ci, 0, NULL);
@@ -1465,9 +1481,7 @@ static void ceph_vmtruncate_work(struct work_struct *work)
1465 struct inode *inode = &ci->vfs_inode; 1481 struct inode *inode = &ci->vfs_inode;
1466 1482
1467 dout("vmtruncate_work %p\n", inode); 1483 dout("vmtruncate_work %p\n", inode);
1468 mutex_lock(&inode->i_mutex);
1469 __ceph_do_pending_vmtruncate(inode); 1484 __ceph_do_pending_vmtruncate(inode);
1470 mutex_unlock(&inode->i_mutex);
1471 iput(inode); 1485 iput(inode);
1472} 1486}
1473 1487
@@ -1480,6 +1494,7 @@ void ceph_queue_vmtruncate(struct inode *inode)
1480 struct ceph_inode_info *ci = ceph_inode(inode); 1494 struct ceph_inode_info *ci = ceph_inode(inode);
1481 1495
1482 ihold(inode); 1496 ihold(inode);
1497
1483 if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, 1498 if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
1484 &ci->i_vmtruncate_work)) { 1499 &ci->i_vmtruncate_work)) {
1485 dout("ceph_queue_vmtruncate %p\n", inode); 1500 dout("ceph_queue_vmtruncate %p\n", inode);
@@ -1500,11 +1515,13 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
1500 u64 to; 1515 u64 to;
1501 int wrbuffer_refs, finish = 0; 1516 int wrbuffer_refs, finish = 0;
1502 1517
1518 mutex_lock(&ci->i_truncate_mutex);
1503retry: 1519retry:
1504 spin_lock(&ci->i_ceph_lock); 1520 spin_lock(&ci->i_ceph_lock);
1505 if (ci->i_truncate_pending == 0) { 1521 if (ci->i_truncate_pending == 0) {
1506 dout("__do_pending_vmtruncate %p none pending\n", inode); 1522 dout("__do_pending_vmtruncate %p none pending\n", inode);
1507 spin_unlock(&ci->i_ceph_lock); 1523 spin_unlock(&ci->i_ceph_lock);
1524 mutex_unlock(&ci->i_truncate_mutex);
1508 return; 1525 return;
1509 } 1526 }
1510 1527
@@ -1521,6 +1538,9 @@ retry:
1521 goto retry; 1538 goto retry;
1522 } 1539 }
1523 1540
1541 /* there should be no reader or writer */
1542 WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);
1543
1524 to = ci->i_truncate_size; 1544 to = ci->i_truncate_size;
1525 wrbuffer_refs = ci->i_wrbuffer_ref; 1545 wrbuffer_refs = ci->i_wrbuffer_ref;
1526 dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode, 1546 dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
@@ -1538,13 +1558,14 @@ retry:
1538 if (!finish) 1558 if (!finish)
1539 goto retry; 1559 goto retry;
1540 1560
1561 mutex_unlock(&ci->i_truncate_mutex);
1562
1541 if (wrbuffer_refs == 0) 1563 if (wrbuffer_refs == 0)
1542 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 1564 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1543 1565
1544 wake_up_all(&ci->i_cap_wq); 1566 wake_up_all(&ci->i_cap_wq);
1545} 1567}
1546 1568
1547
1548/* 1569/*
1549 * symlinks 1570 * symlinks
1550 */ 1571 */
@@ -1586,8 +1607,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1586 if (ceph_snap(inode) != CEPH_NOSNAP) 1607 if (ceph_snap(inode) != CEPH_NOSNAP)
1587 return -EROFS; 1608 return -EROFS;
1588 1609
1589 __ceph_do_pending_vmtruncate(inode);
1590
1591 err = inode_change_ok(inode, attr); 1610 err = inode_change_ok(inode, attr);
1592 if (err != 0) 1611 if (err != 0)
1593 return err; 1612 return err;
@@ -1768,7 +1787,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1768 ceph_cap_string(dirtied), mask); 1787 ceph_cap_string(dirtied), mask);
1769 1788
1770 ceph_mdsc_put_request(req); 1789 ceph_mdsc_put_request(req);
1771 __ceph_do_pending_vmtruncate(inode); 1790 if (mask & CEPH_SETATTR_SIZE)
1791 __ceph_do_pending_vmtruncate(inode);
1772 return err; 1792 return err;
1773out: 1793out:
1774 spin_unlock(&ci->i_ceph_lock); 1794 spin_unlock(&ci->i_ceph_lock);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index e0b4ef31d3c8..669622fd1ae3 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -196,8 +196,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
196 r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, 196 r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
197 &dl.object_no, &dl.object_offset, 197 &dl.object_no, &dl.object_offset,
198 &olen); 198 &olen);
199 if (r < 0) 199 if (r < 0) {
200 up_read(&osdc->map_sem);
200 return -EIO; 201 return -EIO;
202 }
201 dl.file_offset -= dl.object_offset; 203 dl.file_offset -= dl.object_offset;
202 dl.object_size = ceph_file_layout_object_size(ci->i_layout); 204 dl.object_size = ceph_file_layout_object_size(ci->i_layout);
203 dl.block_size = ceph_file_layout_su(ci->i_layout); 205 dl.block_size = ceph_file_layout_su(ci->i_layout);
@@ -209,8 +211,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
209 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", 211 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
210 ceph_ino(inode), dl.object_no); 212 ceph_ino(inode), dl.object_no);
211 213
212 ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, 214 r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap,
213 ceph_file_layout_pg_pool(ci->i_layout)); 215 ceph_file_layout_pg_pool(ci->i_layout));
216 if (r < 0) {
217 up_read(&osdc->map_sem);
218 return r;
219 }
214 220
215 dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid); 221 dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
216 if (dl.osd >= 0) { 222 if (dl.osd >= 0) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 187bf214444d..b7bda5d9611d 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -414,6 +414,9 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
414{ 414{
415 struct ceph_mds_session *s; 415 struct ceph_mds_session *s;
416 416
417 if (mds >= mdsc->mdsmap->m_max_mds)
418 return ERR_PTR(-EINVAL);
419
417 s = kzalloc(sizeof(*s), GFP_NOFS); 420 s = kzalloc(sizeof(*s), GFP_NOFS);
418 if (!s) 421 if (!s)
419 return ERR_PTR(-ENOMEM); 422 return ERR_PTR(-ENOMEM);
@@ -1028,6 +1031,37 @@ static void remove_session_caps(struct ceph_mds_session *session)
1028{ 1031{
1029 dout("remove_session_caps on %p\n", session); 1032 dout("remove_session_caps on %p\n", session);
1030 iterate_session_caps(session, remove_session_caps_cb, NULL); 1033 iterate_session_caps(session, remove_session_caps_cb, NULL);
1034
1035 spin_lock(&session->s_cap_lock);
1036 if (session->s_nr_caps > 0) {
1037 struct super_block *sb = session->s_mdsc->fsc->sb;
1038 struct inode *inode;
1039 struct ceph_cap *cap, *prev = NULL;
1040 struct ceph_vino vino;
1041 /*
1042 * iterate_session_caps() skips inodes that are being
1043 * deleted, we need to wait until deletions are complete.
1044 * __wait_on_freeing_inode() is designed for the job,
1045 * but it is not exported, so use lookup inode function
1046 * to access it.
1047 */
1048 while (!list_empty(&session->s_caps)) {
1049 cap = list_entry(session->s_caps.next,
1050 struct ceph_cap, session_caps);
1051 if (cap == prev)
1052 break;
1053 prev = cap;
1054 vino = cap->ci->i_vino;
1055 spin_unlock(&session->s_cap_lock);
1056
1057 inode = ceph_find_inode(sb, vino);
1058 iput(inode);
1059
1060 spin_lock(&session->s_cap_lock);
1061 }
1062 }
1063 spin_unlock(&session->s_cap_lock);
1064
1031 BUG_ON(session->s_nr_caps > 0); 1065 BUG_ON(session->s_nr_caps > 0);
1032 BUG_ON(!list_empty(&session->s_cap_flushing)); 1066 BUG_ON(!list_empty(&session->s_cap_flushing));
1033 cleanup_cap_releases(session); 1067 cleanup_cap_releases(session);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 6627b26a800c..6a0951e43044 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -17,6 +17,7 @@
17 17
18#include "super.h" 18#include "super.h"
19#include "mds_client.h" 19#include "mds_client.h"
20#include "cache.h"
20 21
21#include <linux/ceph/ceph_features.h> 22#include <linux/ceph/ceph_features.h>
22#include <linux/ceph/decode.h> 23#include <linux/ceph/decode.h>
@@ -142,6 +143,8 @@ enum {
142 Opt_nodcache, 143 Opt_nodcache,
143 Opt_ino32, 144 Opt_ino32,
144 Opt_noino32, 145 Opt_noino32,
146 Opt_fscache,
147 Opt_nofscache
145}; 148};
146 149
147static match_table_t fsopt_tokens = { 150static match_table_t fsopt_tokens = {
@@ -167,6 +170,8 @@ static match_table_t fsopt_tokens = {
167 {Opt_nodcache, "nodcache"}, 170 {Opt_nodcache, "nodcache"},
168 {Opt_ino32, "ino32"}, 171 {Opt_ino32, "ino32"},
169 {Opt_noino32, "noino32"}, 172 {Opt_noino32, "noino32"},
173 {Opt_fscache, "fsc"},
174 {Opt_nofscache, "nofsc"},
170 {-1, NULL} 175 {-1, NULL}
171}; 176};
172 177
@@ -260,6 +265,12 @@ static int parse_fsopt_token(char *c, void *private)
260 case Opt_noino32: 265 case Opt_noino32:
261 fsopt->flags &= ~CEPH_MOUNT_OPT_INO32; 266 fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
262 break; 267 break;
268 case Opt_fscache:
269 fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
270 break;
271 case Opt_nofscache:
272 fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
273 break;
263 default: 274 default:
264 BUG_ON(token); 275 BUG_ON(token);
265 } 276 }
@@ -422,6 +433,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
422 seq_puts(m, ",dcache"); 433 seq_puts(m, ",dcache");
423 else 434 else
424 seq_puts(m, ",nodcache"); 435 seq_puts(m, ",nodcache");
436 if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE)
437 seq_puts(m, ",fsc");
438 else
439 seq_puts(m, ",nofsc");
425 440
426 if (fsopt->wsize) 441 if (fsopt->wsize)
427 seq_printf(m, ",wsize=%d", fsopt->wsize); 442 seq_printf(m, ",wsize=%d", fsopt->wsize);
@@ -530,11 +545,18 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
530 if (!fsc->wb_pagevec_pool) 545 if (!fsc->wb_pagevec_pool)
531 goto fail_trunc_wq; 546 goto fail_trunc_wq;
532 547
548 /* setup fscache */
549 if ((fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) &&
550 (ceph_fscache_register_fs(fsc) != 0))
551 goto fail_fscache;
552
533 /* caps */ 553 /* caps */
534 fsc->min_caps = fsopt->max_readdir; 554 fsc->min_caps = fsopt->max_readdir;
535 555
536 return fsc; 556 return fsc;
537 557
558fail_fscache:
559 ceph_fscache_unregister_fs(fsc);
538fail_trunc_wq: 560fail_trunc_wq:
539 destroy_workqueue(fsc->trunc_wq); 561 destroy_workqueue(fsc->trunc_wq);
540fail_pg_inv_wq: 562fail_pg_inv_wq:
@@ -554,6 +576,8 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
554{ 576{
555 dout("destroy_fs_client %p\n", fsc); 577 dout("destroy_fs_client %p\n", fsc);
556 578
579 ceph_fscache_unregister_fs(fsc);
580
557 destroy_workqueue(fsc->wb_wq); 581 destroy_workqueue(fsc->wb_wq);
558 destroy_workqueue(fsc->pg_inv_wq); 582 destroy_workqueue(fsc->pg_inv_wq);
559 destroy_workqueue(fsc->trunc_wq); 583 destroy_workqueue(fsc->trunc_wq);
@@ -588,6 +612,8 @@ static void ceph_inode_init_once(void *foo)
588 612
589static int __init init_caches(void) 613static int __init init_caches(void)
590{ 614{
615 int error = -ENOMEM;
616
591 ceph_inode_cachep = kmem_cache_create("ceph_inode_info", 617 ceph_inode_cachep = kmem_cache_create("ceph_inode_info",
592 sizeof(struct ceph_inode_info), 618 sizeof(struct ceph_inode_info),
593 __alignof__(struct ceph_inode_info), 619 __alignof__(struct ceph_inode_info),
@@ -611,15 +637,17 @@ static int __init init_caches(void)
611 if (ceph_file_cachep == NULL) 637 if (ceph_file_cachep == NULL)
612 goto bad_file; 638 goto bad_file;
613 639
614 return 0; 640 if ((error = ceph_fscache_register()))
641 goto bad_file;
615 642
643 return 0;
616bad_file: 644bad_file:
617 kmem_cache_destroy(ceph_dentry_cachep); 645 kmem_cache_destroy(ceph_dentry_cachep);
618bad_dentry: 646bad_dentry:
619 kmem_cache_destroy(ceph_cap_cachep); 647 kmem_cache_destroy(ceph_cap_cachep);
620bad_cap: 648bad_cap:
621 kmem_cache_destroy(ceph_inode_cachep); 649 kmem_cache_destroy(ceph_inode_cachep);
622 return -ENOMEM; 650 return error;
623} 651}
624 652
625static void destroy_caches(void) 653static void destroy_caches(void)
@@ -629,10 +657,13 @@ static void destroy_caches(void)
629 * destroy cache. 657 * destroy cache.
630 */ 658 */
631 rcu_barrier(); 659 rcu_barrier();
660
632 kmem_cache_destroy(ceph_inode_cachep); 661 kmem_cache_destroy(ceph_inode_cachep);
633 kmem_cache_destroy(ceph_cap_cachep); 662 kmem_cache_destroy(ceph_cap_cachep);
634 kmem_cache_destroy(ceph_dentry_cachep); 663 kmem_cache_destroy(ceph_dentry_cachep);
635 kmem_cache_destroy(ceph_file_cachep); 664 kmem_cache_destroy(ceph_file_cachep);
665
666 ceph_fscache_unregister();
636} 667}
637 668
638 669
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index cbded572345e..6014b0a3c405 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -16,6 +16,10 @@
16 16
17#include <linux/ceph/libceph.h> 17#include <linux/ceph/libceph.h>
18 18
19#ifdef CONFIG_CEPH_FSCACHE
20#include <linux/fscache.h>
21#endif
22
19/* f_type in struct statfs */ 23/* f_type in struct statfs */
20#define CEPH_SUPER_MAGIC 0x00c36400 24#define CEPH_SUPER_MAGIC 0x00c36400
21 25
@@ -29,6 +33,7 @@
29#define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */ 33#define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */
30#define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */ 34#define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */
31#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ 35#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
36#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
32 37
33#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES) 38#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES)
34 39
@@ -90,6 +95,11 @@ struct ceph_fs_client {
90 struct dentry *debugfs_bdi; 95 struct dentry *debugfs_bdi;
91 struct dentry *debugfs_mdsc, *debugfs_mdsmap; 96 struct dentry *debugfs_mdsc, *debugfs_mdsmap;
92#endif 97#endif
98
99#ifdef CONFIG_CEPH_FSCACHE
100 struct fscache_cookie *fscache;
101 struct workqueue_struct *revalidate_wq;
102#endif
93}; 103};
94 104
95 105
@@ -288,6 +298,7 @@ struct ceph_inode_info {
288 298
289 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ 299 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
290 300
301 struct mutex i_truncate_mutex;
291 u32 i_truncate_seq; /* last truncate to smaller size */ 302 u32 i_truncate_seq; /* last truncate to smaller size */
292 u64 i_truncate_size; /* and the size we last truncated down to */ 303 u64 i_truncate_size; /* and the size we last truncated down to */
293 int i_truncate_pending; /* still need to call vmtruncate */ 304 int i_truncate_pending; /* still need to call vmtruncate */
@@ -319,6 +330,12 @@ struct ceph_inode_info {
319 330
320 struct work_struct i_vmtruncate_work; 331 struct work_struct i_vmtruncate_work;
321 332
333#ifdef CONFIG_CEPH_FSCACHE
334 struct fscache_cookie *fscache;
335 u32 i_fscache_gen; /* sequence, for delayed fscache validate */
336 struct work_struct i_revalidate_work;
337#endif
338
322 struct inode vfs_inode; /* at end */ 339 struct inode vfs_inode; /* at end */
323}; 340};
324 341
diff --git a/fs/fscache/cookie.c b/fs/fscache/cookie.c
index 0e91a3c9fdb2..318e8433527c 100644
--- a/fs/fscache/cookie.c
+++ b/fs/fscache/cookie.c
@@ -558,3 +558,74 @@ void __fscache_cookie_put(struct fscache_cookie *cookie)
558 558
559 _leave(""); 559 _leave("");
560} 560}
561
562/*
563 * check the consistency between the netfs inode and the backing cache
564 *
565 * NOTE: it only serves no-index type
566 */
567int __fscache_check_consistency(struct fscache_cookie *cookie)
568{
569 struct fscache_operation *op;
570 struct fscache_object *object;
571 int ret;
572
573 _enter("%p,", cookie);
574
575 ASSERTCMP(cookie->def->type, ==, FSCACHE_COOKIE_TYPE_DATAFILE);
576
577 if (fscache_wait_for_deferred_lookup(cookie) < 0)
578 return -ERESTARTSYS;
579
580 if (hlist_empty(&cookie->backing_objects))
581 return 0;
582
583 op = kzalloc(sizeof(*op), GFP_NOIO | __GFP_NOMEMALLOC | __GFP_NORETRY);
584 if (!op)
585 return -ENOMEM;
586
587 fscache_operation_init(op, NULL, NULL);
588 op->flags = FSCACHE_OP_MYTHREAD |
589 (1 << FSCACHE_OP_WAITING);
590
591 spin_lock(&cookie->lock);
592
593 if (hlist_empty(&cookie->backing_objects))
594 goto inconsistent;
595 object = hlist_entry(cookie->backing_objects.first,
596 struct fscache_object, cookie_link);
597 if (test_bit(FSCACHE_IOERROR, &object->cache->flags))
598 goto inconsistent;
599
600 op->debug_id = atomic_inc_return(&fscache_op_debug_id);
601
602 atomic_inc(&cookie->n_active);
603 if (fscache_submit_op(object, op) < 0)
604 goto submit_failed;
605
606 /* the work queue now carries its own ref on the object */
607 spin_unlock(&cookie->lock);
608
609 ret = fscache_wait_for_operation_activation(object, op,
610 NULL, NULL, NULL);
611 if (ret == 0) {
612 /* ask the cache to honour the operation */
613 ret = object->cache->ops->check_consistency(op);
614 fscache_op_complete(op, false);
615 } else if (ret == -ENOBUFS) {
616 ret = 0;
617 }
618
619 fscache_put_operation(op);
620 _leave(" = %d", ret);
621 return ret;
622
623submit_failed:
624 atomic_dec(&cookie->n_active);
625inconsistent:
626 spin_unlock(&cookie->lock);
627 kfree(op);
628 _leave(" = -ESTALE");
629 return -ESTALE;
630}
631EXPORT_SYMBOL(__fscache_check_consistency);
diff --git a/fs/fscache/internal.h b/fs/fscache/internal.h
index 12d505bedb5c..4226f6680b06 100644
--- a/fs/fscache/internal.h
+++ b/fs/fscache/internal.h
@@ -130,6 +130,12 @@ extern void fscache_operation_gc(struct work_struct *);
130/* 130/*
131 * page.c 131 * page.c
132 */ 132 */
133extern int fscache_wait_for_deferred_lookup(struct fscache_cookie *);
134extern int fscache_wait_for_operation_activation(struct fscache_object *,
135 struct fscache_operation *,
136 atomic_t *,
137 atomic_t *,
138 void (*)(struct fscache_operation *));
133extern void fscache_invalidate_writes(struct fscache_cookie *); 139extern void fscache_invalidate_writes(struct fscache_cookie *);
134 140
135/* 141/*
diff --git a/fs/fscache/page.c b/fs/fscache/page.c
index d479ab3c63e4..8702b732109a 100644
--- a/fs/fscache/page.c
+++ b/fs/fscache/page.c
@@ -278,7 +278,7 @@ static struct fscache_retrieval *fscache_alloc_retrieval(
278/* 278/*
279 * wait for a deferred lookup to complete 279 * wait for a deferred lookup to complete
280 */ 280 */
281static int fscache_wait_for_deferred_lookup(struct fscache_cookie *cookie) 281int fscache_wait_for_deferred_lookup(struct fscache_cookie *cookie)
282{ 282{
283 unsigned long jif; 283 unsigned long jif;
284 284
@@ -322,42 +322,46 @@ static void fscache_do_cancel_retrieval(struct fscache_operation *_op)
322/* 322/*
323 * wait for an object to become active (or dead) 323 * wait for an object to become active (or dead)
324 */ 324 */
325static int fscache_wait_for_retrieval_activation(struct fscache_object *object, 325int fscache_wait_for_operation_activation(struct fscache_object *object,
326 struct fscache_retrieval *op, 326 struct fscache_operation *op,
327 atomic_t *stat_op_waits, 327 atomic_t *stat_op_waits,
328 atomic_t *stat_object_dead) 328 atomic_t *stat_object_dead,
329 void (*do_cancel)(struct fscache_operation *))
329{ 330{
330 int ret; 331 int ret;
331 332
332 if (!test_bit(FSCACHE_OP_WAITING, &op->op.flags)) 333 if (!test_bit(FSCACHE_OP_WAITING, &op->flags))
333 goto check_if_dead; 334 goto check_if_dead;
334 335
335 _debug(">>> WT"); 336 _debug(">>> WT");
336 fscache_stat(stat_op_waits); 337 if (stat_op_waits)
337 if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING, 338 fscache_stat(stat_op_waits);
339 if (wait_on_bit(&op->flags, FSCACHE_OP_WAITING,
338 fscache_wait_bit_interruptible, 340 fscache_wait_bit_interruptible,
339 TASK_INTERRUPTIBLE) != 0) { 341 TASK_INTERRUPTIBLE) != 0) {
340 ret = fscache_cancel_op(&op->op, fscache_do_cancel_retrieval); 342 ret = fscache_cancel_op(op, do_cancel);
341 if (ret == 0) 343 if (ret == 0)
342 return -ERESTARTSYS; 344 return -ERESTARTSYS;
343 345
344 /* it's been removed from the pending queue by another party, 346 /* it's been removed from the pending queue by another party,
345 * so we should get to run shortly */ 347 * so we should get to run shortly */
346 wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING, 348 wait_on_bit(&op->flags, FSCACHE_OP_WAITING,
347 fscache_wait_bit, TASK_UNINTERRUPTIBLE); 349 fscache_wait_bit, TASK_UNINTERRUPTIBLE);
348 } 350 }
349 _debug("<<< GO"); 351 _debug("<<< GO");
350 352
351check_if_dead: 353check_if_dead:
352 if (op->op.state == FSCACHE_OP_ST_CANCELLED) { 354 if (op->state == FSCACHE_OP_ST_CANCELLED) {
353 fscache_stat(stat_object_dead); 355 if (stat_object_dead)
356 fscache_stat(stat_object_dead);
354 _leave(" = -ENOBUFS [cancelled]"); 357 _leave(" = -ENOBUFS [cancelled]");
355 return -ENOBUFS; 358 return -ENOBUFS;
356 } 359 }
357 if (unlikely(fscache_object_is_dead(object))) { 360 if (unlikely(fscache_object_is_dead(object))) {
358 pr_err("%s() = -ENOBUFS [obj dead %d]\n", __func__, op->op.state); 361 pr_err("%s() = -ENOBUFS [obj dead %d]\n", __func__, op->state);
359 fscache_cancel_op(&op->op, fscache_do_cancel_retrieval); 362 fscache_cancel_op(op, do_cancel);
360 fscache_stat(stat_object_dead); 363 if (stat_object_dead)
364 fscache_stat(stat_object_dead);
361 return -ENOBUFS; 365 return -ENOBUFS;
362 } 366 }
363 return 0; 367 return 0;
@@ -432,10 +436,11 @@ int __fscache_read_or_alloc_page(struct fscache_cookie *cookie,
432 436
433 /* we wait for the operation to become active, and then process it 437 /* we wait for the operation to become active, and then process it
434 * *here*, in this thread, and not in the thread pool */ 438 * *here*, in this thread, and not in the thread pool */
435 ret = fscache_wait_for_retrieval_activation( 439 ret = fscache_wait_for_operation_activation(
436 object, op, 440 object, &op->op,
437 __fscache_stat(&fscache_n_retrieval_op_waits), 441 __fscache_stat(&fscache_n_retrieval_op_waits),
438 __fscache_stat(&fscache_n_retrievals_object_dead)); 442 __fscache_stat(&fscache_n_retrievals_object_dead),
443 fscache_do_cancel_retrieval);
439 if (ret < 0) 444 if (ret < 0)
440 goto error; 445 goto error;
441 446
@@ -557,10 +562,11 @@ int __fscache_read_or_alloc_pages(struct fscache_cookie *cookie,
557 562
558 /* we wait for the operation to become active, and then process it 563 /* we wait for the operation to become active, and then process it
559 * *here*, in this thread, and not in the thread pool */ 564 * *here*, in this thread, and not in the thread pool */
560 ret = fscache_wait_for_retrieval_activation( 565 ret = fscache_wait_for_operation_activation(
561 object, op, 566 object, &op->op,
562 __fscache_stat(&fscache_n_retrieval_op_waits), 567 __fscache_stat(&fscache_n_retrieval_op_waits),
563 __fscache_stat(&fscache_n_retrievals_object_dead)); 568 __fscache_stat(&fscache_n_retrievals_object_dead),
569 fscache_do_cancel_retrieval);
564 if (ret < 0) 570 if (ret < 0)
565 goto error; 571 goto error;
566 572
@@ -658,10 +664,11 @@ int __fscache_alloc_page(struct fscache_cookie *cookie,
658 664
659 fscache_stat(&fscache_n_alloc_ops); 665 fscache_stat(&fscache_n_alloc_ops);
660 666
661 ret = fscache_wait_for_retrieval_activation( 667 ret = fscache_wait_for_operation_activation(
662 object, op, 668 object, &op->op,
663 __fscache_stat(&fscache_n_alloc_op_waits), 669 __fscache_stat(&fscache_n_alloc_op_waits),
664 __fscache_stat(&fscache_n_allocs_object_dead)); 670 __fscache_stat(&fscache_n_allocs_object_dead),
671 fscache_do_cancel_retrieval);
665 if (ret < 0) 672 if (ret < 0)
666 goto error; 673 goto error;
667 674
@@ -694,6 +701,22 @@ nobufs:
694EXPORT_SYMBOL(__fscache_alloc_page); 701EXPORT_SYMBOL(__fscache_alloc_page);
695 702
696/* 703/*
 704 * Unmark pages allocated in the readahead code path (via:
705 * fscache_readpages_or_alloc) after delegating to the base filesystem
706 */
707void __fscache_readpages_cancel(struct fscache_cookie *cookie,
708 struct list_head *pages)
709{
710 struct page *page;
711
712 list_for_each_entry(page, pages, lru) {
713 if (PageFsCache(page))
714 __fscache_uncache_page(cookie, page);
715 }
716}
717EXPORT_SYMBOL(__fscache_readpages_cancel);
718
719/*
697 * release a write op reference 720 * release a write op reference
698 */ 721 */
699static void fscache_release_write_op(struct fscache_operation *_op) 722static void fscache_release_write_op(struct fscache_operation *_op)
diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h
index a9ff9a36b86d..7823e9ef995e 100644
--- a/include/linux/fscache-cache.h
+++ b/include/linux/fscache-cache.h
@@ -251,6 +251,10 @@ struct fscache_cache_ops {
251 /* unpin an object in the cache */ 251 /* unpin an object in the cache */
252 void (*unpin_object)(struct fscache_object *object); 252 void (*unpin_object)(struct fscache_object *object);
253 253
254 /* check the consistency between the backing cache and the FS-Cache
255 * cookie */
256 bool (*check_consistency)(struct fscache_operation *op);
257
254 /* store the updated auxiliary data on an object */ 258 /* store the updated auxiliary data on an object */
255 void (*update_object)(struct fscache_object *object); 259 void (*update_object)(struct fscache_object *object);
256 260
diff --git a/include/linux/fscache.h b/include/linux/fscache.h
index 7a086235da4b..19b46458e4e8 100644
--- a/include/linux/fscache.h
+++ b/include/linux/fscache.h
@@ -183,6 +183,7 @@ extern struct fscache_cookie *__fscache_acquire_cookie(
183 const struct fscache_cookie_def *, 183 const struct fscache_cookie_def *,
184 void *); 184 void *);
185extern void __fscache_relinquish_cookie(struct fscache_cookie *, int); 185extern void __fscache_relinquish_cookie(struct fscache_cookie *, int);
186extern int __fscache_check_consistency(struct fscache_cookie *);
186extern void __fscache_update_cookie(struct fscache_cookie *); 187extern void __fscache_update_cookie(struct fscache_cookie *);
187extern int __fscache_attr_changed(struct fscache_cookie *); 188extern int __fscache_attr_changed(struct fscache_cookie *);
188extern void __fscache_invalidate(struct fscache_cookie *); 189extern void __fscache_invalidate(struct fscache_cookie *);
@@ -208,6 +209,8 @@ extern bool __fscache_maybe_release_page(struct fscache_cookie *, struct page *,
208 gfp_t); 209 gfp_t);
209extern void __fscache_uncache_all_inode_pages(struct fscache_cookie *, 210extern void __fscache_uncache_all_inode_pages(struct fscache_cookie *,
210 struct inode *); 211 struct inode *);
212extern void __fscache_readpages_cancel(struct fscache_cookie *cookie,
213 struct list_head *pages);
211 214
212/** 215/**
213 * fscache_register_netfs - Register a filesystem as desiring caching services 216 * fscache_register_netfs - Register a filesystem as desiring caching services
@@ -326,6 +329,25 @@ void fscache_relinquish_cookie(struct fscache_cookie *cookie, int retire)
326} 329}
327 330
328/** 331/**
 332 * fscache_check_consistency - Request a consistency check of a cache object
333 * @cookie: The cookie representing the cache object
334 *
 335 * Request a consistency check from fscache, which passes the request
336 * to the backing cache.
337 *
338 * Returns 0 if consistent and -ESTALE if inconsistent. May also
339 * return -ENOMEM and -ERESTARTSYS.
340 */
341static inline
342int fscache_check_consistency(struct fscache_cookie *cookie)
343{
344 if (fscache_cookie_valid(cookie))
345 return __fscache_check_consistency(cookie);
346 else
347 return 0;
348}
349
350/**
329 * fscache_update_cookie - Request that a cache object be updated 351 * fscache_update_cookie - Request that a cache object be updated
330 * @cookie: The cookie representing the cache object 352 * @cookie: The cookie representing the cache object
331 * 353 *
@@ -570,6 +592,26 @@ int fscache_alloc_page(struct fscache_cookie *cookie,
570} 592}
571 593
572/** 594/**
595 * fscache_readpages_cancel - Cancel read/alloc on pages
596 * @cookie: The cookie representing the inode's cache object.
597 * @pages: The netfs pages that we canceled write on in readpages()
598 *
599 * Uncache/unreserve the pages reserved earlier in readpages() via
600 * fscache_readpages_or_alloc() and similar. In most successful caches in
601 * readpages() this doesn't do anything. In cases when the underlying netfs's
602 * readahead failed we need to clean up the pagelist (unmark and uncache).
603 *
604 * This function may sleep as it may have to clean up disk state.
605 */
606static inline
607void fscache_readpages_cancel(struct fscache_cookie *cookie,
608 struct list_head *pages)
609{
610 if (fscache_cookie_valid(cookie))
611 __fscache_readpages_cancel(cookie, pages);
612}
613
614/**
573 * fscache_write_page - Request storage of a page in the cache 615 * fscache_write_page - Request storage of a page in the cache
574 * @cookie: The cookie representing the cache object 616 * @cookie: The cookie representing the cache object
575 * @page: The netfs page to store 617 * @page: The netfs page to store
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3be308e14302..4a5df7b1cc9f 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -290,7 +290,7 @@ int ceph_msgr_init(void)
290 if (ceph_msgr_slab_init()) 290 if (ceph_msgr_slab_init())
291 return -ENOMEM; 291 return -ENOMEM;
292 292
293 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 293 ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
294 if (ceph_msgr_wq) 294 if (ceph_msgr_wq)
295 return 0; 295 return 0;
296 296
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index dd47889adc4a..1606f740d6ae 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -503,7 +503,9 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
503 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 503 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
504 size_t payload_len = 0; 504 size_t payload_len = 0;
505 505
506 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 506 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
507 opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
508 opcode != CEPH_OSD_OP_TRUNCATE);
507 509
508 op->extent.offset = offset; 510 op->extent.offset = offset;
509 op->extent.length = length; 511 op->extent.length = length;
@@ -631,6 +633,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
631 break; 633 break;
632 case CEPH_OSD_OP_READ: 634 case CEPH_OSD_OP_READ:
633 case CEPH_OSD_OP_WRITE: 635 case CEPH_OSD_OP_WRITE:
636 case CEPH_OSD_OP_ZERO:
637 case CEPH_OSD_OP_DELETE:
638 case CEPH_OSD_OP_TRUNCATE:
634 if (src->op == CEPH_OSD_OP_WRITE) 639 if (src->op == CEPH_OSD_OP_WRITE)
635 request_data_len = src->extent.length; 640 request_data_len = src->extent.length;
636 dst->extent.offset = cpu_to_le64(src->extent.offset); 641 dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -715,7 +720,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
715 u64 object_base; 720 u64 object_base;
716 int r; 721 int r;
717 722
718 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 723 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
724 opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
725 opcode != CEPH_OSD_OP_TRUNCATE);
719 726
720 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 727 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
721 GFP_NOFS); 728 GFP_NOFS);
@@ -1488,14 +1495,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1488 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1495 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1489 req, result); 1496 req, result);
1490 1497
1491 ceph_decode_need(&p, end, 4, bad); 1498 ceph_decode_need(&p, end, 4, bad_put);
1492 numops = ceph_decode_32(&p); 1499 numops = ceph_decode_32(&p);
1493 if (numops > CEPH_OSD_MAX_OP) 1500 if (numops > CEPH_OSD_MAX_OP)
1494 goto bad_put; 1501 goto bad_put;
1495 if (numops != req->r_num_ops) 1502 if (numops != req->r_num_ops)
1496 goto bad_put; 1503 goto bad_put;
1497 payload_len = 0; 1504 payload_len = 0;
1498 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1505 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
1499 for (i = 0; i < numops; i++) { 1506 for (i = 0; i < numops; i++) {
1500 struct ceph_osd_op *op = p; 1507 struct ceph_osd_op *op = p;
1501 int len; 1508 int len;
@@ -1513,7 +1520,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1513 goto bad_put; 1520 goto bad_put;
1514 } 1521 }
1515 1522
1516 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1523 ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
1517 retry_attempt = ceph_decode_32(&p); 1524 retry_attempt = ceph_decode_32(&p);
1518 for (i = 0; i < numops; i++) 1525 for (i = 0; i < numops; i++)
1519 req->r_reply_op_result[i] = ceph_decode_32(&p); 1526 req->r_reply_op_result[i] = ceph_decode_32(&p);
@@ -1786,6 +1793,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1786 nr_maps--; 1793 nr_maps--;
1787 } 1794 }
1788 1795
1796 if (!osdc->osdmap)
1797 goto bad;
1789done: 1798done:
1790 downgrade_write(&osdc->map_sem); 1799 downgrade_write(&osdc->map_sem);
1791 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1800 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
@@ -2129,6 +2138,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
2129 dout("osdc_start_request failed map, " 2138 dout("osdc_start_request failed map, "
2130 " will retry %lld\n", req->r_tid); 2139 " will retry %lld\n", req->r_tid);
2131 rc = 0; 2140 rc = 0;
2141 } else {
2142 __unregister_request(osdc, req);
2132 } 2143 }
2133 goto out_unlock; 2144 goto out_unlock;
2134 } 2145 }
@@ -2253,12 +2264,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2253 if (err < 0) 2264 if (err < 0)
2254 goto out_msgpool; 2265 goto out_msgpool;
2255 2266
2267 err = -ENOMEM;
2256 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2268 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
2257 if (IS_ERR(osdc->notify_wq)) { 2269 if (!osdc->notify_wq)
2258 err = PTR_ERR(osdc->notify_wq);
2259 osdc->notify_wq = NULL;
2260 goto out_msgpool; 2270 goto out_msgpool;
2261 }
2262 return 0; 2271 return 0;
2263 2272
2264out_msgpool: 2273out_msgpool:
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 603ddd92db19..dbd9a4792427 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1129,7 +1129,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1129 1129
1130 /* pg_temp? */ 1130 /* pg_temp? */
1131 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, 1131 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
1132 pool->pgp_num_mask); 1132 pool->pg_num_mask);
1133 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1133 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1134 if (pg) { 1134 if (pg) {
1135 *num = pg->len; 1135 *num = pg->len;