author    Linus Torvalds <torvalds@linux-foundation.org>  2014-01-28 14:02:23 -0500
committer Linus Torvalds <torvalds@linux-foundation.org>  2014-01-28 14:02:23 -0500
commit    d891ea23d5203e5c47439b2a174f86a00b356a6c (patch)
tree      3876cefcced9df5519f437cd8eb275cb979b93f6 /fs
parent    08d21b5f93eb92a781daea71b6fcb3a340909141 (diff)
parent    125d725c923527a85876c031028c7f55c28b74b3 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil:
 "This is a big batch. From Ilya we have:

   - rbd support for more than ~250 mapped devices (now uses same scheme
     that SCSI does for device major/minor numbering)

   - crush updates for new mapping behaviors (will be needed for coming
     erasure coding support, among other things)

   - preliminary support for tiered storage pools

  There is also a big series fixing a pile of cephfs bugs with clustered
  MDSs from Yan Zheng, ACL support for cephfs from Guangliang Zhao, ceph
  fscache improvements from Li Wang, improved behavior when we get ENOSPC
  from Josh Durgin, some readv/writev improvements from Majianpeng, and
  the usual mix of small cleanups"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (76 commits)
  ceph: cast PAGE_SIZE to size_t in ceph_sync_write()
  ceph: fix dout() compile warnings in ceph_filemap_fault()
  libceph: support CEPH_FEATURE_OSD_CACHEPOOL feature
  libceph: follow redirect replies from osds
  libceph: rename ceph_osd_request::r_{oloc,oid} to r_base_{oloc,oid}
  libceph: follow {read,write}_tier fields on osd request submission
  libceph: add ceph_pg_pool_by_id()
  libceph: CEPH_OSD_FLAG_* enum update
  libceph: replace ceph_calc_ceph_pg() with ceph_oloc_oid_to_pg()
  libceph: introduce and start using oid abstraction
  libceph: rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN
  libceph: move ceph_file_layout helpers to ceph_fs.h
  libceph: start using oloc abstraction
  libceph: dout() is missing a newline
  libceph: add ceph_kv{malloc,free}() and switch to them
  libceph: support CEPH_FEATURE_EXPORT_PEER
  ceph: add imported caps when handling cap export message
  ceph: add open export target session helper
  ceph: remove exported caps when handling cap import message
  ceph: handle session flush message
  ...
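For illustration only (not part of this pull request): a minimal userspace sketch of what the new cephfs POSIX ACL support enables. It assumes a kernel built with the CONFIG_CEPH_FS_POSIX_ACL option added below, a cephfs filesystem mounted at /mnt/cephfs, and libacl installed (link with -lacl); the path and uid are placeholders.

    /* Hypothetical userspace check of cephfs POSIX ACLs; build with -lacl. */
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/acl.h>

    int main(void)
    {
            const char *path = "/mnt/cephfs/testfile";  /* example path */
            acl_t acl;
            char *text;

            /* Grant uid 1000 rw access in addition to the mode bits. */
            acl = acl_from_text("u::rw-,g::r--,o::r--,u:1000:rw-,m::rw-");
            if (!acl || acl_set_file(path, ACL_TYPE_ACCESS, acl) < 0) {
                    perror("acl_set_file");
                    return 1;
            }
            acl_free(acl);

            /* Read it back; the client stores it in the system.posix_acl_access
             * xattr and caches it while it holds CEPH_CAP_XATTR_SHARED. */
            acl = acl_get_file(path, ACL_TYPE_ACCESS);
            if (!acl) {
                    perror("acl_get_file");
                    return 1;
            }
            text = acl_to_text(acl, NULL);
            printf("%s", text);
            acl_free(text);
            acl_free(acl);
            return 0;
    }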
Diffstat (limited to 'fs')
-rw-r--r--  fs/ceph/Kconfig       |  13
-rw-r--r--  fs/ceph/Makefile      |   1
-rw-r--r--  fs/ceph/acl.c         | 332
-rw-r--r--  fs/ceph/addr.c        |  93
-rw-r--r--  fs/ceph/cache.h       |  13
-rw-r--r--  fs/ceph/caps.c        | 338
-rw-r--r--  fs/ceph/dir.c         |  16
-rw-r--r--  fs/ceph/file.c        | 437
-rw-r--r--  fs/ceph/inode.c       |  33
-rw-r--r--  fs/ceph/ioctl.c       |   8
-rw-r--r--  fs/ceph/mds_client.c  | 132
-rw-r--r--  fs/ceph/mds_client.h  |   2
-rw-r--r--  fs/ceph/strings.c     |   2
-rw-r--r--  fs/ceph/super.c       |   9
-rw-r--r--  fs/ceph/super.h       |  45
-rw-r--r--  fs/ceph/xattr.c       |  60
16 files changed, 1202 insertions(+), 332 deletions(-)
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index ac9a2ef5bb9b..264e9bf83ff3 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -25,3 +25,16 @@ config CEPH_FSCACHE
25 caching support for Ceph clients using FS-Cache 25 caching support for Ceph clients using FS-Cache
26 26
27endif 27endif
28
29config CEPH_FS_POSIX_ACL
30 bool "Ceph POSIX Access Control Lists"
31 depends on CEPH_FS
32 select FS_POSIX_ACL
33 help
34 POSIX Access Control Lists (ACLs) support permissions for users and
35 groups beyond the owner/group/world scheme.
36
37 To learn more about Access Control Lists, visit the POSIX ACLs for
38 Linux website <http://acl.bestbits.at/>.
39
40 If you don't know what Access Control Lists are, say N
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 32e30106a2f0..85a4230b9bff 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -10,3 +10,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
10 debugfs.o 10 debugfs.o
11 11
12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o 12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
13ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
new file mode 100644
index 000000000000..64fddbc1d17b
--- /dev/null
+++ b/fs/ceph/acl.c
@@ -0,0 +1,332 @@
1/*
2 * linux/fs/ceph/acl.c
3 *
4 * Copyright (C) 2013 Guangliang Zhao, <lucienchao@gmail.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License v2 as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public
16 * License along with this program; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 021110-1307, USA.
19 */
20
21#include <linux/ceph/ceph_debug.h>
22#include <linux/fs.h>
23#include <linux/string.h>
24#include <linux/xattr.h>
25#include <linux/posix_acl_xattr.h>
26#include <linux/posix_acl.h>
27#include <linux/sched.h>
28#include <linux/slab.h>
29
30#include "super.h"
31
32static inline void ceph_set_cached_acl(struct inode *inode,
33 int type, struct posix_acl *acl)
34{
35 struct ceph_inode_info *ci = ceph_inode(inode);
36
37 spin_lock(&ci->i_ceph_lock);
38 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
39 set_cached_acl(inode, type, acl);
40 spin_unlock(&ci->i_ceph_lock);
41}
42
43static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
44 int type)
45{
46 struct ceph_inode_info *ci = ceph_inode(inode);
47 struct posix_acl *acl = ACL_NOT_CACHED;
48
49 spin_lock(&ci->i_ceph_lock);
50 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
51 acl = get_cached_acl(inode, type);
52 spin_unlock(&ci->i_ceph_lock);
53
54 return acl;
55}
56
57void ceph_forget_all_cached_acls(struct inode *inode)
58{
59 forget_all_cached_acls(inode);
60}
61
62struct posix_acl *ceph_get_acl(struct inode *inode, int type)
63{
64 int size;
65 const char *name;
66 char *value = NULL;
67 struct posix_acl *acl;
68
69 if (!IS_POSIXACL(inode))
70 return NULL;
71
72 acl = ceph_get_cached_acl(inode, type);
73 if (acl != ACL_NOT_CACHED)
74 return acl;
75
76 switch (type) {
77 case ACL_TYPE_ACCESS:
78 name = POSIX_ACL_XATTR_ACCESS;
79 break;
80 case ACL_TYPE_DEFAULT:
81 name = POSIX_ACL_XATTR_DEFAULT;
82 break;
83 default:
84 BUG();
85 }
86
87 size = __ceph_getxattr(inode, name, "", 0);
88 if (size > 0) {
89 value = kzalloc(size, GFP_NOFS);
90 if (!value)
91 return ERR_PTR(-ENOMEM);
92 size = __ceph_getxattr(inode, name, value, size);
93 }
94
95 if (size > 0)
96 acl = posix_acl_from_xattr(&init_user_ns, value, size);
97 else if (size == -ERANGE || size == -ENODATA || size == 0)
98 acl = NULL;
99 else
100 acl = ERR_PTR(-EIO);
101
102 kfree(value);
103
104 if (!IS_ERR(acl))
105 ceph_set_cached_acl(inode, type, acl);
106
107 return acl;
108}
109
110static int ceph_set_acl(struct dentry *dentry, struct inode *inode,
111 struct posix_acl *acl, int type)
112{
113 int ret = 0, size = 0;
114 const char *name = NULL;
115 char *value = NULL;
116 struct iattr newattrs;
117 umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
118
119 if (acl) {
120 ret = posix_acl_valid(acl);
121 if (ret < 0)
122 goto out;
123 }
124
125 switch (type) {
126 case ACL_TYPE_ACCESS:
127 name = POSIX_ACL_XATTR_ACCESS;
128 if (acl) {
129 ret = posix_acl_equiv_mode(acl, &new_mode);
130 if (ret < 0)
131 goto out;
132 if (ret == 0)
133 acl = NULL;
134 }
135 break;
136 case ACL_TYPE_DEFAULT:
137 if (!S_ISDIR(inode->i_mode)) {
138 ret = acl ? -EINVAL : 0;
139 goto out;
140 }
141 name = POSIX_ACL_XATTR_DEFAULT;
142 break;
143 default:
144 ret = -EINVAL;
145 goto out;
146 }
147
148 if (acl) {
149 size = posix_acl_xattr_size(acl->a_count);
150 value = kmalloc(size, GFP_NOFS);
151 if (!value) {
152 ret = -ENOMEM;
153 goto out;
154 }
155
156 ret = posix_acl_to_xattr(&init_user_ns, acl, value, size);
157 if (ret < 0)
158 goto out_free;
159 }
160
161 if (new_mode != old_mode) {
162 newattrs.ia_mode = new_mode;
163 newattrs.ia_valid = ATTR_MODE;
164 ret = ceph_setattr(dentry, &newattrs);
165 if (ret)
166 goto out_free;
167 }
168
169 if (value)
170 ret = __ceph_setxattr(dentry, name, value, size, 0);
171 else
172 ret = __ceph_removexattr(dentry, name);
173
174 if (ret) {
175 if (new_mode != old_mode) {
176 newattrs.ia_mode = old_mode;
177 newattrs.ia_valid = ATTR_MODE;
178 ceph_setattr(dentry, &newattrs);
179 }
180 goto out_free;
181 }
182
183 ceph_set_cached_acl(inode, type, acl);
184
185out_free:
186 kfree(value);
187out:
188 return ret;
189}
190
191int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
192{
193 struct posix_acl *acl = NULL;
194 int ret = 0;
195
196 if (!S_ISLNK(inode->i_mode)) {
197 if (IS_POSIXACL(dir)) {
198 acl = ceph_get_acl(dir, ACL_TYPE_DEFAULT);
199 if (IS_ERR(acl)) {
200 ret = PTR_ERR(acl);
201 goto out;
202 }
203 }
204
205 if (!acl)
206 inode->i_mode &= ~current_umask();
207 }
208
209 if (IS_POSIXACL(dir) && acl) {
210 if (S_ISDIR(inode->i_mode)) {
211 ret = ceph_set_acl(dentry, inode, acl,
212 ACL_TYPE_DEFAULT);
213 if (ret)
214 goto out_release;
215 }
216 ret = posix_acl_create(&acl, GFP_NOFS, &inode->i_mode);
217 if (ret < 0)
218 goto out;
219 else if (ret > 0)
220 ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS);
221 else
222 cache_no_acl(inode);
223 } else {
224 cache_no_acl(inode);
225 }
226
227out_release:
228 posix_acl_release(acl);
229out:
230 return ret;
231}
232
233int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
234{
235 struct posix_acl *acl;
236 int ret = 0;
237
238 if (S_ISLNK(inode->i_mode)) {
239 ret = -EOPNOTSUPP;
240 goto out;
241 }
242
243 if (!IS_POSIXACL(inode))
244 goto out;
245
246 acl = ceph_get_acl(inode, ACL_TYPE_ACCESS);
247 if (IS_ERR_OR_NULL(acl)) {
248 ret = PTR_ERR(acl);
249 goto out;
250 }
251
252 ret = posix_acl_chmod(&acl, GFP_KERNEL, inode->i_mode);
253 if (ret)
254 goto out;
255 ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS);
256 posix_acl_release(acl);
257out:
258 return ret;
259}
260
261static int ceph_xattr_acl_get(struct dentry *dentry, const char *name,
262 void *value, size_t size, int type)
263{
264 struct posix_acl *acl;
265 int ret = 0;
266
267 if (!IS_POSIXACL(dentry->d_inode))
268 return -EOPNOTSUPP;
269
270 acl = ceph_get_acl(dentry->d_inode, type);
271 if (IS_ERR(acl))
272 return PTR_ERR(acl);
273 if (acl == NULL)
274 return -ENODATA;
275
276 ret = posix_acl_to_xattr(&init_user_ns, acl, value, size);
277 posix_acl_release(acl);
278
279 return ret;
280}
281
282static int ceph_xattr_acl_set(struct dentry *dentry, const char *name,
283 const void *value, size_t size, int flags, int type)
284{
285 int ret = 0;
286 struct posix_acl *acl = NULL;
287
288 if (!inode_owner_or_capable(dentry->d_inode)) {
289 ret = -EPERM;
290 goto out;
291 }
292
293 if (!IS_POSIXACL(dentry->d_inode)) {
294 ret = -EOPNOTSUPP;
295 goto out;
296 }
297
298 if (value) {
299 acl = posix_acl_from_xattr(&init_user_ns, value, size);
300 if (IS_ERR(acl)) {
301 ret = PTR_ERR(acl);
302 goto out;
303 }
304
305 if (acl) {
306 ret = posix_acl_valid(acl);
307 if (ret)
308 goto out_release;
309 }
310 }
311
312 ret = ceph_set_acl(dentry, dentry->d_inode, acl, type);
313
314out_release:
315 posix_acl_release(acl);
316out:
317 return ret;
318}
319
320const struct xattr_handler ceph_xattr_acl_default_handler = {
321 .prefix = POSIX_ACL_XATTR_DEFAULT,
322 .flags = ACL_TYPE_DEFAULT,
323 .get = ceph_xattr_acl_get,
324 .set = ceph_xattr_acl_set,
325};
326
327const struct xattr_handler ceph_xattr_acl_access_handler = {
328 .prefix = POSIX_ACL_XATTR_ACCESS,
329 .flags = ACL_TYPE_ACCESS,
330 .get = ceph_xattr_acl_get,
331 .set = ceph_xattr_acl_set,
332};
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ec3ba43b9faa..b53278c9fd97 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -209,6 +209,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
209 err = 0; 209 err = 0;
210 if (err < 0) { 210 if (err < 0) {
211 SetPageError(page); 211 SetPageError(page);
212 ceph_fscache_readpage_cancel(inode, page);
212 goto out; 213 goto out;
213 } else { 214 } else {
214 if (err < PAGE_CACHE_SIZE) { 215 if (err < PAGE_CACHE_SIZE) {
@@ -256,6 +257,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
256 for (i = 0; i < num_pages; i++) { 257 for (i = 0; i < num_pages; i++) {
257 struct page *page = osd_data->pages[i]; 258 struct page *page = osd_data->pages[i];
258 259
260 if (rc < 0)
261 goto unlock;
259 if (bytes < (int)PAGE_CACHE_SIZE) { 262 if (bytes < (int)PAGE_CACHE_SIZE) {
260 /* zero (remainder of) page */ 263 /* zero (remainder of) page */
261 int s = bytes < 0 ? 0 : bytes; 264 int s = bytes < 0 ? 0 : bytes;
@@ -266,6 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
266 flush_dcache_page(page); 269 flush_dcache_page(page);
267 SetPageUptodate(page); 270 SetPageUptodate(page);
268 ceph_readpage_to_fscache(inode, page); 271 ceph_readpage_to_fscache(inode, page);
272unlock:
269 unlock_page(page); 273 unlock_page(page);
270 page_cache_release(page); 274 page_cache_release(page);
271 bytes -= PAGE_CACHE_SIZE; 275 bytes -= PAGE_CACHE_SIZE;
@@ -1207,6 +1211,41 @@ const struct address_space_operations ceph_aops = {
1207/* 1211/*
1208 * vm ops 1212 * vm ops
1209 */ 1213 */
1214static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{
1216 struct inode *inode = file_inode(vma->vm_file);
1217 struct ceph_inode_info *ci = ceph_inode(inode);
1218 struct ceph_file_info *fi = vma->vm_file->private_data;
1219 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1220 int want, got, ret;
1221
1222 dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1223 inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE);
1224 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1225 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1226 else
1227 want = CEPH_CAP_FILE_CACHE;
1228 while (1) {
1229 got = 0;
1230 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
1231 if (ret == 0)
1232 break;
1233 if (ret != -ERESTARTSYS) {
1234 WARN_ON(1);
1235 return VM_FAULT_SIGBUS;
1236 }
1237 }
1238 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1239 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1240
1241 ret = filemap_fault(vma, vmf);
1242
1243 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1244 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1245 ceph_put_cap_refs(ci, got);
1246
1247 return ret;
1248}
1210 1249
1211/* 1250/*
1212 * Reuse write_begin here for simplicity. 1251 * Reuse write_begin here for simplicity.
@@ -1214,23 +1253,41 @@ const struct address_space_operations ceph_aops = {
1214static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) 1253static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{ 1254{
1216 struct inode *inode = file_inode(vma->vm_file); 1255 struct inode *inode = file_inode(vma->vm_file);
1217 struct page *page = vmf->page; 1256 struct ceph_inode_info *ci = ceph_inode(inode);
1257 struct ceph_file_info *fi = vma->vm_file->private_data;
1218 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 1258 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1259 struct page *page = vmf->page;
1219 loff_t off = page_offset(page); 1260 loff_t off = page_offset(page);
1220 loff_t size, len; 1261 loff_t size = i_size_read(inode);
1221 int ret; 1262 size_t len;
1222 1263 int want, got, ret;
1223 /* Update time before taking page lock */
1224 file_update_time(vma->vm_file);
1225 1264
1226 size = i_size_read(inode);
1227 if (off + PAGE_CACHE_SIZE <= size) 1265 if (off + PAGE_CACHE_SIZE <= size)
1228 len = PAGE_CACHE_SIZE; 1266 len = PAGE_CACHE_SIZE;
1229 else 1267 else
1230 len = size & ~PAGE_CACHE_MASK; 1268 len = size & ~PAGE_CACHE_MASK;
1231 1269
1232 dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, 1270 dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
1233 off, len, page, page->index); 1271 inode, ceph_vinop(inode), off, len, size);
1272 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1273 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1274 else
1275 want = CEPH_CAP_FILE_BUFFER;
1276 while (1) {
1277 got = 0;
1278 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
1279 if (ret == 0)
1280 break;
1281 if (ret != -ERESTARTSYS) {
1282 WARN_ON(1);
1283 return VM_FAULT_SIGBUS;
1284 }
1285 }
1286 dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
1287 inode, off, len, ceph_cap_string(got));
1288
1289 /* Update time before taking page lock */
1290 file_update_time(vma->vm_file);
1234 1291
1235 lock_page(page); 1292 lock_page(page);
1236 1293
@@ -1252,14 +1309,26 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1252 ret = VM_FAULT_SIGBUS; 1309 ret = VM_FAULT_SIGBUS;
1253 } 1310 }
1254out: 1311out:
1255 dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); 1312 if (ret != VM_FAULT_LOCKED) {
1256 if (ret != VM_FAULT_LOCKED)
1257 unlock_page(page); 1313 unlock_page(page);
1314 } else {
1315 int dirty;
1316 spin_lock(&ci->i_ceph_lock);
1317 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1318 spin_unlock(&ci->i_ceph_lock);
1319 if (dirty)
1320 __mark_inode_dirty(inode, dirty);
1321 }
1322
1323 dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
1324 inode, off, len, ceph_cap_string(got), ret);
1325 ceph_put_cap_refs(ci, got);
1326
1258 return ret; 1327 return ret;
1259} 1328}
1260 1329
1261static struct vm_operations_struct ceph_vmops = { 1330static struct vm_operations_struct ceph_vmops = {
1262 .fault = filemap_fault, 1331 .fault = ceph_filemap_fault,
1263 .page_mkwrite = ceph_page_mkwrite, 1332 .page_mkwrite = ceph_page_mkwrite,
1264 .remap_pages = generic_file_remap_pages, 1333 .remap_pages = generic_file_remap_pages,
1265}; 1334};
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index ba949408a336..da95f61b7a09 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -67,6 +67,14 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
67 return fscache_maybe_release_page(ci->fscache, page, gfp); 67 return fscache_maybe_release_page(ci->fscache, page, gfp);
68} 68}
69 69
70static inline void ceph_fscache_readpage_cancel(struct inode *inode,
71 struct page *page)
72{
73 struct ceph_inode_info *ci = ceph_inode(inode);
74 if (fscache_cookie_valid(ci->fscache) && PageFsCache(page))
75 __fscache_uncache_page(ci->fscache, page);
76}
77
70static inline void ceph_fscache_readpages_cancel(struct inode *inode, 78static inline void ceph_fscache_readpages_cancel(struct inode *inode,
71 struct list_head *pages) 79 struct list_head *pages)
72{ 80{
@@ -145,6 +153,11 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
145 return 1; 153 return 1;
146} 154}
147 155
156static inline void ceph_fscache_readpage_cancel(struct inode *inode,
157 struct page *page)
158{
159}
160
148static inline void ceph_fscache_readpages_cancel(struct inode *inode, 161static inline void ceph_fscache_readpages_cancel(struct inode *inode,
149 struct list_head *pages) 162 struct list_head *pages)
150{ 163{
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3c0a4bd74996..17543383545c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -555,21 +555,34 @@ retry:
555 cap->ci = ci; 555 cap->ci = ci;
556 __insert_cap_node(ci, cap); 556 __insert_cap_node(ci, cap);
557 557
558 /* clear out old exporting info? (i.e. on cap import) */
559 if (ci->i_cap_exporting_mds == mds) {
560 ci->i_cap_exporting_issued = 0;
561 ci->i_cap_exporting_mseq = 0;
562 ci->i_cap_exporting_mds = -1;
563 }
564
565 /* add to session cap list */ 558 /* add to session cap list */
566 cap->session = session; 559 cap->session = session;
567 spin_lock(&session->s_cap_lock); 560 spin_lock(&session->s_cap_lock);
568 list_add_tail(&cap->session_caps, &session->s_caps); 561 list_add_tail(&cap->session_caps, &session->s_caps);
569 session->s_nr_caps++; 562 session->s_nr_caps++;
570 spin_unlock(&session->s_cap_lock); 563 spin_unlock(&session->s_cap_lock);
571 } else if (new_cap) 564 } else {
572 ceph_put_cap(mdsc, new_cap); 565 if (new_cap)
566 ceph_put_cap(mdsc, new_cap);
567
568 /*
569 * auth mds of the inode changed. we received the cap export
570 * message, but still haven't received the cap import message.
571 * handle_cap_export() updated the new auth MDS' cap.
572 *
573 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
574 * a message that was send before the cap import message. So
575 * don't remove caps.
576 */
577 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
578 WARN_ON(cap != ci->i_auth_cap);
579 WARN_ON(cap->cap_id != cap_id);
580 seq = cap->seq;
581 mseq = cap->mseq;
582 issued |= cap->issued;
583 flags |= CEPH_CAP_FLAG_AUTH;
584 }
585 }
573 586
574 if (!ci->i_snap_realm) { 587 if (!ci->i_snap_realm) {
575 /* 588 /*
@@ -611,15 +624,9 @@ retry:
611 if (ci->i_auth_cap == NULL || 624 if (ci->i_auth_cap == NULL ||
612 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) 625 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
613 ci->i_auth_cap = cap; 626 ci->i_auth_cap = cap;
614 } else if (ci->i_auth_cap == cap) { 627 ci->i_cap_exporting_issued = 0;
615 ci->i_auth_cap = NULL; 628 } else {
616 spin_lock(&mdsc->cap_dirty_lock); 629 WARN_ON(ci->i_auth_cap == cap);
617 if (!list_empty(&ci->i_dirty_item)) {
618 dout(" moving %p to cap_dirty_migrating\n", inode);
619 list_move(&ci->i_dirty_item,
620 &mdsc->cap_dirty_migrating);
621 }
622 spin_unlock(&mdsc->cap_dirty_lock);
623 } 630 }
624 631
625 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", 632 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
@@ -628,7 +635,7 @@ retry:
628 cap->cap_id = cap_id; 635 cap->cap_id = cap_id;
629 cap->issued = issued; 636 cap->issued = issued;
630 cap->implemented |= issued; 637 cap->implemented |= issued;
631 if (mseq > cap->mseq) 638 if (ceph_seq_cmp(mseq, cap->mseq) > 0)
632 cap->mds_wanted = wanted; 639 cap->mds_wanted = wanted;
633 else 640 else
634 cap->mds_wanted |= wanted; 641 cap->mds_wanted |= wanted;
@@ -816,7 +823,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
816 823
817 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 824 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
818 cap = rb_entry(p, struct ceph_cap, ci_node); 825 cap = rb_entry(p, struct ceph_cap, ci_node);
819 if (cap != ocap && __cap_is_valid(cap) && 826 if (cap != ocap &&
820 (cap->implemented & ~cap->issued & mask)) 827 (cap->implemented & ~cap->issued & mask))
821 return 1; 828 return 1;
822 } 829 }
@@ -888,7 +895,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
888 */ 895 */
889static int __ceph_is_any_caps(struct ceph_inode_info *ci) 896static int __ceph_is_any_caps(struct ceph_inode_info *ci)
890{ 897{
891 return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; 898 return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
899}
900
901int ceph_is_any_caps(struct inode *inode)
902{
903 struct ceph_inode_info *ci = ceph_inode(inode);
904 int ret;
905
906 spin_lock(&ci->i_ceph_lock);
907 ret = __ceph_is_any_caps(ci);
908 spin_unlock(&ci->i_ceph_lock);
909
910 return ret;
892} 911}
893 912
894/* 913/*
@@ -1383,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1383 ci->i_snap_realm->cached_context); 1402 ci->i_snap_realm->cached_context);
1384 dout(" inode %p now dirty snapc %p auth cap %p\n", 1403 dout(" inode %p now dirty snapc %p auth cap %p\n",
1385 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); 1404 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
1405 WARN_ON(!ci->i_auth_cap);
1386 BUG_ON(!list_empty(&ci->i_dirty_item)); 1406 BUG_ON(!list_empty(&ci->i_dirty_item));
1387 spin_lock(&mdsc->cap_dirty_lock); 1407 spin_lock(&mdsc->cap_dirty_lock);
1388 if (ci->i_auth_cap) 1408 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1389 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1390 else
1391 list_add(&ci->i_dirty_item,
1392 &mdsc->cap_dirty_migrating);
1393 spin_unlock(&mdsc->cap_dirty_lock); 1409 spin_unlock(&mdsc->cap_dirty_lock);
1394 if (ci->i_flushing_caps == 0) { 1410 if (ci->i_flushing_caps == 0) {
1395 ihold(inode); 1411 ihold(inode);
@@ -1735,13 +1751,12 @@ ack:
1735/* 1751/*
1736 * Try to flush dirty caps back to the auth mds. 1752 * Try to flush dirty caps back to the auth mds.
1737 */ 1753 */
1738static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, 1754static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
1739 unsigned *flush_tid)
1740{ 1755{
1741 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1756 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1742 struct ceph_inode_info *ci = ceph_inode(inode); 1757 struct ceph_inode_info *ci = ceph_inode(inode);
1743 int unlock_session = session ? 0 : 1;
1744 int flushing = 0; 1758 int flushing = 0;
1759 struct ceph_mds_session *session = NULL;
1745 1760
1746retry: 1761retry:
1747 spin_lock(&ci->i_ceph_lock); 1762 spin_lock(&ci->i_ceph_lock);
@@ -1755,13 +1770,14 @@ retry:
1755 int want = __ceph_caps_wanted(ci); 1770 int want = __ceph_caps_wanted(ci);
1756 int delayed; 1771 int delayed;
1757 1772
1758 if (!session) { 1773 if (!session || session != cap->session) {
1759 spin_unlock(&ci->i_ceph_lock); 1774 spin_unlock(&ci->i_ceph_lock);
1775 if (session)
1776 mutex_unlock(&session->s_mutex);
1760 session = cap->session; 1777 session = cap->session;
1761 mutex_lock(&session->s_mutex); 1778 mutex_lock(&session->s_mutex);
1762 goto retry; 1779 goto retry;
1763 } 1780 }
1764 BUG_ON(session != cap->session);
1765 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) 1781 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
1766 goto out; 1782 goto out;
1767 1783
@@ -1780,7 +1796,7 @@ retry:
1780out: 1796out:
1781 spin_unlock(&ci->i_ceph_lock); 1797 spin_unlock(&ci->i_ceph_lock);
1782out_unlocked: 1798out_unlocked:
1783 if (session && unlock_session) 1799 if (session)
1784 mutex_unlock(&session->s_mutex); 1800 mutex_unlock(&session->s_mutex);
1785 return flushing; 1801 return flushing;
1786} 1802}
@@ -1865,7 +1881,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
1865 return ret; 1881 return ret;
1866 mutex_lock(&inode->i_mutex); 1882 mutex_lock(&inode->i_mutex);
1867 1883
1868 dirty = try_flush_caps(inode, NULL, &flush_tid); 1884 dirty = try_flush_caps(inode, &flush_tid);
1869 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 1885 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
1870 1886
1871 /* 1887 /*
@@ -1900,7 +1916,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
1900 1916
1901 dout("write_inode %p wait=%d\n", inode, wait); 1917 dout("write_inode %p wait=%d\n", inode, wait);
1902 if (wait) { 1918 if (wait) {
1903 dirty = try_flush_caps(inode, NULL, &flush_tid); 1919 dirty = try_flush_caps(inode, &flush_tid);
1904 if (dirty) 1920 if (dirty)
1905 err = wait_event_interruptible(ci->i_cap_wq, 1921 err = wait_event_interruptible(ci->i_cap_wq,
1906 caps_are_flushed(inode, flush_tid)); 1922 caps_are_flushed(inode, flush_tid));
@@ -2350,11 +2366,11 @@ static void invalidate_aliases(struct inode *inode)
2350 d_prune_aliases(inode); 2366 d_prune_aliases(inode);
2351 /* 2367 /*
2352 * For non-directory inode, d_find_alias() only returns 2368 * For non-directory inode, d_find_alias() only returns
2353 * connected dentry. After calling d_invalidate(), the 2369 * hashed dentry. After calling d_invalidate(), the
2354 * dentry become disconnected. 2370 * dentry becomes unhashed.
2355 * 2371 *
2356 * For directory inode, d_find_alias() can return 2372 * For directory inode, d_find_alias() can return
2357 * disconnected dentry. But directory inode should have 2373 * unhashed dentry. But directory inode should have
2358 * one alias at most. 2374 * one alias at most.
2359 */ 2375 */
2360 while ((dn = d_find_alias(inode))) { 2376 while ((dn = d_find_alias(inode))) {
@@ -2408,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2408 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 2424 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
2409 inode->i_size); 2425 inode->i_size);
2410 2426
2427
2428 /*
2429 * auth mds of the inode changed. we received the cap export message,
2430 * but still haven't received the cap import message. handle_cap_export
2431 * updated the new auth MDS' cap.
2432 *
2433 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
2434 * that was sent before the cap import message. So don't remove caps.
2435 */
2436 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
2437 WARN_ON(cap != ci->i_auth_cap);
2438 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
2439 seq = cap->seq;
2440 newcaps |= cap->issued;
2441 }
2442
2411 /* 2443 /*
2412 * If CACHE is being revoked, and we have no dirty buffers, 2444 * If CACHE is being revoked, and we have no dirty buffers,
2413 * try to invalidate (once). (If there are dirty buffers, we 2445 * try to invalidate (once). (If there are dirty buffers, we
@@ -2434,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2434 issued |= implemented | __ceph_caps_dirty(ci); 2466 issued |= implemented | __ceph_caps_dirty(ci);
2435 2467
2436 cap->cap_gen = session->s_cap_gen; 2468 cap->cap_gen = session->s_cap_gen;
2469 cap->seq = seq;
2437 2470
2438 __check_cap_issue(ci, cap, newcaps); 2471 __check_cap_issue(ci, cap, newcaps);
2439 2472
@@ -2464,6 +2497,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2464 ceph_buffer_put(ci->i_xattrs.blob); 2497 ceph_buffer_put(ci->i_xattrs.blob);
2465 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 2498 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
2466 ci->i_xattrs.version = version; 2499 ci->i_xattrs.version = version;
2500 ceph_forget_all_cached_acls(inode);
2467 } 2501 }
2468 } 2502 }
2469 2503
@@ -2483,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2483 le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, 2517 le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
2484 &atime); 2518 &atime);
2485 2519
2520
2521 /* file layout may have changed */
2522 ci->i_layout = grant->layout;
2523
2486 /* max size increase? */ 2524 /* max size increase? */
2487 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { 2525 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
2488 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); 2526 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
@@ -2511,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2511 check_caps = 1; 2549 check_caps = 1;
2512 } 2550 }
2513 2551
2514 cap->seq = seq;
2515
2516 /* file layout may have changed */
2517 ci->i_layout = grant->layout;
2518
2519 /* revocation, grant, or no-op? */ 2552 /* revocation, grant, or no-op? */
2520 if (cap->issued & ~newcaps) { 2553 if (cap->issued & ~newcaps) {
2521 int revoking = cap->issued & ~newcaps; 2554 int revoking = cap->issued & ~newcaps;
@@ -2741,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode,
2741 * caller holds s_mutex 2774 * caller holds s_mutex
2742 */ 2775 */
2743static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 2776static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2744 struct ceph_mds_session *session, 2777 struct ceph_mds_cap_peer *ph,
2745 int *open_target_sessions) 2778 struct ceph_mds_session *session)
2746{ 2779{
2747 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 2780 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
2781 struct ceph_mds_session *tsession = NULL;
2782 struct ceph_cap *cap, *tcap;
2748 struct ceph_inode_info *ci = ceph_inode(inode); 2783 struct ceph_inode_info *ci = ceph_inode(inode);
2749 int mds = session->s_mds; 2784 u64 t_cap_id;
2750 unsigned mseq = le32_to_cpu(ex->migrate_seq); 2785 unsigned mseq = le32_to_cpu(ex->migrate_seq);
2751 struct ceph_cap *cap = NULL, *t; 2786 unsigned t_seq, t_mseq;
2752 struct rb_node *p; 2787 int target, issued;
2753 int remember = 1; 2788 int mds = session->s_mds;
2754 2789
2755 dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", 2790 if (ph) {
2756 inode, ci, mds, mseq); 2791 t_cap_id = le64_to_cpu(ph->cap_id);
2792 t_seq = le32_to_cpu(ph->seq);
2793 t_mseq = le32_to_cpu(ph->mseq);
2794 target = le32_to_cpu(ph->mds);
2795 } else {
2796 t_cap_id = t_seq = t_mseq = 0;
2797 target = -1;
2798 }
2757 2799
2800 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
2801 inode, ci, mds, mseq, target);
2802retry:
2758 spin_lock(&ci->i_ceph_lock); 2803 spin_lock(&ci->i_ceph_lock);
2804 cap = __get_cap_for_mds(ci, mds);
2805 if (!cap)
2806 goto out_unlock;
2759 2807
2760 /* make sure we haven't seen a higher mseq */ 2808 if (target < 0) {
2761 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 2809 __ceph_remove_cap(cap, false);
2762 t = rb_entry(p, struct ceph_cap, ci_node); 2810 goto out_unlock;
2763 if (ceph_seq_cmp(t->mseq, mseq) > 0) {
2764 dout(" higher mseq on cap from mds%d\n",
2765 t->session->s_mds);
2766 remember = 0;
2767 }
2768 if (t->session->s_mds == mds)
2769 cap = t;
2770 } 2811 }
2771 2812
2772 if (cap) { 2813 /*
2773 if (remember) { 2814 * now we know we haven't received the cap import message yet
2774 /* make note */ 2815 * because the exported cap still exist.
2775 ci->i_cap_exporting_mds = mds; 2816 */
2776 ci->i_cap_exporting_mseq = mseq;
2777 ci->i_cap_exporting_issued = cap->issued;
2778
2779 /*
2780 * make sure we have open sessions with all possible
2781 * export targets, so that we get the matching IMPORT
2782 */
2783 *open_target_sessions = 1;
2784 2817
2785 /* 2818 issued = cap->issued;
2786 * we can't flush dirty caps that we've seen the 2819 WARN_ON(issued != cap->implemented);
2787 * EXPORT but no IMPORT for 2820
2788 */ 2821 tcap = __get_cap_for_mds(ci, target);
2789 spin_lock(&mdsc->cap_dirty_lock); 2822 if (tcap) {
2790 if (!list_empty(&ci->i_dirty_item)) { 2823 /* already have caps from the target */
2791 dout(" moving %p to cap_dirty_migrating\n", 2824 if (tcap->cap_id != t_cap_id ||
2792 inode); 2825 ceph_seq_cmp(tcap->seq, t_seq) < 0) {
2793 list_move(&ci->i_dirty_item, 2826 dout(" updating import cap %p mds%d\n", tcap, target);
2794 &mdsc->cap_dirty_migrating); 2827 tcap->cap_id = t_cap_id;
2828 tcap->seq = t_seq - 1;
2829 tcap->issue_seq = t_seq - 1;
2830 tcap->mseq = t_mseq;
2831 tcap->issued |= issued;
2832 tcap->implemented |= issued;
2833 if (cap == ci->i_auth_cap)
2834 ci->i_auth_cap = tcap;
2835 if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
2836 spin_lock(&mdsc->cap_dirty_lock);
2837 list_move_tail(&ci->i_flushing_item,
2838 &tcap->session->s_cap_flushing);
2839 spin_unlock(&mdsc->cap_dirty_lock);
2795 } 2840 }
2796 spin_unlock(&mdsc->cap_dirty_lock);
2797 } 2841 }
2798 __ceph_remove_cap(cap, false); 2842 __ceph_remove_cap(cap, false);
2843 goto out_unlock;
2799 } 2844 }
2800 /* else, we already released it */
2801 2845
2846 if (tsession) {
2847 int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
2848 spin_unlock(&ci->i_ceph_lock);
2849 /* add placeholder for the export tagert */
2850 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
2851 t_seq - 1, t_mseq, (u64)-1, flag, NULL);
2852 goto retry;
2853 }
2854
2855 spin_unlock(&ci->i_ceph_lock);
2856 mutex_unlock(&session->s_mutex);
2857
2858 /* open target session */
2859 tsession = ceph_mdsc_open_export_target_session(mdsc, target);
2860 if (!IS_ERR(tsession)) {
2861 if (mds > target) {
2862 mutex_lock(&session->s_mutex);
2863 mutex_lock_nested(&tsession->s_mutex,
2864 SINGLE_DEPTH_NESTING);
2865 } else {
2866 mutex_lock(&tsession->s_mutex);
2867 mutex_lock_nested(&session->s_mutex,
2868 SINGLE_DEPTH_NESTING);
2869 }
2870 ceph_add_cap_releases(mdsc, tsession);
2871 } else {
2872 WARN_ON(1);
2873 tsession = NULL;
2874 target = -1;
2875 }
2876 goto retry;
2877
2878out_unlock:
2802 spin_unlock(&ci->i_ceph_lock); 2879 spin_unlock(&ci->i_ceph_lock);
2880 mutex_unlock(&session->s_mutex);
2881 if (tsession) {
2882 mutex_unlock(&tsession->s_mutex);
2883 ceph_put_mds_session(tsession);
2884 }
2803} 2885}
2804 2886
2805/* 2887/*
@@ -2810,10 +2892,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2810 */ 2892 */
2811static void handle_cap_import(struct ceph_mds_client *mdsc, 2893static void handle_cap_import(struct ceph_mds_client *mdsc,
2812 struct inode *inode, struct ceph_mds_caps *im, 2894 struct inode *inode, struct ceph_mds_caps *im,
2895 struct ceph_mds_cap_peer *ph,
2813 struct ceph_mds_session *session, 2896 struct ceph_mds_session *session,
2814 void *snaptrace, int snaptrace_len) 2897 void *snaptrace, int snaptrace_len)
2815{ 2898{
2816 struct ceph_inode_info *ci = ceph_inode(inode); 2899 struct ceph_inode_info *ci = ceph_inode(inode);
2900 struct ceph_cap *cap;
2817 int mds = session->s_mds; 2901 int mds = session->s_mds;
2818 unsigned issued = le32_to_cpu(im->caps); 2902 unsigned issued = le32_to_cpu(im->caps);
2819 unsigned wanted = le32_to_cpu(im->wanted); 2903 unsigned wanted = le32_to_cpu(im->wanted);
@@ -2821,28 +2905,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2821 unsigned mseq = le32_to_cpu(im->migrate_seq); 2905 unsigned mseq = le32_to_cpu(im->migrate_seq);
2822 u64 realmino = le64_to_cpu(im->realm); 2906 u64 realmino = le64_to_cpu(im->realm);
2823 u64 cap_id = le64_to_cpu(im->cap_id); 2907 u64 cap_id = le64_to_cpu(im->cap_id);
2908 u64 p_cap_id;
2909 int peer;
2824 2910
2825 if (ci->i_cap_exporting_mds >= 0 && 2911 if (ph) {
2826 ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { 2912 p_cap_id = le64_to_cpu(ph->cap_id);
2827 dout("handle_cap_import inode %p ci %p mds%d mseq %d" 2913 peer = le32_to_cpu(ph->mds);
2828 " - cleared exporting from mds%d\n", 2914 } else {
2829 inode, ci, mds, mseq, 2915 p_cap_id = 0;
2830 ci->i_cap_exporting_mds); 2916 peer = -1;
2831 ci->i_cap_exporting_issued = 0; 2917 }
2832 ci->i_cap_exporting_mseq = 0;
2833 ci->i_cap_exporting_mds = -1;
2834 2918
2835 spin_lock(&mdsc->cap_dirty_lock); 2919 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
2836 if (!list_empty(&ci->i_dirty_item)) { 2920 inode, ci, mds, mseq, peer);
2837 dout(" moving %p back to cap_dirty\n", inode); 2921
2838 list_move(&ci->i_dirty_item, &mdsc->cap_dirty); 2922 spin_lock(&ci->i_ceph_lock);
2923 cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
2924 if (cap && cap->cap_id == p_cap_id) {
2925 dout(" remove export cap %p mds%d flags %d\n",
2926 cap, peer, ph->flags);
2927 if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
2928 (cap->seq != le32_to_cpu(ph->seq) ||
2929 cap->mseq != le32_to_cpu(ph->mseq))) {
2930 pr_err("handle_cap_import: mismatched seq/mseq: "
2931 "ino (%llx.%llx) mds%d seq %d mseq %d "
2932 "importer mds%d has peer seq %d mseq %d\n",
2933 ceph_vinop(inode), peer, cap->seq,
2934 cap->mseq, mds, le32_to_cpu(ph->seq),
2935 le32_to_cpu(ph->mseq));
2839 } 2936 }
2840 spin_unlock(&mdsc->cap_dirty_lock); 2937 ci->i_cap_exporting_issued = cap->issued;
2841 } else { 2938 __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
2842 dout("handle_cap_import inode %p ci %p mds%d mseq %d\n",
2843 inode, ci, mds, mseq);
2844 } 2939 }
2845 2940
2941 /* make sure we re-request max_size, if necessary */
2942 ci->i_wanted_max_size = 0;
2943 ci->i_requested_max_size = 0;
2944 spin_unlock(&ci->i_ceph_lock);
2945
2846 down_write(&mdsc->snap_rwsem); 2946 down_write(&mdsc->snap_rwsem);
2847 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, 2947 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
2848 false); 2948 false);
@@ -2853,11 +2953,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2853 kick_flushing_inode_caps(mdsc, session, inode); 2953 kick_flushing_inode_caps(mdsc, session, inode);
2854 up_read(&mdsc->snap_rwsem); 2954 up_read(&mdsc->snap_rwsem);
2855 2955
2856 /* make sure we re-request max_size, if necessary */
2857 spin_lock(&ci->i_ceph_lock);
2858 ci->i_wanted_max_size = 0; /* reset */
2859 ci->i_requested_max_size = 0;
2860 spin_unlock(&ci->i_ceph_lock);
2861} 2956}
2862 2957
2863/* 2958/*
@@ -2875,6 +2970,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2875 struct ceph_inode_info *ci; 2970 struct ceph_inode_info *ci;
2876 struct ceph_cap *cap; 2971 struct ceph_cap *cap;
2877 struct ceph_mds_caps *h; 2972 struct ceph_mds_caps *h;
2973 struct ceph_mds_cap_peer *peer = NULL;
2878 int mds = session->s_mds; 2974 int mds = session->s_mds;
2879 int op; 2975 int op;
2880 u32 seq, mseq; 2976 u32 seq, mseq;
@@ -2885,12 +2981,13 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2885 void *snaptrace; 2981 void *snaptrace;
2886 size_t snaptrace_len; 2982 size_t snaptrace_len;
2887 void *flock; 2983 void *flock;
2984 void *end;
2888 u32 flock_len; 2985 u32 flock_len;
2889 int open_target_sessions = 0;
2890 2986
2891 dout("handle_caps from mds%d\n", mds); 2987 dout("handle_caps from mds%d\n", mds);
2892 2988
2893 /* decode */ 2989 /* decode */
2990 end = msg->front.iov_base + msg->front.iov_len;
2894 tid = le64_to_cpu(msg->hdr.tid); 2991 tid = le64_to_cpu(msg->hdr.tid);
2895 if (msg->front.iov_len < sizeof(*h)) 2992 if (msg->front.iov_len < sizeof(*h))
2896 goto bad; 2993 goto bad;
@@ -2908,17 +3005,28 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2908 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3005 snaptrace_len = le32_to_cpu(h->snap_trace_len);
2909 3006
2910 if (le16_to_cpu(msg->hdr.version) >= 2) { 3007 if (le16_to_cpu(msg->hdr.version) >= 2) {
2911 void *p, *end; 3008 void *p = snaptrace + snaptrace_len;
2912
2913 p = snaptrace + snaptrace_len;
2914 end = msg->front.iov_base + msg->front.iov_len;
2915 ceph_decode_32_safe(&p, end, flock_len, bad); 3009 ceph_decode_32_safe(&p, end, flock_len, bad);
3010 if (p + flock_len > end)
3011 goto bad;
2916 flock = p; 3012 flock = p;
2917 } else { 3013 } else {
2918 flock = NULL; 3014 flock = NULL;
2919 flock_len = 0; 3015 flock_len = 0;
2920 } 3016 }
2921 3017
3018 if (le16_to_cpu(msg->hdr.version) >= 3) {
3019 if (op == CEPH_CAP_OP_IMPORT) {
3020 void *p = flock + flock_len;
3021 if (p + sizeof(*peer) > end)
3022 goto bad;
3023 peer = p;
3024 } else if (op == CEPH_CAP_OP_EXPORT) {
3025 /* recorded in unused fields */
3026 peer = (void *)&h->size;
3027 }
3028 }
3029
2922 mutex_lock(&session->s_mutex); 3030 mutex_lock(&session->s_mutex);
2923 session->s_seq++; 3031 session->s_seq++;
2924 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3032 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2951,11 +3059,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2951 goto done; 3059 goto done;
2952 3060
2953 case CEPH_CAP_OP_EXPORT: 3061 case CEPH_CAP_OP_EXPORT:
2954 handle_cap_export(inode, h, session, &open_target_sessions); 3062 handle_cap_export(inode, h, peer, session);
2955 goto done; 3063 goto done_unlocked;
2956 3064
2957 case CEPH_CAP_OP_IMPORT: 3065 case CEPH_CAP_OP_IMPORT:
2958 handle_cap_import(mdsc, inode, h, session, 3066 handle_cap_import(mdsc, inode, h, peer, session,
2959 snaptrace, snaptrace_len); 3067 snaptrace, snaptrace_len);
2960 } 3068 }
2961 3069
@@ -3007,8 +3115,6 @@ done:
3007done_unlocked: 3115done_unlocked:
3008 if (inode) 3116 if (inode)
3009 iput(inode); 3117 iput(inode);
3010 if (open_target_sessions)
3011 ceph_mdsc_open_export_target_sessions(mdsc, session);
3012 return; 3118 return;
3013 3119
3014bad: 3120bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 2a0bcaeb189a..619616d585b0 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -693,6 +693,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
693 if (!err && !req->r_reply_info.head->is_dentry) 693 if (!err && !req->r_reply_info.head->is_dentry)
694 err = ceph_handle_notrace_create(dir, dentry); 694 err = ceph_handle_notrace_create(dir, dentry);
695 ceph_mdsc_put_request(req); 695 ceph_mdsc_put_request(req);
696
697 if (!err)
698 err = ceph_init_acl(dentry, dentry->d_inode, dir);
699
696 if (err) 700 if (err)
697 d_drop(dentry); 701 d_drop(dentry);
698 return err; 702 return err;
@@ -1037,14 +1041,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1037 valid = 1; 1041 valid = 1;
1038 } else if (dentry_lease_is_valid(dentry) || 1042 } else if (dentry_lease_is_valid(dentry) ||
1039 dir_lease_is_valid(dir, dentry)) { 1043 dir_lease_is_valid(dir, dentry)) {
1040 valid = 1; 1044 if (dentry->d_inode)
1045 valid = ceph_is_any_caps(dentry->d_inode);
1046 else
1047 valid = 1;
1041 } 1048 }
1042 1049
1043 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1050 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1044 if (valid) 1051 if (valid) {
1045 ceph_dentry_lru_touch(dentry); 1052 ceph_dentry_lru_touch(dentry);
1046 else 1053 } else {
1054 ceph_dir_clear_complete(dir);
1047 d_drop(dentry); 1055 d_drop(dentry);
1056 }
1048 iput(dir); 1057 iput(dir);
1049 return valid; 1058 return valid;
1050} 1059}
@@ -1293,6 +1302,7 @@ const struct inode_operations ceph_dir_iops = {
1293 .getxattr = ceph_getxattr, 1302 .getxattr = ceph_getxattr,
1294 .listxattr = ceph_listxattr, 1303 .listxattr = ceph_listxattr,
1295 .removexattr = ceph_removexattr, 1304 .removexattr = ceph_removexattr,
1305 .get_acl = ceph_get_acl,
1296 .mknod = ceph_mknod, 1306 .mknod = ceph_mknod,
1297 .symlink = ceph_symlink, 1307 .symlink = ceph_symlink,
1298 .mkdir = ceph_mkdir, 1308 .mkdir = ceph_mkdir,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3de89829e2a1..dfd2ce3419f8 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -408,51 +408,92 @@ more:
408 * 408 *
409 * If the read spans object boundary, just do multiple reads. 409 * If the read spans object boundary, just do multiple reads.
410 */ 410 */
411static ssize_t ceph_sync_read(struct file *file, char __user *data, 411static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
412 unsigned len, loff_t *poff, int *checkeof) 412 int *checkeof)
413{ 413{
414 struct file *file = iocb->ki_filp;
414 struct inode *inode = file_inode(file); 415 struct inode *inode = file_inode(file);
415 struct page **pages; 416 struct page **pages;
416 u64 off = *poff; 417 u64 off = iocb->ki_pos;
417 int num_pages, ret; 418 int num_pages, ret;
419 size_t len = i->count;
418 420
419 dout("sync_read on file %p %llu~%u %s\n", file, off, len, 421 dout("sync_read on file %p %llu~%u %s\n", file, off,
422 (unsigned)len,
420 (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 423 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
421
422 if (file->f_flags & O_DIRECT) {
423 num_pages = calc_pages_for((unsigned long)data, len);
424 pages = ceph_get_direct_page_vector(data, num_pages, true);
425 } else {
426 num_pages = calc_pages_for(off, len);
427 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
428 }
429 if (IS_ERR(pages))
430 return PTR_ERR(pages);
431
432 /* 424 /*
433 * flush any page cache pages in this range. this 425 * flush any page cache pages in this range. this
434 * will make concurrent normal and sync io slow, 426 * will make concurrent normal and sync io slow,
435 * but it will at least behave sensibly when they are 427 * but it will at least behave sensibly when they are
436 * in sequence. 428 * in sequence.
437 */ 429 */
438 ret = filemap_write_and_wait(inode->i_mapping); 430 ret = filemap_write_and_wait_range(inode->i_mapping, off,
431 off + len);
439 if (ret < 0) 432 if (ret < 0)
440 goto done; 433 return ret;
441 434
442 ret = striped_read(inode, off, len, pages, num_pages, checkeof, 435 if (file->f_flags & O_DIRECT) {
443 file->f_flags & O_DIRECT, 436 while (iov_iter_count(i)) {
444 (unsigned long)data & ~PAGE_MASK); 437 void __user *data = i->iov[0].iov_base + i->iov_offset;
438 size_t len = i->iov[0].iov_len - i->iov_offset;
439
440 num_pages = calc_pages_for((unsigned long)data, len);
441 pages = ceph_get_direct_page_vector(data,
442 num_pages, true);
443 if (IS_ERR(pages))
444 return PTR_ERR(pages);
445
446 ret = striped_read(inode, off, len,
447 pages, num_pages, checkeof,
448 1, (unsigned long)data & ~PAGE_MASK);
449 ceph_put_page_vector(pages, num_pages, true);
450
451 if (ret <= 0)
452 break;
453 off += ret;
454 iov_iter_advance(i, ret);
455 if (ret < len)
456 break;
457 }
458 } else {
459 num_pages = calc_pages_for(off, len);
460 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
461 if (IS_ERR(pages))
462 return PTR_ERR(pages);
463 ret = striped_read(inode, off, len, pages,
464 num_pages, checkeof, 0, 0);
465 if (ret > 0) {
466 int l, k = 0;
467 size_t left = len = ret;
468
469 while (left) {
470 void __user *data = i->iov[0].iov_base
471 + i->iov_offset;
472 l = min(i->iov[0].iov_len - i->iov_offset,
473 left);
474
475 ret = ceph_copy_page_vector_to_user(&pages[k],
476 data, off,
477 l);
478 if (ret > 0) {
479 iov_iter_advance(i, ret);
480 left -= ret;
481 off += ret;
482 k = calc_pages_for(iocb->ki_pos,
483 len - left + 1) - 1;
484 BUG_ON(k >= num_pages && left);
485 } else
486 break;
487 }
488 }
489 ceph_release_page_vector(pages, num_pages);
490 }
445 491
446 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) 492 if (off > iocb->ki_pos) {
447 ret = ceph_copy_page_vector_to_user(pages, data, off, ret); 493 ret = off - iocb->ki_pos;
448 if (ret >= 0) 494 iocb->ki_pos = off;
449 *poff = off + ret; 495 }
450 496
451done:
452 if (file->f_flags & O_DIRECT)
453 ceph_put_page_vector(pages, num_pages, true);
454 else
455 ceph_release_page_vector(pages, num_pages);
456 dout("sync_read result %d\n", ret); 497 dout("sync_read result %d\n", ret);
457 return ret; 498 return ret;
458} 499}
@@ -489,83 +530,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
489 } 530 }
490} 531}
491 532
533
492/* 534/*
493 * Synchronous write, straight from __user pointer or user pages (if 535 * Synchronous write, straight from __user pointer or user pages.
494 * O_DIRECT).
495 * 536 *
496 * If write spans object boundary, just do multiple writes. (For a 537 * If write spans object boundary, just do multiple writes. (For a
497 * correct atomic write, we should e.g. take write locks on all 538 * correct atomic write, we should e.g. take write locks on all
498 * objects, rollback on failure, etc.) 539 * objects, rollback on failure, etc.)
499 */ 540 */
500static ssize_t ceph_sync_write(struct file *file, const char __user *data, 541static ssize_t
501 size_t left, loff_t pos, loff_t *ppos) 542ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
543 unsigned long nr_segs, size_t count)
502{ 544{
545 struct file *file = iocb->ki_filp;
503 struct inode *inode = file_inode(file); 546 struct inode *inode = file_inode(file);
504 struct ceph_inode_info *ci = ceph_inode(inode); 547 struct ceph_inode_info *ci = ceph_inode(inode);
505 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 548 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
506 struct ceph_snap_context *snapc; 549 struct ceph_snap_context *snapc;
507 struct ceph_vino vino; 550 struct ceph_vino vino;
508 struct ceph_osd_request *req; 551 struct ceph_osd_request *req;
509 int num_ops = 1;
510 struct page **pages; 552 struct page **pages;
511 int num_pages; 553 int num_pages;
512 u64 len;
513 int written = 0; 554 int written = 0;
514 int flags; 555 int flags;
515 int check_caps = 0; 556 int check_caps = 0;
516 int page_align, io_align; 557 int page_align;
517 unsigned long buf_align;
518 int ret; 558 int ret;
519 struct timespec mtime = CURRENT_TIME; 559 struct timespec mtime = CURRENT_TIME;
520 bool own_pages = false; 560 loff_t pos = iocb->ki_pos;
561 struct iov_iter i;
521 562
522 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 563 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
523 return -EROFS; 564 return -EROFS;
524 565
525 dout("sync_write on file %p %lld~%u %s\n", file, pos, 566 dout("sync_direct_write on file %p %lld~%u\n", file, pos,
526 (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 567 (unsigned)count);
527 568
528 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); 569 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
529 if (ret < 0) 570 if (ret < 0)
530 return ret; 571 return ret;
531 572
532 ret = invalidate_inode_pages2_range(inode->i_mapping, 573 ret = invalidate_inode_pages2_range(inode->i_mapping,
533 pos >> PAGE_CACHE_SHIFT, 574 pos >> PAGE_CACHE_SHIFT,
534 (pos + left) >> PAGE_CACHE_SHIFT); 575 (pos + count) >> PAGE_CACHE_SHIFT);
535 if (ret < 0) 576 if (ret < 0)
536 dout("invalidate_inode_pages2_range returned %d\n", ret); 577 dout("invalidate_inode_pages2_range returned %d\n", ret);
537 578
538 flags = CEPH_OSD_FLAG_ORDERSNAP | 579 flags = CEPH_OSD_FLAG_ORDERSNAP |
539 CEPH_OSD_FLAG_ONDISK | 580 CEPH_OSD_FLAG_ONDISK |
540 CEPH_OSD_FLAG_WRITE; 581 CEPH_OSD_FLAG_WRITE;
541 if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
542 flags |= CEPH_OSD_FLAG_ACK;
543 else
544 num_ops++; /* Also include a 'startsync' command. */
545 582
546 /* 583 iov_iter_init(&i, iov, nr_segs, count, 0);
547 * we may need to do multiple writes here if we span an object 584
548 * boundary. this isn't atomic, unfortunately. :( 585 while (iov_iter_count(&i) > 0) {
549 */ 586 void __user *data = i.iov->iov_base + i.iov_offset;
550more: 587 u64 len = i.iov->iov_len - i.iov_offset;
551 io_align = pos & ~PAGE_MASK; 588
552 buf_align = (unsigned long)data & ~PAGE_MASK; 589 page_align = (unsigned long)data & ~PAGE_MASK;
553 len = left; 590
554 591 snapc = ci->i_snap_realm->cached_context;
555 snapc = ci->i_snap_realm->cached_context; 592 vino = ceph_vino(inode);
556 vino = ceph_vino(inode); 593 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
557 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 594 vino, pos, &len,
558 vino, pos, &len, num_ops, 595 2,/*include a 'startsync' command*/
559 CEPH_OSD_OP_WRITE, flags, snapc, 596 CEPH_OSD_OP_WRITE, flags, snapc,
560 ci->i_truncate_seq, ci->i_truncate_size, 597 ci->i_truncate_seq,
561 false); 598 ci->i_truncate_size,
562 if (IS_ERR(req)) 599 false);
563 return PTR_ERR(req); 600 if (IS_ERR(req)) {
601 ret = PTR_ERR(req);
602 goto out;
603 }
564 604
565 /* write from beginning of first page, regardless of io alignment */ 605 num_pages = calc_pages_for(page_align, len);
566 page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
567 num_pages = calc_pages_for(page_align, len);
568 if (file->f_flags & O_DIRECT) {
569 pages = ceph_get_direct_page_vector(data, num_pages, false); 606 pages = ceph_get_direct_page_vector(data, num_pages, false);
570 if (IS_ERR(pages)) { 607 if (IS_ERR(pages)) {
571 ret = PTR_ERR(pages); 608 ret = PTR_ERR(pages);
@@ -577,60 +614,175 @@ more:
577 * may block. 614 * may block.
578 */ 615 */
579 truncate_inode_pages_range(inode->i_mapping, pos, 616 truncate_inode_pages_range(inode->i_mapping, pos,
580 (pos+len) | (PAGE_CACHE_SIZE-1)); 617 (pos+len) | (PAGE_CACHE_SIZE-1));
581 } else { 618 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
619 false, false);
620
621 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
622 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
623
624 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
625 if (!ret)
626 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
627
628 ceph_put_page_vector(pages, num_pages, false);
629
630out:
631 ceph_osdc_put_request(req);
632 if (ret == 0) {
633 pos += len;
634 written += len;
635 iov_iter_advance(&i, (size_t)len);
636
637 if (pos > i_size_read(inode)) {
638 check_caps = ceph_inode_set_size(inode, pos);
639 if (check_caps)
640 ceph_check_caps(ceph_inode(inode),
641 CHECK_CAPS_AUTHONLY,
642 NULL);
643 }
644 } else
645 break;
646 }
647
648 if (ret != -EOLDSNAPC && written > 0) {
649 iocb->ki_pos = pos;
650 ret = written;
651 }
652 return ret;
653}
654
655
656/*
657 * Synchronous write, straight from __user pointer or user pages.
658 *
659 * If write spans object boundary, just do multiple writes. (For a
660 * correct atomic write, we should e.g. take write locks on all
661 * objects, rollback on failure, etc.)
662 */
663static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
664 unsigned long nr_segs, size_t count)
665{
666 struct file *file = iocb->ki_filp;
667 struct inode *inode = file_inode(file);
668 struct ceph_inode_info *ci = ceph_inode(inode);
669 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
670 struct ceph_snap_context *snapc;
671 struct ceph_vino vino;
672 struct ceph_osd_request *req;
673 struct page **pages;
674 u64 len;
675 int num_pages;
676 int written = 0;
677 int flags;
678 int check_caps = 0;
679 int ret;
680 struct timespec mtime = CURRENT_TIME;
681 loff_t pos = iocb->ki_pos;
682 struct iov_iter i;
683
684 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
685 return -EROFS;
686
687 dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
688
689 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
690 if (ret < 0)
691 return ret;
692
693 ret = invalidate_inode_pages2_range(inode->i_mapping,
694 pos >> PAGE_CACHE_SHIFT,
695 (pos + count) >> PAGE_CACHE_SHIFT);
696 if (ret < 0)
697 dout("invalidate_inode_pages2_range returned %d\n", ret);
698
699 flags = CEPH_OSD_FLAG_ORDERSNAP |
700 CEPH_OSD_FLAG_ONDISK |
701 CEPH_OSD_FLAG_WRITE |
702 CEPH_OSD_FLAG_ACK;
703
704 iov_iter_init(&i, iov, nr_segs, count, 0);
705
706 while ((len = iov_iter_count(&i)) > 0) {
707 size_t left;
708 int n;
709
710 snapc = ci->i_snap_realm->cached_context;
711 vino = ceph_vino(inode);
712 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
713 vino, pos, &len, 1,
714 CEPH_OSD_OP_WRITE, flags, snapc,
715 ci->i_truncate_seq,
716 ci->i_truncate_size,
717 false);
718 if (IS_ERR(req)) {
719 ret = PTR_ERR(req);
720 goto out;
721 }
722
723 /*
724 * write from beginning of first page,
725 * regardless of io alignment
726 */
727 num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
728
582 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); 729 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
583 if (IS_ERR(pages)) { 730 if (IS_ERR(pages)) {
584 ret = PTR_ERR(pages); 731 ret = PTR_ERR(pages);
585 goto out; 732 goto out;
586 } 733 }
587 ret = ceph_copy_user_to_page_vector(pages, data, pos, len); 734
735 left = len;
736 for (n = 0; n < num_pages; n++) {
737 size_t plen = min_t(size_t, left, PAGE_SIZE);
738 ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
739 if (ret != plen) {
740 ret = -EFAULT;
741 break;
742 }
743 left -= ret;
744 iov_iter_advance(&i, ret);
745 }
746
588 if (ret < 0) { 747 if (ret < 0) {
589 ceph_release_page_vector(pages, num_pages); 748 ceph_release_page_vector(pages, num_pages);
590 goto out; 749 goto out;
591 } 750 }
592 751
593 if ((file->f_flags & O_SYNC) == 0) { 752 /* get a second commit callback */
594 /* get a second commit callback */ 753 req->r_unsafe_callback = ceph_sync_write_unsafe;
595 req->r_unsafe_callback = ceph_sync_write_unsafe; 754 req->r_inode = inode;
596 req->r_inode = inode;
597 own_pages = true;
598 }
599 }
600 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
601 false, own_pages);
602 755
603 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 756 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
604 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 757 false, true);
605 758
606 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 759 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
607 if (!ret) 760 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
608 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
609 761
610 if (file->f_flags & O_DIRECT) 762 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
611 ceph_put_page_vector(pages, num_pages, false); 763 if (!ret)
612 else if (file->f_flags & O_SYNC) 764 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
613 ceph_release_page_vector(pages, num_pages);
614 765
615out: 766out:
616 ceph_osdc_put_request(req); 767 ceph_osdc_put_request(req);
617 if (ret == 0) { 768 if (ret == 0) {
618 pos += len; 769 pos += len;
619 written += len; 770 written += len;
620 left -= len; 771
621 data += len; 772 if (pos > i_size_read(inode)) {
622 if (left) 773 check_caps = ceph_inode_set_size(inode, pos);
623 goto more; 774 if (check_caps)
775 ceph_check_caps(ceph_inode(inode),
776 CHECK_CAPS_AUTHONLY,
777 NULL);
778 }
779 } else
780 break;
781 }
624 782
783 if (ret != -EOLDSNAPC && written > 0) {
625 ret = written; 784 ret = written;
626 *ppos = pos; 785 iocb->ki_pos = pos;
627 if (pos > i_size_read(inode))
628 check_caps = ceph_inode_set_size(inode, pos);
629 if (check_caps)
630 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
631 NULL);
632 } else if (ret != -EOLDSNAPC && written > 0) {
633 ret = written;
634 } 786 }
635 return ret; 787 return ret;
636} 788}
@@ -647,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
647{ 799{
648 struct file *filp = iocb->ki_filp; 800 struct file *filp = iocb->ki_filp;
649 struct ceph_file_info *fi = filp->private_data; 801 struct ceph_file_info *fi = filp->private_data;
650 loff_t *ppos = &iocb->ki_pos; 802 size_t len = iocb->ki_nbytes;
651 size_t len = iov->iov_len;
652 struct inode *inode = file_inode(filp); 803 struct inode *inode = file_inode(filp);
653 struct ceph_inode_info *ci = ceph_inode(inode); 804 struct ceph_inode_info *ci = ceph_inode(inode);
654 void __user *base = iov->iov_base;
655 ssize_t ret; 805 ssize_t ret;
656 int want, got = 0; 806 int want, got = 0;
657 int checkeof = 0, read = 0; 807 int checkeof = 0, read = 0;
658 808
659 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
660 inode, ceph_vinop(inode), pos, (unsigned)len, inode);
661again: 809again:
810 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
811 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
812
662 if (fi->fmode & CEPH_FILE_MODE_LAZY) 813 if (fi->fmode & CEPH_FILE_MODE_LAZY)
663 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 814 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
664 else 815 else
665 want = CEPH_CAP_FILE_CACHE; 816 want = CEPH_CAP_FILE_CACHE;
666 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 817 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
667 if (ret < 0) 818 if (ret < 0)
668 goto out; 819 return ret;
669 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
670 inode, ceph_vinop(inode), pos, (unsigned)len,
671 ceph_cap_string(got));
672 820
673 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 821 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
674 (iocb->ki_filp->f_flags & O_DIRECT) || 822 (iocb->ki_filp->f_flags & O_DIRECT) ||
675 (fi->flags & CEPH_F_SYNC)) 823 (fi->flags & CEPH_F_SYNC)) {
824 struct iov_iter i;
825
826 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got));
829
830 if (!read) {
831 ret = generic_segment_checks(iov, &nr_segs,
832 &len, VERIFY_WRITE);
833 if (ret)
834 goto out;
835 }
836
837 iov_iter_init(&i, iov, nr_segs, len, read);
838
676 /* hmm, this isn't really async... */ 839 /* hmm, this isn't really async... */
677 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 840 ret = ceph_sync_read(iocb, &i, &checkeof);
678 else 841 } else {
679 ret = generic_file_aio_read(iocb, iov, nr_segs, pos); 842 /*
843 * We can't modify the content of iov,
844 * so we only read from beginning.
845 */
846 if (read) {
847 iocb->ki_pos = pos;
848 len = iocb->ki_nbytes;
849 read = 0;
850 }
851 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
852 inode, ceph_vinop(inode), pos, (unsigned)len,
853 ceph_cap_string(got));
680 854
855 ret = generic_file_aio_read(iocb, iov, nr_segs, pos);
856 }
681out: 857out:
682 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 858 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
683 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 859 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
684 ceph_put_cap_refs(ci, got); 860 ceph_put_cap_refs(ci, got);
685 861
686 if (checkeof && ret >= 0) { 862 if (checkeof && ret >= 0) {
687 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); 863 int statret = ceph_do_getattr(inode,
864 CEPH_STAT_CAP_SIZE);
688 865
689 /* hit EOF or hole? */ 866 /* hit EOF or hole? */
690 if (statret == 0 && *ppos < inode->i_size) { 867 if (statret == 0 && iocb->ki_pos < inode->i_size &&
691 dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); 868 ret < len) {
869 dout("sync_read hit hole, ppos %lld < size %lld"
870 ", reading more\n", iocb->ki_pos,
871 inode->i_size);
872
692 read += ret; 873 read += ret;
693 base += ret;
694 len -= ret; 874 len -= ret;
695 checkeof = 0; 875 checkeof = 0;
696 goto again; 876 goto again;
697 } 877 }
698 } 878 }
879
699 if (ret >= 0) 880 if (ret >= 0)
700 ret += read; 881 ret += read;
701 882
@@ -772,11 +953,13 @@ retry_snap:
772 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 953 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
773 954
774 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 955 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
775 (iocb->ki_filp->f_flags & O_DIRECT) || 956 (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
776 (fi->flags & CEPH_F_SYNC)) {
777 mutex_unlock(&inode->i_mutex); 957 mutex_unlock(&inode->i_mutex);
778 written = ceph_sync_write(file, iov->iov_base, count, 958 if (file->f_flags & O_DIRECT)
779 pos, &iocb->ki_pos); 959 written = ceph_sync_direct_write(iocb, iov,
960 nr_segs, count);
961 else
962 written = ceph_sync_write(iocb, iov, nr_segs, count);
780 if (written == -EOLDSNAPC) { 963 if (written == -EOLDSNAPC) {
781 dout("aio_write %p %llx.%llx %llu~%u" 964 dout("aio_write %p %llx.%llx %llu~%u"
782 "got EOLDSNAPC, retrying\n", 965 "got EOLDSNAPC, retrying\n",
@@ -1018,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode,
1018 loff_t offset, loff_t length) 1201 loff_t offset, loff_t length)
1019{ 1202{
1020 struct ceph_file_info *fi = file->private_data; 1203 struct ceph_file_info *fi = file->private_data;
1021 struct inode *inode = file->f_dentry->d_inode; 1204 struct inode *inode = file_inode(file);
1022 struct ceph_inode_info *ci = ceph_inode(inode); 1205 struct ceph_inode_info *ci = ceph_inode(inode);
1023 struct ceph_osd_client *osdc = 1206 struct ceph_osd_client *osdc =
1024 &ceph_inode_to_client(inode)->client->osdc; 1207 &ceph_inode_to_client(inode)->client->osdc;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 278fd2891288..6fc10a7d7c59 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = {
95 .getxattr = ceph_getxattr, 95 .getxattr = ceph_getxattr,
96 .listxattr = ceph_listxattr, 96 .listxattr = ceph_listxattr,
97 .removexattr = ceph_removexattr, 97 .removexattr = ceph_removexattr,
98 .get_acl = ceph_get_acl,
98}; 99};
99 100
100 101
@@ -335,12 +336,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
335 ci->i_hold_caps_min = 0; 336 ci->i_hold_caps_min = 0;
336 ci->i_hold_caps_max = 0; 337 ci->i_hold_caps_max = 0;
337 INIT_LIST_HEAD(&ci->i_cap_delay_list); 338 INIT_LIST_HEAD(&ci->i_cap_delay_list);
338 ci->i_cap_exporting_mds = 0;
339 ci->i_cap_exporting_mseq = 0;
340 ci->i_cap_exporting_issued = 0;
341 INIT_LIST_HEAD(&ci->i_cap_snaps); 339 INIT_LIST_HEAD(&ci->i_cap_snaps);
342 ci->i_head_snapc = NULL; 340 ci->i_head_snapc = NULL;
343 ci->i_snap_caps = 0; 341 ci->i_snap_caps = 0;
342 ci->i_cap_exporting_issued = 0;
344 343
345 for (i = 0; i < CEPH_FILE_MODE_NUM; i++) 344 for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
346 ci->i_nr_by_mode[i] = 0; 345 ci->i_nr_by_mode[i] = 0;
@@ -436,6 +435,16 @@ void ceph_destroy_inode(struct inode *inode)
436 call_rcu(&inode->i_rcu, ceph_i_callback); 435 call_rcu(&inode->i_rcu, ceph_i_callback);
437} 436}
438 437
438int ceph_drop_inode(struct inode *inode)
439{
440 /*
441 * Positive dentry and corresponding inode are always accompanied
442 * in MDS reply. So no need to keep inode in the cache after
443 * dropping all its aliases.
444 */
445 return 1;
446}
447
439/* 448/*
440 * Helpers to fill in size, ctime, mtime, and atime. We have to be 449 * Helpers to fill in size, ctime, mtime, and atime. We have to be
441 * careful because either the client or MDS may have more up to date 450 * careful because either the client or MDS may have more up to date
@@ -670,6 +679,7 @@ static int fill_inode(struct inode *inode,
670 memcpy(ci->i_xattrs.blob->vec.iov_base, 679 memcpy(ci->i_xattrs.blob->vec.iov_base,
671 iinfo->xattr_data, iinfo->xattr_len); 680 iinfo->xattr_data, iinfo->xattr_len);
672 ci->i_xattrs.version = le64_to_cpu(info->xattr_version); 681 ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
682 ceph_forget_all_cached_acls(inode);
673 xattr_blob = NULL; 683 xattr_blob = NULL;
674 } 684 }
675 685
@@ -1454,7 +1464,8 @@ static void ceph_invalidate_work(struct work_struct *work)
1454 dout("invalidate_pages %p gen %d revoking %d\n", inode, 1464 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1455 ci->i_rdcache_gen, ci->i_rdcache_revoking); 1465 ci->i_rdcache_gen, ci->i_rdcache_revoking);
1456 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 1466 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1457 /* nevermind! */ 1467 if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1468 check = 1;
1458 spin_unlock(&ci->i_ceph_lock); 1469 spin_unlock(&ci->i_ceph_lock);
1459 mutex_unlock(&ci->i_truncate_mutex); 1470 mutex_unlock(&ci->i_truncate_mutex);
1460 goto out; 1471 goto out;
@@ -1475,13 +1486,14 @@ static void ceph_invalidate_work(struct work_struct *work)
1475 dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", 1486 dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
1476 inode, orig_gen, ci->i_rdcache_gen, 1487 inode, orig_gen, ci->i_rdcache_gen,
1477 ci->i_rdcache_revoking); 1488 ci->i_rdcache_revoking);
1489 if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1490 check = 1;
1478 } 1491 }
1479 spin_unlock(&ci->i_ceph_lock); 1492 spin_unlock(&ci->i_ceph_lock);
1480 mutex_unlock(&ci->i_truncate_mutex); 1493 mutex_unlock(&ci->i_truncate_mutex);
1481 1494out:
1482 if (check) 1495 if (check)
1483 ceph_check_caps(ci, 0, NULL); 1496 ceph_check_caps(ci, 0, NULL);
1484out:
1485 iput(inode); 1497 iput(inode);
1486} 1498}
1487 1499
@@ -1602,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = {
1602 .getxattr = ceph_getxattr, 1614 .getxattr = ceph_getxattr,
1603 .listxattr = ceph_listxattr, 1615 .listxattr = ceph_listxattr,
1604 .removexattr = ceph_removexattr, 1616 .removexattr = ceph_removexattr,
1617 .get_acl = ceph_get_acl,
1605}; 1618};
1606 1619
1607/* 1620/*
@@ -1675,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1675 dirtied |= CEPH_CAP_AUTH_EXCL; 1688 dirtied |= CEPH_CAP_AUTH_EXCL;
1676 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || 1689 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1677 attr->ia_mode != inode->i_mode) { 1690 attr->ia_mode != inode->i_mode) {
1691 inode->i_mode = attr->ia_mode;
1678 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode); 1692 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
1679 mask |= CEPH_SETATTR_MODE; 1693 mask |= CEPH_SETATTR_MODE;
1680 release |= CEPH_CAP_AUTH_SHARED; 1694 release |= CEPH_CAP_AUTH_SHARED;
@@ -1790,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1790 if (inode_dirty_flags) 1804 if (inode_dirty_flags)
1791 __mark_inode_dirty(inode, inode_dirty_flags); 1805 __mark_inode_dirty(inode, inode_dirty_flags);
1792 1806
1807 if (ia_valid & ATTR_MODE) {
1808 err = ceph_acl_chmod(dentry, inode);
1809 if (err)
1810 goto out_put;
1811 }
1812
1793 if (mask) { 1813 if (mask) {
1794 req->r_inode = inode; 1814 req->r_inode = inode;
1795 ihold(inode); 1815 ihold(inode);
@@ -1809,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1809 return err; 1829 return err;
1810out: 1830out:
1811 spin_unlock(&ci->i_ceph_lock); 1831 spin_unlock(&ci->i_ceph_lock);
1832out_put:
1812 ceph_mdsc_put_request(req); 1833 ceph_mdsc_put_request(req);
1813 return err; 1834 return err;
1814} 1835}
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 669622fd1ae3..dc66c9e023e4 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -183,6 +183,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
183 struct ceph_inode_info *ci = ceph_inode(inode); 183 struct ceph_inode_info *ci = ceph_inode(inode);
184 struct ceph_osd_client *osdc = 184 struct ceph_osd_client *osdc =
185 &ceph_sb_to_client(inode->i_sb)->client->osdc; 185 &ceph_sb_to_client(inode->i_sb)->client->osdc;
186 struct ceph_object_locator oloc;
187 struct ceph_object_id oid;
186 u64 len = 1, olen; 188 u64 len = 1, olen;
187 u64 tmp; 189 u64 tmp;
188 struct ceph_pg pgid; 190 struct ceph_pg pgid;
@@ -211,8 +213,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
211 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", 213 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
212 ceph_ino(inode), dl.object_no); 214 ceph_ino(inode), dl.object_no);
213 215
214 r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, 216 oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
215 ceph_file_layout_pg_pool(ci->i_layout)); 217 ceph_oid_set_name(&oid, dl.object_name);
218
219 r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid);
216 if (r < 0) { 220 if (r < 0) {
217 up_read(&osdc->map_sem); 221 up_read(&osdc->map_sem);
218 return r; 222 return r;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index d90861f45210..f4f050a69a48 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -63,7 +63,7 @@ static const struct ceph_connection_operations mds_con_ops;
63 */ 63 */
64static int parse_reply_info_in(void **p, void *end, 64static int parse_reply_info_in(void **p, void *end,
65 struct ceph_mds_reply_info_in *info, 65 struct ceph_mds_reply_info_in *info,
66 int features) 66 u64 features)
67{ 67{
68 int err = -EIO; 68 int err = -EIO;
69 69
@@ -98,7 +98,7 @@ bad:
98 */ 98 */
99static int parse_reply_info_trace(void **p, void *end, 99static int parse_reply_info_trace(void **p, void *end,
100 struct ceph_mds_reply_info_parsed *info, 100 struct ceph_mds_reply_info_parsed *info,
101 int features) 101 u64 features)
102{ 102{
103 int err; 103 int err;
104 104
@@ -145,7 +145,7 @@ out_bad:
145 */ 145 */
146static int parse_reply_info_dir(void **p, void *end, 146static int parse_reply_info_dir(void **p, void *end,
147 struct ceph_mds_reply_info_parsed *info, 147 struct ceph_mds_reply_info_parsed *info,
148 int features) 148 u64 features)
149{ 149{
150 u32 num, i = 0; 150 u32 num, i = 0;
151 int err; 151 int err;
@@ -217,7 +217,7 @@ out_bad:
217 */ 217 */
218static int parse_reply_info_filelock(void **p, void *end, 218static int parse_reply_info_filelock(void **p, void *end,
219 struct ceph_mds_reply_info_parsed *info, 219 struct ceph_mds_reply_info_parsed *info,
220 int features) 220 u64 features)
221{ 221{
222 if (*p + sizeof(*info->filelock_reply) > end) 222 if (*p + sizeof(*info->filelock_reply) > end)
223 goto bad; 223 goto bad;
@@ -238,7 +238,7 @@ bad:
238 */ 238 */
239static int parse_reply_info_create(void **p, void *end, 239static int parse_reply_info_create(void **p, void *end,
240 struct ceph_mds_reply_info_parsed *info, 240 struct ceph_mds_reply_info_parsed *info,
241 int features) 241 u64 features)
242{ 242{
243 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 243 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
244 if (*p == end) { 244 if (*p == end) {
@@ -262,7 +262,7 @@ bad:
262 */ 262 */
263static int parse_reply_info_extra(void **p, void *end, 263static int parse_reply_info_extra(void **p, void *end,
264 struct ceph_mds_reply_info_parsed *info, 264 struct ceph_mds_reply_info_parsed *info,
265 int features) 265 u64 features)
266{ 266{
267 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 267 if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
268 return parse_reply_info_filelock(p, end, info, features); 268 return parse_reply_info_filelock(p, end, info, features);
@@ -280,7 +280,7 @@ static int parse_reply_info_extra(void **p, void *end,
280 */ 280 */
281static int parse_reply_info(struct ceph_msg *msg, 281static int parse_reply_info(struct ceph_msg *msg,
282 struct ceph_mds_reply_info_parsed *info, 282 struct ceph_mds_reply_info_parsed *info,
283 int features) 283 u64 features)
284{ 284{
285 void *p, *end; 285 void *p, *end;
286 u32 len; 286 u32 len;
@@ -713,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
713 struct dentry *dn = get_nonsnap_parent(parent); 713 struct dentry *dn = get_nonsnap_parent(parent);
714 inode = dn->d_inode; 714 inode = dn->d_inode;
715 dout("__choose_mds using nonsnap parent %p\n", inode); 715 dout("__choose_mds using nonsnap parent %p\n", inode);
716 } else if (req->r_dentry->d_inode) { 716 } else {
717 /* dentry target */ 717 /* dentry target */
718 inode = req->r_dentry->d_inode; 718 inode = req->r_dentry->d_inode;
719 } else { 719 if (!inode || mode == USE_AUTH_MDS) {
720 /* dir + name */ 720 /* dir + name */
721 inode = dir; 721 inode = dir;
722 hash = ceph_dentry_hash(dir, req->r_dentry); 722 hash = ceph_dentry_hash(dir, req->r_dentry);
723 is_hash = true; 723 is_hash = true;
724 }
724 } 725 }
725 } 726 }
726 727
@@ -846,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc,
846 * 847 *
847 * called under mdsc->mutex 848 * called under mdsc->mutex
848 */ 849 */
850static struct ceph_mds_session *
851__open_export_target_session(struct ceph_mds_client *mdsc, int target)
852{
853 struct ceph_mds_session *session;
854
855 session = __ceph_lookup_mds_session(mdsc, target);
856 if (!session) {
857 session = register_session(mdsc, target);
858 if (IS_ERR(session))
859 return session;
860 }
861 if (session->s_state == CEPH_MDS_SESSION_NEW ||
862 session->s_state == CEPH_MDS_SESSION_CLOSING)
863 __open_session(mdsc, session);
864
865 return session;
866}
867
868struct ceph_mds_session *
869ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
870{
871 struct ceph_mds_session *session;
872
873 dout("open_export_target_session to mds%d\n", target);
874
875 mutex_lock(&mdsc->mutex);
876 session = __open_export_target_session(mdsc, target);
877 mutex_unlock(&mdsc->mutex);
878
879 return session;
880}
881
849static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 882static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
850 struct ceph_mds_session *session) 883 struct ceph_mds_session *session)
851{ 884{
852 struct ceph_mds_info *mi; 885 struct ceph_mds_info *mi;
853 struct ceph_mds_session *ts; 886 struct ceph_mds_session *ts;
854 int i, mds = session->s_mds; 887 int i, mds = session->s_mds;
855 int target;
856 888
857 if (mds >= mdsc->mdsmap->m_max_mds) 889 if (mds >= mdsc->mdsmap->m_max_mds)
858 return; 890 return;
891
859 mi = &mdsc->mdsmap->m_info[mds]; 892 mi = &mdsc->mdsmap->m_info[mds];
860 dout("open_export_target_sessions for mds%d (%d targets)\n", 893 dout("open_export_target_sessions for mds%d (%d targets)\n",
861 session->s_mds, mi->num_export_targets); 894 session->s_mds, mi->num_export_targets);
862 895
863 for (i = 0; i < mi->num_export_targets; i++) { 896 for (i = 0; i < mi->num_export_targets; i++) {
864 target = mi->export_targets[i]; 897 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
865 ts = __ceph_lookup_mds_session(mdsc, target); 898 if (!IS_ERR(ts))
866 if (!ts) { 899 ceph_put_mds_session(ts);
867 ts = register_session(mdsc, target);
868 if (IS_ERR(ts))
869 return;
870 }
871 if (session->s_state == CEPH_MDS_SESSION_NEW ||
872 session->s_state == CEPH_MDS_SESSION_CLOSING)
873 __open_session(mdsc, session);
874 else
875 dout(" mds%d target mds%d %p is %s\n", session->s_mds,
876 i, ts, session_state_name(ts->s_state));
877 ceph_put_mds_session(ts);
878 } 900 }
879} 901}
880 902
@@ -1136,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
1136 return 0; 1158 return 0;
1137} 1159}
1138 1160
1161static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1162 struct ceph_mds_session *session, u64 seq)
1163{
1164 struct ceph_msg *msg;
1165
1166 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1167 session->s_mds, session_state_name(session->s_state), seq);
1168 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1169 if (!msg)
1170 return -ENOMEM;
1171 ceph_con_send(&session->s_con, msg);
1172 return 0;
1173}
1174
1175
1139/* 1176/*
1140 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1177 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1141 * 1178 *
@@ -1214,7 +1251,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1214{ 1251{
1215 struct ceph_mds_session *session = arg; 1252 struct ceph_mds_session *session = arg;
1216 struct ceph_inode_info *ci = ceph_inode(inode); 1253 struct ceph_inode_info *ci = ceph_inode(inode);
1217 int used, oissued, mine; 1254 int used, wanted, oissued, mine;
1218 1255
1219 if (session->s_trim_caps <= 0) 1256 if (session->s_trim_caps <= 0)
1220 return -1; 1257 return -1;
@@ -1222,14 +1259,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1222 spin_lock(&ci->i_ceph_lock); 1259 spin_lock(&ci->i_ceph_lock);
1223 mine = cap->issued | cap->implemented; 1260 mine = cap->issued | cap->implemented;
1224 used = __ceph_caps_used(ci); 1261 used = __ceph_caps_used(ci);
1262 wanted = __ceph_caps_file_wanted(ci);
1225 oissued = __ceph_caps_issued_other(ci, cap); 1263 oissued = __ceph_caps_issued_other(ci, cap);
1226 1264
1227 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1265 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1228 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1266 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1229 ceph_cap_string(used)); 1267 ceph_cap_string(used), ceph_cap_string(wanted));
1230 if (ci->i_dirty_caps) 1268 if (cap == ci->i_auth_cap) {
1231 goto out; /* dirty caps */ 1269 if (ci->i_dirty_caps | ci->i_flushing_caps)
1232 if ((used & ~oissued) & mine) 1270 goto out;
1271 if ((used | wanted) & CEPH_CAP_ANY_WR)
1272 goto out;
1273 }
1274 if ((used | wanted) & ~oissued & mine)
1233 goto out; /* we need these caps */ 1275 goto out; /* we need these caps */
1234 1276
1235 session->s_trim_caps--; 1277 session->s_trim_caps--;
@@ -2156,26 +2198,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2156 */ 2198 */
2157 if (result == -ESTALE) { 2199 if (result == -ESTALE) {
2158 dout("got ESTALE on request %llu", req->r_tid); 2200 dout("got ESTALE on request %llu", req->r_tid);
2159 if (!req->r_inode) { 2201 if (req->r_direct_mode != USE_AUTH_MDS) {
2160 /* do nothing; not an authority problem */
2161 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2162 dout("not using auth, setting for that now"); 2202 dout("not using auth, setting for that now");
2163 req->r_direct_mode = USE_AUTH_MDS; 2203 req->r_direct_mode = USE_AUTH_MDS;
2164 __do_request(mdsc, req); 2204 __do_request(mdsc, req);
2165 mutex_unlock(&mdsc->mutex); 2205 mutex_unlock(&mdsc->mutex);
2166 goto out; 2206 goto out;
2167 } else { 2207 } else {
2168 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2208 int mds = __choose_mds(mdsc, req);
2169 struct ceph_cap *cap = NULL; 2209 if (mds >= 0 && mds != req->r_session->s_mds) {
2170 2210 dout("but auth changed, so resending");
2171 if (req->r_session)
2172 cap = ceph_get_cap_for_mds(ci,
2173 req->r_session->s_mds);
2174
2175 dout("already using auth");
2176 if ((!cap || cap != ci->i_auth_cap) ||
2177 (cap->mseq != req->r_sent_on_mseq)) {
2178 dout("but cap changed, so resending");
2179 __do_request(mdsc, req); 2211 __do_request(mdsc, req);
2180 mutex_unlock(&mdsc->mutex); 2212 mutex_unlock(&mdsc->mutex);
2181 goto out; 2213 goto out;
@@ -2400,6 +2432,10 @@ static void handle_session(struct ceph_mds_session *session,
2400 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2432 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2401 break; 2433 break;
2402 2434
2435 case CEPH_SESSION_FLUSHMSG:
2436 send_flushmsg_ack(mdsc, session, seq);
2437 break;
2438
2403 default: 2439 default:
2404 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2440 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2405 WARN_ON(1); 2441 WARN_ON(1);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 4c053d099ae4..68288917c737 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -383,6 +383,8 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
383extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, 383extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
384 struct ceph_msg *msg); 384 struct ceph_msg *msg);
385 385
386extern struct ceph_mds_session *
387ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
386extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 388extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
387 struct ceph_mds_session *session); 389 struct ceph_mds_session *session);
388 390
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index 89fa4a940a0f..4440f447fd3f 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -41,6 +41,8 @@ const char *ceph_session_op_name(int op)
41 case CEPH_SESSION_RENEWCAPS: return "renewcaps"; 41 case CEPH_SESSION_RENEWCAPS: return "renewcaps";
42 case CEPH_SESSION_STALE: return "stale"; 42 case CEPH_SESSION_STALE: return "stale";
43 case CEPH_SESSION_RECALL_STATE: return "recall_state"; 43 case CEPH_SESSION_RECALL_STATE: return "recall_state";
44 case CEPH_SESSION_FLUSHMSG: return "flushmsg";
45 case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
44 } 46 }
45 return "???"; 47 return "???";
46} 48}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 6a0951e43044..2df963f1cf5a 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -490,10 +490,10 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
490 struct ceph_options *opt) 490 struct ceph_options *opt)
491{ 491{
492 struct ceph_fs_client *fsc; 492 struct ceph_fs_client *fsc;
493 const unsigned supported_features = 493 const u64 supported_features =
494 CEPH_FEATURE_FLOCK | 494 CEPH_FEATURE_FLOCK |
495 CEPH_FEATURE_DIRLAYOUTHASH; 495 CEPH_FEATURE_DIRLAYOUTHASH;
496 const unsigned required_features = 0; 496 const u64 required_features = 0;
497 int page_count; 497 int page_count;
498 size_t size; 498 size_t size;
499 int err = -ENOMEM; 499 int err = -ENOMEM;
@@ -686,6 +686,7 @@ static const struct super_operations ceph_super_ops = {
686 .alloc_inode = ceph_alloc_inode, 686 .alloc_inode = ceph_alloc_inode,
687 .destroy_inode = ceph_destroy_inode, 687 .destroy_inode = ceph_destroy_inode,
688 .write_inode = ceph_write_inode, 688 .write_inode = ceph_write_inode,
689 .drop_inode = ceph_drop_inode,
689 .sync_fs = ceph_sync_fs, 690 .sync_fs = ceph_sync_fs,
690 .put_super = ceph_put_super, 691 .put_super = ceph_put_super,
691 .show_options = ceph_show_options, 692 .show_options = ceph_show_options,
@@ -818,7 +819,11 @@ static int ceph_set_super(struct super_block *s, void *data)
818 819
819 s->s_flags = fsc->mount_options->sb_flags; 820 s->s_flags = fsc->mount_options->sb_flags;
820 s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */ 821 s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */
822#ifdef CONFIG_CEPH_FS_POSIX_ACL
823 s->s_flags |= MS_POSIXACL;
824#endif
821 825
826 s->s_xattr = ceph_xattr_handlers;
822 s->s_fs_info = fsc; 827 s->s_fs_info = fsc;
823 fsc->sb = s; 828 fsc->sb = s;
824 829
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ef4ac38bb614..c299f7d19bf3 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -287,14 +287,12 @@ struct ceph_inode_info {
287 unsigned long i_hold_caps_min; /* jiffies */ 287 unsigned long i_hold_caps_min; /* jiffies */
288 unsigned long i_hold_caps_max; /* jiffies */ 288 unsigned long i_hold_caps_max; /* jiffies */
289 struct list_head i_cap_delay_list; /* for delayed cap release to mds */ 289 struct list_head i_cap_delay_list; /* for delayed cap release to mds */
290 int i_cap_exporting_mds; /* to handle cap migration between */
291 unsigned i_cap_exporting_mseq; /* mds's. */
292 unsigned i_cap_exporting_issued;
293 struct ceph_cap_reservation i_cap_migration_resv; 290 struct ceph_cap_reservation i_cap_migration_resv;
294 struct list_head i_cap_snaps; /* snapped state pending flush to mds */ 291 struct list_head i_cap_snaps; /* snapped state pending flush to mds */
295 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or 292 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
296 dirty|flushing caps */ 293 dirty|flushing caps */
297 unsigned i_snap_caps; /* cap bits for snapped files */ 294 unsigned i_snap_caps; /* cap bits for snapped files */
295 unsigned i_cap_exporting_issued;
298 296
299 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ 297 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
300 298
@@ -335,7 +333,6 @@ struct ceph_inode_info {
335 u32 i_fscache_gen; /* sequence, for delayed fscache validate */ 333 u32 i_fscache_gen; /* sequence, for delayed fscache validate */
336 struct work_struct i_revalidate_work; 334 struct work_struct i_revalidate_work;
337#endif 335#endif
338
339 struct inode vfs_inode; /* at end */ 336 struct inode vfs_inode; /* at end */
340}; 337};
341 338
@@ -529,6 +526,8 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci)
529} 526}
530extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); 527extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask);
531 528
529extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
530 struct ceph_cap *ocap, int mask);
532extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask); 531extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask);
533extern int __ceph_caps_used(struct ceph_inode_info *ci); 532extern int __ceph_caps_used(struct ceph_inode_info *ci);
534 533
@@ -691,6 +690,7 @@ extern const struct inode_operations ceph_file_iops;
691 690
692extern struct inode *ceph_alloc_inode(struct super_block *sb); 691extern struct inode *ceph_alloc_inode(struct super_block *sb);
693extern void ceph_destroy_inode(struct inode *inode); 692extern void ceph_destroy_inode(struct inode *inode);
693extern int ceph_drop_inode(struct inode *inode);
694 694
695extern struct inode *ceph_get_inode(struct super_block *sb, 695extern struct inode *ceph_get_inode(struct super_block *sb,
696 struct ceph_vino vino); 696 struct ceph_vino vino);
@@ -724,6 +724,9 @@ extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
724/* xattr.c */ 724/* xattr.c */
725extern int ceph_setxattr(struct dentry *, const char *, const void *, 725extern int ceph_setxattr(struct dentry *, const char *, const void *,
726 size_t, int); 726 size_t, int);
727int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int);
728ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
729int __ceph_removexattr(struct dentry *, const char *);
727extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t); 730extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
728extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); 731extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
729extern int ceph_removexattr(struct dentry *, const char *); 732extern int ceph_removexattr(struct dentry *, const char *);
@@ -732,6 +735,39 @@ extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
732extern void __init ceph_xattr_init(void); 735extern void __init ceph_xattr_init(void);
733extern void ceph_xattr_exit(void); 736extern void ceph_xattr_exit(void);
734 737
738/* acl.c */
739extern const struct xattr_handler ceph_xattr_acl_access_handler;
740extern const struct xattr_handler ceph_xattr_acl_default_handler;
741extern const struct xattr_handler *ceph_xattr_handlers[];
742
743#ifdef CONFIG_CEPH_FS_POSIX_ACL
744
745struct posix_acl *ceph_get_acl(struct inode *, int);
746int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
747int ceph_acl_chmod(struct dentry *, struct inode *);
748void ceph_forget_all_cached_acls(struct inode *inode);
749
750#else
751
752#define ceph_get_acl NULL
753
754static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
755 struct inode *dir)
756{
757 return 0;
758}
759
760static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
761{
762 return 0;
763}
764
765static inline void ceph_forget_all_cached_acls(struct inode *inode)
766{
767}
768
769#endif
770
735/* caps.c */ 771/* caps.c */
736extern const char *ceph_cap_string(int c); 772extern const char *ceph_cap_string(int c);
737extern void ceph_handle_caps(struct ceph_mds_session *session, 773extern void ceph_handle_caps(struct ceph_mds_session *session,
@@ -744,6 +780,7 @@ extern int ceph_add_cap(struct inode *inode,
744extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); 780extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
745extern void ceph_put_cap(struct ceph_mds_client *mdsc, 781extern void ceph_put_cap(struct ceph_mds_client *mdsc,
746 struct ceph_cap *cap); 782 struct ceph_cap *cap);
783extern int ceph_is_any_caps(struct inode *inode);
747 784
748extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, 785extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
749 u64 cap_id, u32 migrate_seq, u32 issue_seq); 786 u64 cap_id, u32 migrate_seq, u32 issue_seq);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index be661d8f532a..c7581f3733c1 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -11,11 +11,24 @@
11#define XATTR_CEPH_PREFIX "ceph." 11#define XATTR_CEPH_PREFIX "ceph."
12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1) 12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1)
13 13
14/*
15 * List of handlers for synthetic system.* attributes. Other
16 * attributes are handled directly.
17 */
18const struct xattr_handler *ceph_xattr_handlers[] = {
19#ifdef CONFIG_CEPH_FS_POSIX_ACL
20 &ceph_xattr_acl_access_handler,
21 &ceph_xattr_acl_default_handler,
22#endif
23 NULL,
24};
25
14static bool ceph_is_valid_xattr(const char *name) 26static bool ceph_is_valid_xattr(const char *name)
15{ 27{
16 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || 28 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
17 !strncmp(name, XATTR_SECURITY_PREFIX, 29 !strncmp(name, XATTR_SECURITY_PREFIX,
18 XATTR_SECURITY_PREFIX_LEN) || 30 XATTR_SECURITY_PREFIX_LEN) ||
31 !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) ||
19 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || 32 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
20 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); 33 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
21} 34}
@@ -663,10 +676,9 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
663 } 676 }
664} 677}
665 678
666ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, 679ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
667 size_t size) 680 size_t size)
668{ 681{
669 struct inode *inode = dentry->d_inode;
670 struct ceph_inode_info *ci = ceph_inode(inode); 682 struct ceph_inode_info *ci = ceph_inode(inode);
671 int err; 683 int err;
672 struct ceph_inode_xattr *xattr; 684 struct ceph_inode_xattr *xattr;
@@ -675,7 +687,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
675 if (!ceph_is_valid_xattr(name)) 687 if (!ceph_is_valid_xattr(name))
676 return -ENODATA; 688 return -ENODATA;
677 689
678
679 /* let's see if a virtual xattr was requested */ 690 /* let's see if a virtual xattr was requested */
680 vxattr = ceph_match_vxattr(inode, name); 691 vxattr = ceph_match_vxattr(inode, name);
681 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { 692 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
@@ -725,6 +736,15 @@ out:
725 return err; 736 return err;
726} 737}
727 738
739ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
740 size_t size)
741{
742 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
743 return generic_getxattr(dentry, name, value, size);
744
745 return __ceph_getxattr(dentry->d_inode, name, value, size);
746}
747
728ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) 748ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
729{ 749{
730 struct inode *inode = dentry->d_inode; 750 struct inode *inode = dentry->d_inode;
@@ -863,8 +883,8 @@ out:
863 return err; 883 return err;
864} 884}
865 885
866int ceph_setxattr(struct dentry *dentry, const char *name, 886int __ceph_setxattr(struct dentry *dentry, const char *name,
867 const void *value, size_t size, int flags) 887 const void *value, size_t size, int flags)
868{ 888{
869 struct inode *inode = dentry->d_inode; 889 struct inode *inode = dentry->d_inode;
870 struct ceph_vxattr *vxattr; 890 struct ceph_vxattr *vxattr;
@@ -879,9 +899,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
879 struct ceph_inode_xattr *xattr = NULL; 899 struct ceph_inode_xattr *xattr = NULL;
880 int required_blob_size; 900 int required_blob_size;
881 901
882 if (ceph_snap(inode) != CEPH_NOSNAP)
883 return -EROFS;
884
885 if (!ceph_is_valid_xattr(name)) 902 if (!ceph_is_valid_xattr(name))
886 return -EOPNOTSUPP; 903 return -EOPNOTSUPP;
887 904
@@ -958,6 +975,18 @@ out:
958 return err; 975 return err;
959} 976}
960 977
978int ceph_setxattr(struct dentry *dentry, const char *name,
979 const void *value, size_t size, int flags)
980{
981 if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
982 return -EROFS;
983
984 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
985 return generic_setxattr(dentry, name, value, size, flags);
986
987 return __ceph_setxattr(dentry, name, value, size, flags);
988}
989
961static int ceph_send_removexattr(struct dentry *dentry, const char *name) 990static int ceph_send_removexattr(struct dentry *dentry, const char *name)
962{ 991{
963 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 992 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
@@ -984,7 +1013,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
984 return err; 1013 return err;
985} 1014}
986 1015
987int ceph_removexattr(struct dentry *dentry, const char *name) 1016int __ceph_removexattr(struct dentry *dentry, const char *name)
988{ 1017{
989 struct inode *inode = dentry->d_inode; 1018 struct inode *inode = dentry->d_inode;
990 struct ceph_vxattr *vxattr; 1019 struct ceph_vxattr *vxattr;
@@ -994,9 +1023,6 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
994 int required_blob_size; 1023 int required_blob_size;
995 int dirty; 1024 int dirty;
996 1025
997 if (ceph_snap(inode) != CEPH_NOSNAP)
998 return -EROFS;
999
1000 if (!ceph_is_valid_xattr(name)) 1026 if (!ceph_is_valid_xattr(name))
1001 return -EOPNOTSUPP; 1027 return -EOPNOTSUPP;
1002 1028
@@ -1053,3 +1079,13 @@ out:
1053 return err; 1079 return err;
1054} 1080}
1055 1081
1082int ceph_removexattr(struct dentry *dentry, const char *name)
1083{
1084 if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
1085 return -EROFS;
1086
1087 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
1088 return generic_removexattr(dentry, name);
1089
1090 return __ceph_removexattr(dentry, name);
1091}