aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/Kconfig13
-rw-r--r--fs/ceph/Makefile1
-rw-r--r--fs/ceph/acl.c332
-rw-r--r--fs/ceph/addr.c93
-rw-r--r--fs/ceph/cache.h13
-rw-r--r--fs/ceph/caps.c338
-rw-r--r--fs/ceph/dir.c16
-rw-r--r--fs/ceph/file.c437
-rw-r--r--fs/ceph/inode.c33
-rw-r--r--fs/ceph/ioctl.c8
-rw-r--r--fs/ceph/mds_client.c132
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/strings.c2
-rw-r--r--fs/ceph/super.c9
-rw-r--r--fs/ceph/super.h45
-rw-r--r--fs/ceph/xattr.c60
16 files changed, 1202 insertions, 332 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index ac9a2ef5bb9b..264e9bf83ff3 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -25,3 +25,16 @@ config CEPH_FSCACHE
25 caching support for Ceph clients using FS-Cache 25 caching support for Ceph clients using FS-Cache
26 26
27endif 27endif
28
29config CEPH_FS_POSIX_ACL
30 bool "Ceph POSIX Access Control Lists"
31 depends on CEPH_FS
32 select FS_POSIX_ACL
33 help
34 POSIX Access Control Lists (ACLs) support permissions for users and
35 groups beyond the owner/group/world scheme.
36
37 To learn more about Access Control Lists, visit the POSIX ACLs for
38 Linux website <http://acl.bestbits.at/>.
39
40 If you don't know what Access Control Lists are, say N
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 32e30106a2f0..85a4230b9bff 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -10,3 +10,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
10 debugfs.o 10 debugfs.o
11 11
12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o 12ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
13ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
new file mode 100644
index 000000000000..64fddbc1d17b
--- /dev/null
+++ b/fs/ceph/acl.c
@@ -0,0 +1,332 @@
1/*
2 * linux/fs/ceph/acl.c
3 *
4 * Copyright (C) 2013 Guangliang Zhao, <lucienchao@gmail.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License v2 as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public
16 * License along with this program; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
19 */
20
21#include <linux/ceph/ceph_debug.h>
22#include <linux/fs.h>
23#include <linux/string.h>
24#include <linux/xattr.h>
25#include <linux/posix_acl_xattr.h>
26#include <linux/posix_acl.h>
27#include <linux/sched.h>
28#include <linux/slab.h>
29
30#include "super.h"
31
32static inline void ceph_set_cached_acl(struct inode *inode,
33 int type, struct posix_acl *acl)
34{
35 struct ceph_inode_info *ci = ceph_inode(inode);
36
37 spin_lock(&ci->i_ceph_lock);
38 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
39 set_cached_acl(inode, type, acl);
40 spin_unlock(&ci->i_ceph_lock);
41}
42
43static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
44 int type)
45{
46 struct ceph_inode_info *ci = ceph_inode(inode);
47 struct posix_acl *acl = ACL_NOT_CACHED;
48
49 spin_lock(&ci->i_ceph_lock);
50 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
51 acl = get_cached_acl(inode, type);
52 spin_unlock(&ci->i_ceph_lock);
53
54 return acl;
55}
56
/*
 * Drop every ACL cached for @inode by delegating to
 * forget_all_cached_acls().  Non-static, so it can be called from
 * other ceph source files.
 */
57void ceph_forget_all_cached_acls(struct inode *inode)
58{
59 forget_all_cached_acls(inode);
60}
61
62struct posix_acl *ceph_get_acl(struct inode *inode, int type)
63{
64 int size;
65 const char *name;
66 char *value = NULL;
67 struct posix_acl *acl;
68
69 if (!IS_POSIXACL(inode))
70 return NULL;
71
72 acl = ceph_get_cached_acl(inode, type);
73 if (acl != ACL_NOT_CACHED)
74 return acl;
75
76 switch (type) {
77 case ACL_TYPE_ACCESS:
78 name = POSIX_ACL_XATTR_ACCESS;
79 break;
80 case ACL_TYPE_DEFAULT:
81 name = POSIX_ACL_XATTR_DEFAULT;
82 break;
83 default:
84 BUG();
85 }
86
87 size = __ceph_getxattr(inode, name, "", 0);
88 if (size > 0) {
89 value = kzalloc(size, GFP_NOFS);
90 if (!value)
91 return ERR_PTR(-ENOMEM);
92 size = __ceph_getxattr(inode, name, value, size);
93 }
94
95 if (size > 0)
96 acl = posix_acl_from_xattr(&init_user_ns, value, size);
97 else if (size == -ERANGE || size == -ENODATA || size == 0)
98 acl = NULL;
99 else
100 acl = ERR_PTR(-EIO);
101
102 kfree(value);
103
104 if (!IS_ERR(acl))
105 ceph_set_cached_acl(inode, type, acl);
106
107 return acl;
108}
109
110static int ceph_set_acl(struct dentry *dentry, struct inode *inode,
111 struct posix_acl *acl, int type)
112{
113 int ret = 0, size = 0;
114 const char *name = NULL;
115 char *value = NULL;
116 struct iattr newattrs;
117 umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
118
119 if (acl) {
120 ret = posix_acl_valid(acl);
121 if (ret < 0)
122 goto out;
123 }
124
125 switch (type) {
126 case ACL_TYPE_ACCESS:
127 name = POSIX_ACL_XATTR_ACCESS;
128 if (acl) {
129 ret = posix_acl_equiv_mode(acl, &new_mode);
130 if (ret < 0)
131 goto out;
132 if (ret == 0)
133 acl = NULL;
134 }
135 break;
136 case ACL_TYPE_DEFAULT:
137 if (!S_ISDIR(inode->i_mode)) {
138 ret = acl ? -EINVAL : 0;
139 goto out;
140 }
141 name = POSIX_ACL_XATTR_DEFAULT;
142 break;
143 default:
144 ret = -EINVAL;
145 goto out;
146 }
147
148 if (acl) {
149 size = posix_acl_xattr_size(acl->a_count);
150 value = kmalloc(size, GFP_NOFS);
151 if (!value) {
152 ret = -ENOMEM;
153 goto out;
154 }
155
156 ret = posix_acl_to_xattr(&init_user_ns, acl, value, size);
157 if (ret < 0)
158 goto out_free;
159 }
160
161 if (new_mode != old_mode) {
162 newattrs.ia_mode = new_mode;
163 newattrs.ia_valid = ATTR_MODE;
164 ret = ceph_setattr(dentry, &newattrs);
165 if (ret)
166 goto out_free;
167 }
168
169 if (value)
170 ret = __ceph_setxattr(dentry, name, value, size, 0);
171 else
172 ret = __ceph_removexattr(dentry, name);
173
174 if (ret) {
175 if (new_mode != old_mode) {
176 newattrs.ia_mode = old_mode;
177 newattrs.ia_valid = ATTR_MODE;
178 ceph_setattr(dentry, &newattrs);
179 }
180 goto out_free;
181 }
182
183 ceph_set_cached_acl(inode, type, acl);
184
185out_free:
186 kfree(value);
187out:
188 return ret;
189}
190
191int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
192{
193 struct posix_acl *acl = NULL;
194 int ret = 0;
195
196 if (!S_ISLNK(inode->i_mode)) {
197 if (IS_POSIXACL(dir)) {
198 acl = ceph_get_acl(dir, ACL_TYPE_DEFAULT);
199 if (IS_ERR(acl)) {
200 ret = PTR_ERR(acl);
201 goto out;
202 }
203 }
204
205 if (!acl)
206 inode->i_mode &= ~current_umask();
207 }
208
209 if (IS_POSIXACL(dir) && acl) {
210 if (S_ISDIR(inode->i_mode)) {
211 ret = ceph_set_acl(dentry, inode, acl,
212 ACL_TYPE_DEFAULT);
213 if (ret)
214 goto out_release;
215 }
216 ret = posix_acl_create(&acl, GFP_NOFS, &inode->i_mode);
217 if (ret < 0)
218 goto out;
219 else if (ret > 0)
220 ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS);
221 else
222 cache_no_acl(inode);
223 } else {
224 cache_no_acl(inode);
225 }
226
227out_release:
228 posix_acl_release(acl);
229out:
230 return ret;
231}
232
233int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
234{
235 struct posix_acl *acl;
236 int ret = 0;
237
238 if (S_ISLNK(inode->i_mode)) {
239 ret = -EOPNOTSUPP;
240 goto out;
241 }
242
243 if (!IS_POSIXACL(inode))
244 goto out;
245
246 acl = ceph_get_acl(inode, ACL_TYPE_ACCESS);
247 if (IS_ERR_OR_NULL(acl)) {
248 ret = PTR_ERR(acl);
249 goto out;
250 }
251
252 ret = posix_acl_chmod(&acl, GFP_KERNEL, inode->i_mode);
253 if (ret)
254 goto out;
255 ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS);
256 posix_acl_release(acl);
257out:
258 return ret;
259}
260
261static int ceph_xattr_acl_get(struct dentry *dentry, const char *name,
262 void *value, size_t size, int type)
263{
264 struct posix_acl *acl;
265 int ret = 0;
266
267 if (!IS_POSIXACL(dentry->d_inode))
268 return -EOPNOTSUPP;
269
270 acl = ceph_get_acl(dentry->d_inode, type);
271 if (IS_ERR(acl))
272 return PTR_ERR(acl);
273 if (acl == NULL)
274 return -ENODATA;
275
276 ret = posix_acl_to_xattr(&init_user_ns, acl, value, size);
277 posix_acl_release(acl);
278
279 return ret;
280}
281
282static int ceph_xattr_acl_set(struct dentry *dentry, const char *name,
283 const void *value, size_t size, int flags, int type)
284{
285 int ret = 0;
286 struct posix_acl *acl = NULL;
287
288 if (!inode_owner_or_capable(dentry->d_inode)) {
289 ret = -EPERM;
290 goto out;
291 }
292
293 if (!IS_POSIXACL(dentry->d_inode)) {
294 ret = -EOPNOTSUPP;
295 goto out;
296 }
297
298 if (value) {
299 acl = posix_acl_from_xattr(&init_user_ns, value, size);
300 if (IS_ERR(acl)) {
301 ret = PTR_ERR(acl);
302 goto out;
303 }
304
305 if (acl) {
306 ret = posix_acl_valid(acl);
307 if (ret)
308 goto out_release;
309 }
310 }
311
312 ret = ceph_set_acl(dentry, dentry->d_inode, acl, type);
313
314out_release:
315 posix_acl_release(acl);
316out:
317 return ret;
318}
319
/*
 * xattr handler for the POSIX_ACL_XATTR_DEFAULT attribute, served by
 * ceph_xattr_acl_get()/ceph_xattr_acl_set().
 * NOTE(review): .flags presumably reaches those callbacks as their
 * 'type' argument (ACL_TYPE_DEFAULT here) - confirm against the xattr core.
 */
320const struct xattr_handler ceph_xattr_acl_default_handler = {
321 .prefix = POSIX_ACL_XATTR_DEFAULT,
322 .flags = ACL_TYPE_DEFAULT,
323 .get = ceph_xattr_acl_get,
324 .set = ceph_xattr_acl_set,
325};
326
/*
 * xattr handler for the POSIX_ACL_XATTR_ACCESS attribute, served by
 * ceph_xattr_acl_get()/ceph_xattr_acl_set().
 * NOTE(review): .flags presumably reaches those callbacks as their
 * 'type' argument (ACL_TYPE_ACCESS here) - confirm against the xattr core.
 */
327const struct xattr_handler ceph_xattr_acl_access_handler = {
328 .prefix = POSIX_ACL_XATTR_ACCESS,
329 .flags = ACL_TYPE_ACCESS,
330 .get = ceph_xattr_acl_get,
331 .set = ceph_xattr_acl_set,
332};
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ec3ba43b9faa..b53278c9fd97 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -209,6 +209,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
209 err = 0; 209 err = 0;
210 if (err < 0) { 210 if (err < 0) {
211 SetPageError(page); 211 SetPageError(page);
212 ceph_fscache_readpage_cancel(inode, page);
212 goto out; 213 goto out;
213 } else { 214 } else {
214 if (err < PAGE_CACHE_SIZE) { 215 if (err < PAGE_CACHE_SIZE) {
@@ -256,6 +257,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
256 for (i = 0; i < num_pages; i++) { 257 for (i = 0; i < num_pages; i++) {
257 struct page *page = osd_data->pages[i]; 258 struct page *page = osd_data->pages[i];
258 259
260 if (rc < 0)
261 goto unlock;
259 if (bytes < (int)PAGE_CACHE_SIZE) { 262 if (bytes < (int)PAGE_CACHE_SIZE) {
260 /* zero (remainder of) page */ 263 /* zero (remainder of) page */
261 int s = bytes < 0 ? 0 : bytes; 264 int s = bytes < 0 ? 0 : bytes;
@@ -266,6 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
266 flush_dcache_page(page); 269 flush_dcache_page(page);
267 SetPageUptodate(page); 270 SetPageUptodate(page);
268 ceph_readpage_to_fscache(inode, page); 271 ceph_readpage_to_fscache(inode, page);
272unlock:
269 unlock_page(page); 273 unlock_page(page);
270 page_cache_release(page); 274 page_cache_release(page);
271 bytes -= PAGE_CACHE_SIZE; 275 bytes -= PAGE_CACHE_SIZE;
@@ -1207,6 +1211,41 @@ const struct address_space_operations ceph_aops = {
1207/* 1211/*
1208 * vm ops 1212 * vm ops
1209 */ 1213 */
1214static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{
1216 struct inode *inode = file_inode(vma->vm_file);
1217 struct ceph_inode_info *ci = ceph_inode(inode);
1218 struct ceph_file_info *fi = vma->vm_file->private_data;
1219 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1220 int want, got, ret;
1221
1222 dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1223 inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE);
1224 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1225 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1226 else
1227 want = CEPH_CAP_FILE_CACHE;
1228 while (1) {
1229 got = 0;
1230 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
1231 if (ret == 0)
1232 break;
1233 if (ret != -ERESTARTSYS) {
1234 WARN_ON(1);
1235 return VM_FAULT_SIGBUS;
1236 }
1237 }
1238 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1239 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1240
1241 ret = filemap_fault(vma, vmf);
1242
1243 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1244 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1245 ceph_put_cap_refs(ci, got);
1246
1247 return ret;
1248}
1210 1249
1211/* 1250/*
1212 * Reuse write_begin here for simplicity. 1251 * Reuse write_begin here for simplicity.
@@ -1214,23 +1253,41 @@ const struct address_space_operations ceph_aops = {
1214static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) 1253static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1215{ 1254{
1216 struct inode *inode = file_inode(vma->vm_file); 1255 struct inode *inode = file_inode(vma->vm_file);
1217 struct page *page = vmf->page; 1256 struct ceph_inode_info *ci = ceph_inode(inode);
1257 struct ceph_file_info *fi = vma->vm_file->private_data;
1218 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 1258 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1259 struct page *page = vmf->page;
1219 loff_t off = page_offset(page); 1260 loff_t off = page_offset(page);
1220 loff_t size, len; 1261 loff_t size = i_size_read(inode);
1221 int ret; 1262 size_t len;
1222 1263 int want, got, ret;
1223 /* Update time before taking page lock */
1224 file_update_time(vma->vm_file);
1225 1264
1226 size = i_size_read(inode);
1227 if (off + PAGE_CACHE_SIZE <= size) 1265 if (off + PAGE_CACHE_SIZE <= size)
1228 len = PAGE_CACHE_SIZE; 1266 len = PAGE_CACHE_SIZE;
1229 else 1267 else
1230 len = size & ~PAGE_CACHE_MASK; 1268 len = size & ~PAGE_CACHE_MASK;
1231 1269
1232 dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, 1270 dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
1233 off, len, page, page->index); 1271 inode, ceph_vinop(inode), off, len, size);
1272 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1273 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1274 else
1275 want = CEPH_CAP_FILE_BUFFER;
1276 while (1) {
1277 got = 0;
1278 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
1279 if (ret == 0)
1280 break;
1281 if (ret != -ERESTARTSYS) {
1282 WARN_ON(1);
1283 return VM_FAULT_SIGBUS;
1284 }
1285 }
1286 dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
1287 inode, off, len, ceph_cap_string(got));
1288
1289 /* Update time before taking page lock */
1290 file_update_time(vma->vm_file);
1234 1291
1235 lock_page(page); 1292 lock_page(page);
1236 1293
@@ -1252,14 +1309,26 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1252 ret = VM_FAULT_SIGBUS; 1309 ret = VM_FAULT_SIGBUS;
1253 } 1310 }
1254out: 1311out:
1255 dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); 1312 if (ret != VM_FAULT_LOCKED) {
1256 if (ret != VM_FAULT_LOCKED)
1257 unlock_page(page); 1313 unlock_page(page);
1314 } else {
1315 int dirty;
1316 spin_lock(&ci->i_ceph_lock);
1317 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1318 spin_unlock(&ci->i_ceph_lock);
1319 if (dirty)
1320 __mark_inode_dirty(inode, dirty);
1321 }
1322
1323 dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
1324 inode, off, len, ceph_cap_string(got), ret);
1325 ceph_put_cap_refs(ci, got);
1326
1258 return ret; 1327 return ret;
1259} 1328}
1260 1329
1261static struct vm_operations_struct ceph_vmops = { 1330static struct vm_operations_struct ceph_vmops = {
1262 .fault = filemap_fault, 1331 .fault = ceph_filemap_fault,
1263 .page_mkwrite = ceph_page_mkwrite, 1332 .page_mkwrite = ceph_page_mkwrite,
1264 .remap_pages = generic_file_remap_pages, 1333 .remap_pages = generic_file_remap_pages,
1265}; 1334};
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index ba949408a336..da95f61b7a09 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -67,6 +67,14 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
67 return fscache_maybe_release_page(ci->fscache, page, gfp); 67 return fscache_maybe_release_page(ci->fscache, page, gfp);
68} 68}
69 69
70static inline void ceph_fscache_readpage_cancel(struct inode *inode,
71 struct page *page)
72{
73 struct ceph_inode_info *ci = ceph_inode(inode);
74 if (fscache_cookie_valid(ci->fscache) && PageFsCache(page))
75 __fscache_uncache_page(ci->fscache, page);
76}
77
70static inline void ceph_fscache_readpages_cancel(struct inode *inode, 78static inline void ceph_fscache_readpages_cancel(struct inode *inode,
71 struct list_head *pages) 79 struct list_head *pages)
72{ 80{
@@ -145,6 +153,11 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
145 return 1; 153 return 1;
146} 154}
147 155
/*
 * Empty counterpart of ceph_fscache_readpage_cancel(): does nothing.
 * NOTE(review): presumably the variant compiled when fscache support is
 * disabled - the enclosing #if/#else is outside this hunk, confirm.
 */
156static inline void ceph_fscache_readpage_cancel(struct inode *inode,
157 struct page *page)
158{
159}
160
148static inline void ceph_fscache_readpages_cancel(struct inode *inode, 161static inline void ceph_fscache_readpages_cancel(struct inode *inode,
149 struct list_head *pages) 162 struct list_head *pages)
150{ 163{
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3c0a4bd74996..17543383545c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -555,21 +555,34 @@ retry:
555 cap->ci = ci; 555 cap->ci = ci;
556 __insert_cap_node(ci, cap); 556 __insert_cap_node(ci, cap);
557 557
558 /* clear out old exporting info? (i.e. on cap import) */
559 if (ci->i_cap_exporting_mds == mds) {
560 ci->i_cap_exporting_issued = 0;
561 ci->i_cap_exporting_mseq = 0;
562 ci->i_cap_exporting_mds = -1;
563 }
564
565 /* add to session cap list */ 558 /* add to session cap list */
566 cap->session = session; 559 cap->session = session;
567 spin_lock(&session->s_cap_lock); 560 spin_lock(&session->s_cap_lock);
568 list_add_tail(&cap->session_caps, &session->s_caps); 561 list_add_tail(&cap->session_caps, &session->s_caps);
569 session->s_nr_caps++; 562 session->s_nr_caps++;
570 spin_unlock(&session->s_cap_lock); 563 spin_unlock(&session->s_cap_lock);
571 } else if (new_cap) 564 } else {
572 ceph_put_cap(mdsc, new_cap); 565 if (new_cap)
566 ceph_put_cap(mdsc, new_cap);
567
568 /*
569 * auth mds of the inode changed. we received the cap export
570 * message, but still haven't received the cap import message.
571 * handle_cap_export() updated the new auth MDS' cap.
572 *
573 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
574 * a message that was sent before the cap import message. So
575 * don't remove caps.
576 */
577 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
578 WARN_ON(cap != ci->i_auth_cap);
579 WARN_ON(cap->cap_id != cap_id);
580 seq = cap->seq;
581 mseq = cap->mseq;
582 issued |= cap->issued;
583 flags |= CEPH_CAP_FLAG_AUTH;
584 }
585 }
573 586
574 if (!ci->i_snap_realm) { 587 if (!ci->i_snap_realm) {
575 /* 588 /*
@@ -611,15 +624,9 @@ retry:
611 if (ci->i_auth_cap == NULL || 624 if (ci->i_auth_cap == NULL ||
612 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) 625 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
613 ci->i_auth_cap = cap; 626 ci->i_auth_cap = cap;
614 } else if (ci->i_auth_cap == cap) { 627 ci->i_cap_exporting_issued = 0;
615 ci->i_auth_cap = NULL; 628 } else {
616 spin_lock(&mdsc->cap_dirty_lock); 629 WARN_ON(ci->i_auth_cap == cap);
617 if (!list_empty(&ci->i_dirty_item)) {
618 dout(" moving %p to cap_dirty_migrating\n", inode);
619 list_move(&ci->i_dirty_item,
620 &mdsc->cap_dirty_migrating);
621 }
622 spin_unlock(&mdsc->cap_dirty_lock);
623 } 630 }
624 631
625 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", 632 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
@@ -628,7 +635,7 @@ retry:
628 cap->cap_id = cap_id; 635 cap->cap_id = cap_id;
629 cap->issued = issued; 636 cap->issued = issued;
630 cap->implemented |= issued; 637 cap->implemented |= issued;
631 if (mseq > cap->mseq) 638 if (ceph_seq_cmp(mseq, cap->mseq) > 0)
632 cap->mds_wanted = wanted; 639 cap->mds_wanted = wanted;
633 else 640 else
634 cap->mds_wanted |= wanted; 641 cap->mds_wanted |= wanted;
@@ -816,7 +823,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
816 823
817 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 824 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
818 cap = rb_entry(p, struct ceph_cap, ci_node); 825 cap = rb_entry(p, struct ceph_cap, ci_node);
819 if (cap != ocap && __cap_is_valid(cap) && 826 if (cap != ocap &&
820 (cap->implemented & ~cap->issued & mask)) 827 (cap->implemented & ~cap->issued & mask))
821 return 1; 828 return 1;
822 } 829 }
@@ -888,7 +895,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
888 */ 895 */
889static int __ceph_is_any_caps(struct ceph_inode_info *ci) 896static int __ceph_is_any_caps(struct ceph_inode_info *ci)
890{ 897{
891 return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; 898 return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
899}
900
901int ceph_is_any_caps(struct inode *inode)
902{
903 struct ceph_inode_info *ci = ceph_inode(inode);
904 int ret;
905
906 spin_lock(&ci->i_ceph_lock);
907 ret = __ceph_is_any_caps(ci);
908 spin_unlock(&ci->i_ceph_lock);
909
910 return ret;
892} 911}
893 912
894/* 913/*
@@ -1383,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1383 ci->i_snap_realm->cached_context); 1402 ci->i_snap_realm->cached_context);
1384 dout(" inode %p now dirty snapc %p auth cap %p\n", 1403 dout(" inode %p now dirty snapc %p auth cap %p\n",
1385 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); 1404 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
1405 WARN_ON(!ci->i_auth_cap);
1386 BUG_ON(!list_empty(&ci->i_dirty_item)); 1406 BUG_ON(!list_empty(&ci->i_dirty_item));
1387 spin_lock(&mdsc->cap_dirty_lock); 1407 spin_lock(&mdsc->cap_dirty_lock);
1388 if (ci->i_auth_cap) 1408 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1389 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1390 else
1391 list_add(&ci->i_dirty_item,
1392 &mdsc->cap_dirty_migrating);
1393 spin_unlock(&mdsc->cap_dirty_lock); 1409 spin_unlock(&mdsc->cap_dirty_lock);
1394 if (ci->i_flushing_caps == 0) { 1410 if (ci->i_flushing_caps == 0) {
1395 ihold(inode); 1411 ihold(inode);
@@ -1735,13 +1751,12 @@ ack:
1735/* 1751/*
1736 * Try to flush dirty caps back to the auth mds. 1752 * Try to flush dirty caps back to the auth mds.
1737 */ 1753 */
1738static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, 1754static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
1739 unsigned *flush_tid)
1740{ 1755{
1741 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1756 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1742 struct ceph_inode_info *ci = ceph_inode(inode); 1757 struct ceph_inode_info *ci = ceph_inode(inode);
1743 int unlock_session = session ? 0 : 1;
1744 int flushing = 0; 1758 int flushing = 0;
1759 struct ceph_mds_session *session = NULL;
1745 1760
1746retry: 1761retry:
1747 spin_lock(&ci->i_ceph_lock); 1762 spin_lock(&ci->i_ceph_lock);
@@ -1755,13 +1770,14 @@ retry:
1755 int want = __ceph_caps_wanted(ci); 1770 int want = __ceph_caps_wanted(ci);
1756 int delayed; 1771 int delayed;
1757 1772
1758 if (!session) { 1773 if (!session || session != cap->session) {
1759 spin_unlock(&ci->i_ceph_lock); 1774 spin_unlock(&ci->i_ceph_lock);
1775 if (session)
1776 mutex_unlock(&session->s_mutex);
1760 session = cap->session; 1777 session = cap->session;
1761 mutex_lock(&session->s_mutex); 1778 mutex_lock(&session->s_mutex);
1762 goto retry; 1779 goto retry;
1763 } 1780 }
1764 BUG_ON(session != cap->session);
1765 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) 1781 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
1766 goto out; 1782 goto out;
1767 1783
@@ -1780,7 +1796,7 @@ retry:
1780out: 1796out:
1781 spin_unlock(&ci->i_ceph_lock); 1797 spin_unlock(&ci->i_ceph_lock);
1782out_unlocked: 1798out_unlocked:
1783 if (session && unlock_session) 1799 if (session)
1784 mutex_unlock(&session->s_mutex); 1800 mutex_unlock(&session->s_mutex);
1785 return flushing; 1801 return flushing;
1786} 1802}
@@ -1865,7 +1881,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
1865 return ret; 1881 return ret;
1866 mutex_lock(&inode->i_mutex); 1882 mutex_lock(&inode->i_mutex);
1867 1883
1868 dirty = try_flush_caps(inode, NULL, &flush_tid); 1884 dirty = try_flush_caps(inode, &flush_tid);
1869 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 1885 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
1870 1886
1871 /* 1887 /*
@@ -1900,7 +1916,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
1900 1916
1901 dout("write_inode %p wait=%d\n", inode, wait); 1917 dout("write_inode %p wait=%d\n", inode, wait);
1902 if (wait) { 1918 if (wait) {
1903 dirty = try_flush_caps(inode, NULL, &flush_tid); 1919 dirty = try_flush_caps(inode, &flush_tid);
1904 if (dirty) 1920 if (dirty)
1905 err = wait_event_interruptible(ci->i_cap_wq, 1921 err = wait_event_interruptible(ci->i_cap_wq,
1906 caps_are_flushed(inode, flush_tid)); 1922 caps_are_flushed(inode, flush_tid));
@@ -2350,11 +2366,11 @@ static void invalidate_aliases(struct inode *inode)
2350 d_prune_aliases(inode); 2366 d_prune_aliases(inode);
2351 /* 2367 /*
2352 * For non-directory inode, d_find_alias() only returns 2368 * For non-directory inode, d_find_alias() only returns
2353 * connected dentry. After calling d_invalidate(), the 2369 * hashed dentry. After calling d_invalidate(), the
2354 * dentry become disconnected. 2370 * dentry becomes unhashed.
2355 * 2371 *
2356 * For directory inode, d_find_alias() can return 2372 * For directory inode, d_find_alias() can return
2357 * disconnected dentry. But directory inode should have 2373 * unhashed dentry. But directory inode should have
2358 * one alias at most. 2374 * one alias at most.
2359 */ 2375 */
2360 while ((dn = d_find_alias(inode))) { 2376 while ((dn = d_find_alias(inode))) {
@@ -2408,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2408 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 2424 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
2409 inode->i_size); 2425 inode->i_size);
2410 2426
2427
2428 /*
2429 * auth mds of the inode changed. we received the cap export message,
2430 * but still haven't received the cap import message. handle_cap_export
2431 * updated the new auth MDS' cap.
2432 *
2433 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
2434 * that was sent before the cap import message. So don't remove caps.
2435 */
2436 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
2437 WARN_ON(cap != ci->i_auth_cap);
2438 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
2439 seq = cap->seq;
2440 newcaps |= cap->issued;
2441 }
2442
2411 /* 2443 /*
2412 * If CACHE is being revoked, and we have no dirty buffers, 2444 * If CACHE is being revoked, and we have no dirty buffers,
2413 * try to invalidate (once). (If there are dirty buffers, we 2445 * try to invalidate (once). (If there are dirty buffers, we
@@ -2434,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2434 issued |= implemented | __ceph_caps_dirty(ci); 2466 issued |= implemented | __ceph_caps_dirty(ci);
2435 2467
2436 cap->cap_gen = session->s_cap_gen; 2468 cap->cap_gen = session->s_cap_gen;
2469 cap->seq = seq;
2437 2470
2438 __check_cap_issue(ci, cap, newcaps); 2471 __check_cap_issue(ci, cap, newcaps);
2439 2472
@@ -2464,6 +2497,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2464 ceph_buffer_put(ci->i_xattrs.blob); 2497 ceph_buffer_put(ci->i_xattrs.blob);
2465 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 2498 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
2466 ci->i_xattrs.version = version; 2499 ci->i_xattrs.version = version;
2500 ceph_forget_all_cached_acls(inode);
2467 } 2501 }
2468 } 2502 }
2469 2503
@@ -2483,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2483 le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, 2517 le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
2484 &atime); 2518 &atime);
2485 2519
2520
2521 /* file layout may have changed */
2522 ci->i_layout = grant->layout;
2523
2486 /* max size increase? */ 2524 /* max size increase? */
2487 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { 2525 if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
2488 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); 2526 dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
@@ -2511,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2511 check_caps = 1; 2549 check_caps = 1;
2512 } 2550 }
2513 2551
2514 cap->seq = seq;
2515
2516 /* file layout may have changed */
2517 ci->i_layout = grant->layout;
2518
2519 /* revocation, grant, or no-op? */ 2552 /* revocation, grant, or no-op? */
2520 if (cap->issued & ~newcaps) { 2553 if (cap->issued & ~newcaps) {
2521 int revoking = cap->issued & ~newcaps; 2554 int revoking = cap->issued & ~newcaps;
@@ -2741,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode,
2741 * caller holds s_mutex 2774 * caller holds s_mutex
2742 */ 2775 */
2743static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 2776static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2744 struct ceph_mds_session *session, 2777 struct ceph_mds_cap_peer *ph,
2745 int *open_target_sessions) 2778 struct ceph_mds_session *session)
2746{ 2779{
2747 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 2780 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
2781 struct ceph_mds_session *tsession = NULL;
2782 struct ceph_cap *cap, *tcap;
2748 struct ceph_inode_info *ci = ceph_inode(inode); 2783 struct ceph_inode_info *ci = ceph_inode(inode);
2749 int mds = session->s_mds; 2784 u64 t_cap_id;
2750 unsigned mseq = le32_to_cpu(ex->migrate_seq); 2785 unsigned mseq = le32_to_cpu(ex->migrate_seq);
2751 struct ceph_cap *cap = NULL, *t; 2786 unsigned t_seq, t_mseq;
2752 struct rb_node *p; 2787 int target, issued;
2753 int remember = 1; 2788 int mds = session->s_mds;
2754 2789
2755 dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", 2790 if (ph) {
2756 inode, ci, mds, mseq); 2791 t_cap_id = le64_to_cpu(ph->cap_id);
2792 t_seq = le32_to_cpu(ph->seq);
2793 t_mseq = le32_to_cpu(ph->mseq);
2794 target = le32_to_cpu(ph->mds);
2795 } else {
2796 t_cap_id = t_seq = t_mseq = 0;
2797 target = -1;
2798 }
2757 2799
2800 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
2801 inode, ci, mds, mseq, target);
2802retry:
2758 spin_lock(&ci->i_ceph_lock); 2803 spin_lock(&ci->i_ceph_lock);
2804 cap = __get_cap_for_mds(ci, mds);
2805 if (!cap)
2806 goto out_unlock;
2759 2807
2760 /* make sure we haven't seen a higher mseq */ 2808 if (target < 0) {
2761 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 2809 __ceph_remove_cap(cap, false);
2762 t = rb_entry(p, struct ceph_cap, ci_node); 2810 goto out_unlock;
2763 if (ceph_seq_cmp(t->mseq, mseq) > 0) {
2764 dout(" higher mseq on cap from mds%d\n",
2765 t->session->s_mds);
2766 remember = 0;
2767 }
2768 if (t->session->s_mds == mds)
2769 cap = t;
2770 } 2811 }
2771 2812
2772 if (cap) { 2813 /*
2773 if (remember) { 2814 * now we know we haven't received the cap import message yet
2774 /* make note */ 2815 * because the exported cap still exists.
2775 ci->i_cap_exporting_mds = mds; 2816 */
2776 ci->i_cap_exporting_mseq = mseq;
2777 ci->i_cap_exporting_issued = cap->issued;
2778
2779 /*
2780 * make sure we have open sessions with all possible
2781 * export targets, so that we get the matching IMPORT
2782 */
2783 *open_target_sessions = 1;
2784 2817
2785 /* 2818 issued = cap->issued;
2786 * we can't flush dirty caps that we've seen the 2819 WARN_ON(issued != cap->implemented);
2787 * EXPORT but no IMPORT for 2820
2788 */ 2821 tcap = __get_cap_for_mds(ci, target);
2789 spin_lock(&mdsc->cap_dirty_lock); 2822 if (tcap) {
2790 if (!list_empty(&ci->i_dirty_item)) { 2823 /* already have caps from the target */
2791 dout(" moving %p to cap_dirty_migrating\n", 2824 if (tcap->cap_id != t_cap_id ||
2792 inode); 2825 ceph_seq_cmp(tcap->seq, t_seq) < 0) {
2793 list_move(&ci->i_dirty_item, 2826 dout(" updating import cap %p mds%d\n", tcap, target);
2794 &mdsc->cap_dirty_migrating); 2827 tcap->cap_id = t_cap_id;
2828 tcap->seq = t_seq - 1;
2829 tcap->issue_seq = t_seq - 1;
2830 tcap->mseq = t_mseq;
2831 tcap->issued |= issued;
2832 tcap->implemented |= issued;
2833 if (cap == ci->i_auth_cap)
2834 ci->i_auth_cap = tcap;
2835 if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
2836 spin_lock(&mdsc->cap_dirty_lock);
2837 list_move_tail(&ci->i_flushing_item,
2838 &tcap->session->s_cap_flushing);
2839 spin_unlock(&mdsc->cap_dirty_lock);
2795 } 2840 }
2796 spin_unlock(&mdsc->cap_dirty_lock);
2797 } 2841 }
2798 __ceph_remove_cap(cap, false); 2842 __ceph_remove_cap(cap, false);
2843 goto out_unlock;
2799 } 2844 }
2800 /* else, we already released it */
2801 2845
2846 if (tsession) {
2847 int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
2848 spin_unlock(&ci->i_ceph_lock);
2849 /* add placeholder for the export target */
2850 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
2851 t_seq - 1, t_mseq, (u64)-1, flag, NULL);
2852 goto retry;
2853 }
2854
2855 spin_unlock(&ci->i_ceph_lock);
2856 mutex_unlock(&session->s_mutex);
2857
2858 /* open target session */
2859 tsession = ceph_mdsc_open_export_target_session(mdsc, target);
2860 if (!IS_ERR(tsession)) {
2861 if (mds > target) {
2862 mutex_lock(&session->s_mutex);
2863 mutex_lock_nested(&tsession->s_mutex,
2864 SINGLE_DEPTH_NESTING);
2865 } else {
2866 mutex_lock(&tsession->s_mutex);
2867 mutex_lock_nested(&session->s_mutex,
2868 SINGLE_DEPTH_NESTING);
2869 }
2870 ceph_add_cap_releases(mdsc, tsession);
2871 } else {
2872 WARN_ON(1);
2873 tsession = NULL;
2874 target = -1;
2875 }
2876 goto retry;
2877
2878out_unlock:
2802 spin_unlock(&ci->i_ceph_lock); 2879 spin_unlock(&ci->i_ceph_lock);
2880 mutex_unlock(&session->s_mutex);
2881 if (tsession) {
2882 mutex_unlock(&tsession->s_mutex);
2883 ceph_put_mds_session(tsession);
2884 }
2803} 2885}
2804 2886
2805/* 2887/*
@@ -2810,10 +2892,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2810 */ 2892 */
2811static void handle_cap_import(struct ceph_mds_client *mdsc, 2893static void handle_cap_import(struct ceph_mds_client *mdsc,
2812 struct inode *inode, struct ceph_mds_caps *im, 2894 struct inode *inode, struct ceph_mds_caps *im,
2895 struct ceph_mds_cap_peer *ph,
2813 struct ceph_mds_session *session, 2896 struct ceph_mds_session *session,
2814 void *snaptrace, int snaptrace_len) 2897 void *snaptrace, int snaptrace_len)
2815{ 2898{
2816 struct ceph_inode_info *ci = ceph_inode(inode); 2899 struct ceph_inode_info *ci = ceph_inode(inode);
2900 struct ceph_cap *cap;
2817 int mds = session->s_mds; 2901 int mds = session->s_mds;
2818 unsigned issued = le32_to_cpu(im->caps); 2902 unsigned issued = le32_to_cpu(im->caps);
2819 unsigned wanted = le32_to_cpu(im->wanted); 2903 unsigned wanted = le32_to_cpu(im->wanted);
@@ -2821,28 +2905,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2821 unsigned mseq = le32_to_cpu(im->migrate_seq); 2905 unsigned mseq = le32_to_cpu(im->migrate_seq);
2822 u64 realmino = le64_to_cpu(im->realm); 2906 u64 realmino = le64_to_cpu(im->realm);
2823 u64 cap_id = le64_to_cpu(im->cap_id); 2907 u64 cap_id = le64_to_cpu(im->cap_id);
2908 u64 p_cap_id;
2909 int peer;
2824 2910
2825 if (ci->i_cap_exporting_mds >= 0 && 2911 if (ph) {
2826 ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { 2912 p_cap_id = le64_to_cpu(ph->cap_id);
2827 dout("handle_cap_import inode %p ci %p mds%d mseq %d" 2913 peer = le32_to_cpu(ph->mds);
2828 " - cleared exporting from mds%d\n", 2914 } else {
2829 inode, ci, mds, mseq, 2915 p_cap_id = 0;
2830 ci->i_cap_exporting_mds); 2916 peer = -1;
2831 ci->i_cap_exporting_issued = 0; 2917 }
2832 ci->i_cap_exporting_mseq = 0;
2833 ci->i_cap_exporting_mds = -1;
2834 2918
2835 spin_lock(&mdsc->cap_dirty_lock); 2919 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
2836 if (!list_empty(&ci->i_dirty_item)) { 2920 inode, ci, mds, mseq, peer);
2837 dout(" moving %p back to cap_dirty\n", inode); 2921
2838 list_move(&ci->i_dirty_item, &mdsc->cap_dirty); 2922 spin_lock(&ci->i_ceph_lock);
2923 cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
2924 if (cap && cap->cap_id == p_cap_id) {
2925 dout(" remove export cap %p mds%d flags %d\n",
2926 cap, peer, ph->flags);
2927 if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
2928 (cap->seq != le32_to_cpu(ph->seq) ||
2929 cap->mseq != le32_to_cpu(ph->mseq))) {
2930 pr_err("handle_cap_import: mismatched seq/mseq: "
2931 "ino (%llx.%llx) mds%d seq %d mseq %d "
2932 "importer mds%d has peer seq %d mseq %d\n",
2933 ceph_vinop(inode), peer, cap->seq,
2934 cap->mseq, mds, le32_to_cpu(ph->seq),
2935 le32_to_cpu(ph->mseq));
2839 } 2936 }
2840 spin_unlock(&mdsc->cap_dirty_lock); 2937 ci->i_cap_exporting_issued = cap->issued;
2841 } else { 2938 __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
2842 dout("handle_cap_import inode %p ci %p mds%d mseq %d\n",
2843 inode, ci, mds, mseq);
2844 } 2939 }
2845 2940
2941 /* make sure we re-request max_size, if necessary */
2942 ci->i_wanted_max_size = 0;
2943 ci->i_requested_max_size = 0;
2944 spin_unlock(&ci->i_ceph_lock);
2945
2846 down_write(&mdsc->snap_rwsem); 2946 down_write(&mdsc->snap_rwsem);
2847 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, 2947 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
2848 false); 2948 false);
@@ -2853,11 +2953,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2853 kick_flushing_inode_caps(mdsc, session, inode); 2953 kick_flushing_inode_caps(mdsc, session, inode);
2854 up_read(&mdsc->snap_rwsem); 2954 up_read(&mdsc->snap_rwsem);
2855 2955
2856 /* make sure we re-request max_size, if necessary */
2857 spin_lock(&ci->i_ceph_lock);
2858 ci->i_wanted_max_size = 0; /* reset */
2859 ci->i_requested_max_size = 0;
2860 spin_unlock(&ci->i_ceph_lock);
2861} 2956}
2862 2957
2863/* 2958/*
@@ -2875,6 +2970,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2875 struct ceph_inode_info *ci; 2970 struct ceph_inode_info *ci;
2876 struct ceph_cap *cap; 2971 struct ceph_cap *cap;
2877 struct ceph_mds_caps *h; 2972 struct ceph_mds_caps *h;
2973 struct ceph_mds_cap_peer *peer = NULL;
2878 int mds = session->s_mds; 2974 int mds = session->s_mds;
2879 int op; 2975 int op;
2880 u32 seq, mseq; 2976 u32 seq, mseq;
@@ -2885,12 +2981,13 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2885 void *snaptrace; 2981 void *snaptrace;
2886 size_t snaptrace_len; 2982 size_t snaptrace_len;
2887 void *flock; 2983 void *flock;
2984 void *end;
2888 u32 flock_len; 2985 u32 flock_len;
2889 int open_target_sessions = 0;
2890 2986
2891 dout("handle_caps from mds%d\n", mds); 2987 dout("handle_caps from mds%d\n", mds);
2892 2988
2893 /* decode */ 2989 /* decode */
2990 end = msg->front.iov_base + msg->front.iov_len;
2894 tid = le64_to_cpu(msg->hdr.tid); 2991 tid = le64_to_cpu(msg->hdr.tid);
2895 if (msg->front.iov_len < sizeof(*h)) 2992 if (msg->front.iov_len < sizeof(*h))
2896 goto bad; 2993 goto bad;
@@ -2908,17 +3005,28 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2908 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3005 snaptrace_len = le32_to_cpu(h->snap_trace_len);
2909 3006
2910 if (le16_to_cpu(msg->hdr.version) >= 2) { 3007 if (le16_to_cpu(msg->hdr.version) >= 2) {
2911 void *p, *end; 3008 void *p = snaptrace + snaptrace_len;
2912
2913 p = snaptrace + snaptrace_len;
2914 end = msg->front.iov_base + msg->front.iov_len;
2915 ceph_decode_32_safe(&p, end, flock_len, bad); 3009 ceph_decode_32_safe(&p, end, flock_len, bad);
3010 if (p + flock_len > end)
3011 goto bad;
2916 flock = p; 3012 flock = p;
2917 } else { 3013 } else {
2918 flock = NULL; 3014 flock = NULL;
2919 flock_len = 0; 3015 flock_len = 0;
2920 } 3016 }
2921 3017
3018 if (le16_to_cpu(msg->hdr.version) >= 3) {
3019 if (op == CEPH_CAP_OP_IMPORT) {
3020 void *p = flock + flock_len;
3021 if (p + sizeof(*peer) > end)
3022 goto bad;
3023 peer = p;
3024 } else if (op == CEPH_CAP_OP_EXPORT) {
3025 /* recorded in unused fields */
3026 peer = (void *)&h->size;
3027 }
3028 }
3029
2922 mutex_lock(&session->s_mutex); 3030 mutex_lock(&session->s_mutex);
2923 session->s_seq++; 3031 session->s_seq++;
2924 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3032 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2951,11 +3059,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2951 goto done; 3059 goto done;
2952 3060
2953 case CEPH_CAP_OP_EXPORT: 3061 case CEPH_CAP_OP_EXPORT:
2954 handle_cap_export(inode, h, session, &open_target_sessions); 3062 handle_cap_export(inode, h, peer, session);
2955 goto done; 3063 goto done_unlocked;
2956 3064
2957 case CEPH_CAP_OP_IMPORT: 3065 case CEPH_CAP_OP_IMPORT:
2958 handle_cap_import(mdsc, inode, h, session, 3066 handle_cap_import(mdsc, inode, h, peer, session,
2959 snaptrace, snaptrace_len); 3067 snaptrace, snaptrace_len);
2960 } 3068 }
2961 3069
@@ -3007,8 +3115,6 @@ done:
3007done_unlocked: 3115done_unlocked:
3008 if (inode) 3116 if (inode)
3009 iput(inode); 3117 iput(inode);
3010 if (open_target_sessions)
3011 ceph_mdsc_open_export_target_sessions(mdsc, session);
3012 return; 3118 return;
3013 3119
3014bad: 3120bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 2a0bcaeb189a..619616d585b0 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -693,6 +693,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
693 if (!err && !req->r_reply_info.head->is_dentry) 693 if (!err && !req->r_reply_info.head->is_dentry)
694 err = ceph_handle_notrace_create(dir, dentry); 694 err = ceph_handle_notrace_create(dir, dentry);
695 ceph_mdsc_put_request(req); 695 ceph_mdsc_put_request(req);
696
697 if (!err)
698 err = ceph_init_acl(dentry, dentry->d_inode, dir);
699
696 if (err) 700 if (err)
697 d_drop(dentry); 701 d_drop(dentry);
698 return err; 702 return err;
@@ -1037,14 +1041,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1037 valid = 1; 1041 valid = 1;
1038 } else if (dentry_lease_is_valid(dentry) || 1042 } else if (dentry_lease_is_valid(dentry) ||
1039 dir_lease_is_valid(dir, dentry)) { 1043 dir_lease_is_valid(dir, dentry)) {
1040 valid = 1; 1044 if (dentry->d_inode)
1045 valid = ceph_is_any_caps(dentry->d_inode);
1046 else
1047 valid = 1;
1041 } 1048 }
1042 1049
1043 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1050 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1044 if (valid) 1051 if (valid) {
1045 ceph_dentry_lru_touch(dentry); 1052 ceph_dentry_lru_touch(dentry);
1046 else 1053 } else {
1054 ceph_dir_clear_complete(dir);
1047 d_drop(dentry); 1055 d_drop(dentry);
1056 }
1048 iput(dir); 1057 iput(dir);
1049 return valid; 1058 return valid;
1050} 1059}
@@ -1293,6 +1302,7 @@ const struct inode_operations ceph_dir_iops = {
1293 .getxattr = ceph_getxattr, 1302 .getxattr = ceph_getxattr,
1294 .listxattr = ceph_listxattr, 1303 .listxattr = ceph_listxattr,
1295 .removexattr = ceph_removexattr, 1304 .removexattr = ceph_removexattr,
1305 .get_acl = ceph_get_acl,
1296 .mknod = ceph_mknod, 1306 .mknod = ceph_mknod,
1297 .symlink = ceph_symlink, 1307 .symlink = ceph_symlink,
1298 .mkdir = ceph_mkdir, 1308 .mkdir = ceph_mkdir,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3de89829e2a1..dfd2ce3419f8 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -408,51 +408,92 @@ more:
408 * 408 *
409 * If the read spans object boundary, just do multiple reads. 409 * If the read spans object boundary, just do multiple reads.
410 */ 410 */
411static ssize_t ceph_sync_read(struct file *file, char __user *data, 411static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
412 unsigned len, loff_t *poff, int *checkeof) 412 int *checkeof)
413{ 413{
414 struct file *file = iocb->ki_filp;
414 struct inode *inode = file_inode(file); 415 struct inode *inode = file_inode(file);
415 struct page **pages; 416 struct page **pages;
416 u64 off = *poff; 417 u64 off = iocb->ki_pos;
417 int num_pages, ret; 418 int num_pages, ret;
419 size_t len = i->count;
418 420
419 dout("sync_read on file %p %llu~%u %s\n", file, off, len, 421 dout("sync_read on file %p %llu~%u %s\n", file, off,
422 (unsigned)len,
420 (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 423 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
421
422 if (file->f_flags & O_DIRECT) {
423 num_pages = calc_pages_for((unsigned long)data, len);
424 pages = ceph_get_direct_page_vector(data, num_pages, true);
425 } else {
426 num_pages = calc_pages_for(off, len);
427 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
428 }
429 if (IS_ERR(pages))
430 return PTR_ERR(pages);
431
432 /* 424 /*
433 * flush any page cache pages in this range. this 425 * flush any page cache pages in this range. this
434 * will make concurrent normal and sync io slow, 426 * will make concurrent normal and sync io slow,
435 * but it will at least behave sensibly when they are 427 * but it will at least behave sensibly when they are
436 * in sequence. 428 * in sequence.
437 */ 429 */
438 ret = filemap_write_and_wait(inode->i_mapping); 430 ret = filemap_write_and_wait_range(inode->i_mapping, off,
431 off + len);
439 if (ret < 0) 432 if (ret < 0)
440 goto done; 433 return ret;
441 434
442 ret = striped_read(inode, off, len, pages, num_pages, checkeof, 435 if (file->f_flags & O_DIRECT) {
443 file->f_flags & O_DIRECT, 436 while (iov_iter_count(i)) {
444 (unsigned long)data & ~PAGE_MASK); 437 void __user *data = i->iov[0].iov_base + i->iov_offset;
438 size_t len = i->iov[0].iov_len - i->iov_offset;
439
440 num_pages = calc_pages_for((unsigned long)data, len);
441 pages = ceph_get_direct_page_vector(data,
442 num_pages, true);
443 if (IS_ERR(pages))
444 return PTR_ERR(pages);
445
446 ret = striped_read(inode, off, len,
447 pages, num_pages, checkeof,
448 1, (unsigned long)data & ~PAGE_MASK);
449 ceph_put_page_vector(pages, num_pages, true);
450
451 if (ret <= 0)
452 break;
453 off += ret;
454 iov_iter_advance(i, ret);
455 if (ret < len)
456 break;
457 }
458 } else {
459 num_pages = calc_pages_for(off, len);
460 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
461 if (IS_ERR(pages))
462 return PTR_ERR(pages);
463 ret = striped_read(inode, off, len, pages,
464 num_pages, checkeof, 0, 0);
465 if (ret > 0) {
466 int l, k = 0;
467 size_t left = len = ret;
468
469 while (left) {
470 void __user *data = i->iov[0].iov_base
471 + i->iov_offset;
472 l = min(i->iov[0].iov_len - i->iov_offset,
473 left);
474
475 ret = ceph_copy_page_vector_to_user(&pages[k],
476 data, off,
477 l);
478 if (ret > 0) {
479 iov_iter_advance(i, ret);
480 left -= ret;
481 off += ret;
482 k = calc_pages_for(iocb->ki_pos,
483 len - left + 1) - 1;
484 BUG_ON(k >= num_pages && left);
485 } else
486 break;
487 }
488 }
489 ceph_release_page_vector(pages, num_pages);
490 }
445 491
446 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) 492 if (off > iocb->ki_pos) {
447 ret = ceph_copy_page_vector_to_user(pages, data, off, ret); 493 ret = off - iocb->ki_pos;
448 if (ret >= 0) 494 iocb->ki_pos = off;
449 *poff = off + ret; 495 }
450 496
451done:
452 if (file->f_flags & O_DIRECT)
453 ceph_put_page_vector(pages, num_pages, true);
454 else
455 ceph_release_page_vector(pages, num_pages);
456 dout("sync_read result %d\n", ret); 497 dout("sync_read result %d\n", ret);
457 return ret; 498 return ret;
458} 499}
@@ -489,83 +530,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
489 } 530 }
490} 531}
491 532
533
492/* 534/*
493 * Synchronous write, straight from __user pointer or user pages (if 535 * Synchronous write, straight from __user pointer or user pages.
494 * O_DIRECT).
495 * 536 *
496 * If write spans object boundary, just do multiple writes. (For a 537 * If write spans object boundary, just do multiple writes. (For a
497 * correct atomic write, we should e.g. take write locks on all 538 * correct atomic write, we should e.g. take write locks on all
498 * objects, rollback on failure, etc.) 539 * objects, rollback on failure, etc.)
499 */ 540 */
500static ssize_t ceph_sync_write(struct file *file, const char __user *data, 541static ssize_t
501 size_t left, loff_t pos, loff_t *ppos) 542ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
543 unsigned long nr_segs, size_t count)
502{ 544{
545 struct file *file = iocb->ki_filp;
503 struct inode *inode = file_inode(file); 546 struct inode *inode = file_inode(file);
504 struct ceph_inode_info *ci = ceph_inode(inode); 547 struct ceph_inode_info *ci = ceph_inode(inode);
505 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 548 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
506 struct ceph_snap_context *snapc; 549 struct ceph_snap_context *snapc;
507 struct ceph_vino vino; 550 struct ceph_vino vino;
508 struct ceph_osd_request *req; 551 struct ceph_osd_request *req;
509 int num_ops = 1;
510 struct page **pages; 552 struct page **pages;
511 int num_pages; 553 int num_pages;
512 u64 len;
513 int written = 0; 554 int written = 0;
514 int flags; 555 int flags;
515 int check_caps = 0; 556 int check_caps = 0;
516 int page_align, io_align; 557 int page_align;
517 unsigned long buf_align;
518 int ret; 558 int ret;
519 struct timespec mtime = CURRENT_TIME; 559 struct timespec mtime = CURRENT_TIME;
520 bool own_pages = false; 560 loff_t pos = iocb->ki_pos;
561 struct iov_iter i;
521 562
522 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 563 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
523 return -EROFS; 564 return -EROFS;
524 565
525 dout("sync_write on file %p %lld~%u %s\n", file, pos, 566 dout("sync_direct_write on file %p %lld~%u\n", file, pos,
526 (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 567 (unsigned)count);
527 568
528 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); 569 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
529 if (ret < 0) 570 if (ret < 0)
530 return ret; 571 return ret;
531 572
532 ret = invalidate_inode_pages2_range(inode->i_mapping, 573 ret = invalidate_inode_pages2_range(inode->i_mapping,
533 pos >> PAGE_CACHE_SHIFT, 574 pos >> PAGE_CACHE_SHIFT,
534 (pos + left) >> PAGE_CACHE_SHIFT); 575 (pos + count) >> PAGE_CACHE_SHIFT);
535 if (ret < 0) 576 if (ret < 0)
536 dout("invalidate_inode_pages2_range returned %d\n", ret); 577 dout("invalidate_inode_pages2_range returned %d\n", ret);
537 578
538 flags = CEPH_OSD_FLAG_ORDERSNAP | 579 flags = CEPH_OSD_FLAG_ORDERSNAP |
539 CEPH_OSD_FLAG_ONDISK | 580 CEPH_OSD_FLAG_ONDISK |
540 CEPH_OSD_FLAG_WRITE; 581 CEPH_OSD_FLAG_WRITE;
541 if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
542 flags |= CEPH_OSD_FLAG_ACK;
543 else
544 num_ops++; /* Also include a 'startsync' command. */
545 582
546 /* 583 iov_iter_init(&i, iov, nr_segs, count, 0);
547 * we may need to do multiple writes here if we span an object 584
548 * boundary. this isn't atomic, unfortunately. :( 585 while (iov_iter_count(&i) > 0) {
549 */ 586 void __user *data = i.iov->iov_base + i.iov_offset;
550more: 587 u64 len = i.iov->iov_len - i.iov_offset;
551 io_align = pos & ~PAGE_MASK; 588
552 buf_align = (unsigned long)data & ~PAGE_MASK; 589 page_align = (unsigned long)data & ~PAGE_MASK;
553 len = left; 590
554 591 snapc = ci->i_snap_realm->cached_context;
555 snapc = ci->i_snap_realm->cached_context; 592 vino = ceph_vino(inode);
556 vino = ceph_vino(inode); 593 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
557 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 594 vino, pos, &len,
558 vino, pos, &len, num_ops, 595 2,/*include a 'startsync' command*/
559 CEPH_OSD_OP_WRITE, flags, snapc, 596 CEPH_OSD_OP_WRITE, flags, snapc,
560 ci->i_truncate_seq, ci->i_truncate_size, 597 ci->i_truncate_seq,
561 false); 598 ci->i_truncate_size,
562 if (IS_ERR(req)) 599 false);
563 return PTR_ERR(req); 600 if (IS_ERR(req)) {
601 ret = PTR_ERR(req);
602 goto out;
603 }
564 604
565 /* write from beginning of first page, regardless of io alignment */ 605 num_pages = calc_pages_for(page_align, len);
566 page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
567 num_pages = calc_pages_for(page_align, len);
568 if (file->f_flags & O_DIRECT) {
569 pages = ceph_get_direct_page_vector(data, num_pages, false); 606 pages = ceph_get_direct_page_vector(data, num_pages, false);
570 if (IS_ERR(pages)) { 607 if (IS_ERR(pages)) {
571 ret = PTR_ERR(pages); 608 ret = PTR_ERR(pages);
@@ -577,60 +614,175 @@ more:
577 * may block. 614 * may block.
578 */ 615 */
579 truncate_inode_pages_range(inode->i_mapping, pos, 616 truncate_inode_pages_range(inode->i_mapping, pos,
580 (pos+len) | (PAGE_CACHE_SIZE-1)); 617 (pos+len) | (PAGE_CACHE_SIZE-1));
581 } else { 618 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
619 false, false);
620
621 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
622 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
623
624 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
625 if (!ret)
626 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
627
628 ceph_put_page_vector(pages, num_pages, false);
629
630out:
631 ceph_osdc_put_request(req);
632 if (ret == 0) {
633 pos += len;
634 written += len;
635 iov_iter_advance(&i, (size_t)len);
636
637 if (pos > i_size_read(inode)) {
638 check_caps = ceph_inode_set_size(inode, pos);
639 if (check_caps)
640 ceph_check_caps(ceph_inode(inode),
641 CHECK_CAPS_AUTHONLY,
642 NULL);
643 }
644 } else
645 break;
646 }
647
648 if (ret != -EOLDSNAPC && written > 0) {
649 iocb->ki_pos = pos;
650 ret = written;
651 }
652 return ret;
653}
654
655
656/*
657 * Synchronous write, straight from __user pointer or user pages.
658 *
659 * If write spans object boundary, just do multiple writes. (For a
660 * correct atomic write, we should e.g. take write locks on all
661 * objects, rollback on failure, etc.)
662 */
663static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
664 unsigned long nr_segs, size_t count)
665{
666 struct file *file = iocb->ki_filp;
667 struct inode *inode = file_inode(file);
668 struct ceph_inode_info *ci = ceph_inode(inode);
669 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
670 struct ceph_snap_context *snapc;
671 struct ceph_vino vino;
672 struct ceph_osd_request *req;
673 struct page **pages;
674 u64 len;
675 int num_pages;
676 int written = 0;
677 int flags;
678 int check_caps = 0;
679 int ret;
680 struct timespec mtime = CURRENT_TIME;
681 loff_t pos = iocb->ki_pos;
682 struct iov_iter i;
683
684 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
685 return -EROFS;
686
687 dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
688
689 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
690 if (ret < 0)
691 return ret;
692
693 ret = invalidate_inode_pages2_range(inode->i_mapping,
694 pos >> PAGE_CACHE_SHIFT,
695 (pos + count) >> PAGE_CACHE_SHIFT);
696 if (ret < 0)
697 dout("invalidate_inode_pages2_range returned %d\n", ret);
698
699 flags = CEPH_OSD_FLAG_ORDERSNAP |
700 CEPH_OSD_FLAG_ONDISK |
701 CEPH_OSD_FLAG_WRITE |
702 CEPH_OSD_FLAG_ACK;
703
704 iov_iter_init(&i, iov, nr_segs, count, 0);
705
706 while ((len = iov_iter_count(&i)) > 0) {
707 size_t left;
708 int n;
709
710 snapc = ci->i_snap_realm->cached_context;
711 vino = ceph_vino(inode);
712 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
713 vino, pos, &len, 1,
714 CEPH_OSD_OP_WRITE, flags, snapc,
715 ci->i_truncate_seq,
716 ci->i_truncate_size,
717 false);
718 if (IS_ERR(req)) {
719 ret = PTR_ERR(req);
720 goto out;
721 }
722
723 /*
724 * write from beginning of first page,
725 * regardless of io alignment
726 */
727 num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
728
582 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); 729 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
583 if (IS_ERR(pages)) { 730 if (IS_ERR(pages)) {
584 ret = PTR_ERR(pages); 731 ret = PTR_ERR(pages);
585 goto out; 732 goto out;
586 } 733 }
587 ret = ceph_copy_user_to_page_vector(pages, data, pos, len); 734
735 left = len;
736 for (n = 0; n < num_pages; n++) {
737 size_t plen = min_t(size_t, left, PAGE_SIZE);
738 ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
739 if (ret != plen) {
740 ret = -EFAULT;
741 break;
742 }
743 left -= ret;
744 iov_iter_advance(&i, ret);
745 }
746
588 if (ret < 0) { 747 if (ret < 0) {
589 ceph_release_page_vector(pages, num_pages); 748 ceph_release_page_vector(pages, num_pages);
590 goto out; 749 goto out;
591 } 750 }
592 751
593 if ((file->f_flags & O_SYNC) == 0) { 752 /* get a second commit callback */
594 /* get a second commit callback */ 753 req->r_unsafe_callback = ceph_sync_write_unsafe;
595 req->r_unsafe_callback = ceph_sync_write_unsafe; 754 req->r_inode = inode;
596 req->r_inode = inode;
597 own_pages = true;
598 }
599 }
600 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
601 false, own_pages);
602 755
603 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 756 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
604 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 757 false, true);
605 758
606 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 759 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
607 if (!ret) 760 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
608 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
609 761
610 if (file->f_flags & O_DIRECT) 762 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
611 ceph_put_page_vector(pages, num_pages, false); 763 if (!ret)
612 else if (file->f_flags & O_SYNC) 764 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
613 ceph_release_page_vector(pages, num_pages);
614 765
615out: 766out:
616 ceph_osdc_put_request(req); 767 ceph_osdc_put_request(req);
617 if (ret == 0) { 768 if (ret == 0) {
618 pos += len; 769 pos += len;
619 written += len; 770 written += len;
620 left -= len; 771
621 data += len; 772 if (pos > i_size_read(inode)) {
622 if (left) 773 check_caps = ceph_inode_set_size(inode, pos);
623 goto more; 774 if (check_caps)
775 ceph_check_caps(ceph_inode(inode),
776 CHECK_CAPS_AUTHONLY,
777 NULL);
778 }
779 } else
780 break;
781 }
624 782
783 if (ret != -EOLDSNAPC && written > 0) {
625 ret = written; 784 ret = written;
626 *ppos = pos; 785 iocb->ki_pos = pos;
627 if (pos > i_size_read(inode))
628 check_caps = ceph_inode_set_size(inode, pos);
629 if (check_caps)
630 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
631 NULL);
632 } else if (ret != -EOLDSNAPC && written > 0) {
633 ret = written;
634 } 786 }
635 return ret; 787 return ret;
636} 788}
@@ -647,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
647{ 799{
648 struct file *filp = iocb->ki_filp; 800 struct file *filp = iocb->ki_filp;
649 struct ceph_file_info *fi = filp->private_data; 801 struct ceph_file_info *fi = filp->private_data;
650 loff_t *ppos = &iocb->ki_pos; 802 size_t len = iocb->ki_nbytes;
651 size_t len = iov->iov_len;
652 struct inode *inode = file_inode(filp); 803 struct inode *inode = file_inode(filp);
653 struct ceph_inode_info *ci = ceph_inode(inode); 804 struct ceph_inode_info *ci = ceph_inode(inode);
654 void __user *base = iov->iov_base;
655 ssize_t ret; 805 ssize_t ret;
656 int want, got = 0; 806 int want, got = 0;
657 int checkeof = 0, read = 0; 807 int checkeof = 0, read = 0;
658 808
659 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
660 inode, ceph_vinop(inode), pos, (unsigned)len, inode);
661again: 809again:
810 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
811 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
812
662 if (fi->fmode & CEPH_FILE_MODE_LAZY) 813 if (fi->fmode & CEPH_FILE_MODE_LAZY)
663 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 814 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
664 else 815 else
665 want = CEPH_CAP_FILE_CACHE; 816 want = CEPH_CAP_FILE_CACHE;
666 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 817 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
667 if (ret < 0) 818 if (ret < 0)
668 goto out; 819 return ret;
669 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
670 inode, ceph_vinop(inode), pos, (unsigned)len,
671 ceph_cap_string(got));
672 820
673 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 821 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
674 (iocb->ki_filp->f_flags & O_DIRECT) || 822 (iocb->ki_filp->f_flags & O_DIRECT) ||
675 (fi->flags & CEPH_F_SYNC)) 823 (fi->flags & CEPH_F_SYNC)) {
824 struct iov_iter i;
825
826 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got));
829
830 if (!read) {
831 ret = generic_segment_checks(iov, &nr_segs,
832 &len, VERIFY_WRITE);
833 if (ret)
834 goto out;
835 }
836
837 iov_iter_init(&i, iov, nr_segs, len, read);
838
676 /* hmm, this isn't really async... */ 839 /* hmm, this isn't really async... */
677 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 840 ret = ceph_sync_read(iocb, &i, &checkeof);
678 else 841 } else {
679 ret = generic_file_aio_read(iocb, iov, nr_segs, pos); 842 /*
843 * We can't modify the content of iov,
844 * so we only read from beginning.
845 */
846 if (read) {
847 iocb->ki_pos = pos;
848 len = iocb->ki_nbytes;
849 read = 0;
850 }
851 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
852 inode, ceph_vinop(inode), pos, (unsigned)len,
853 ceph_cap_string(got));
680 854
855 ret = generic_file_aio_read(iocb, iov, nr_segs, pos);
856 }
681out: 857out:
682 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 858 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
683 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 859 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
684 ceph_put_cap_refs(ci, got); 860 ceph_put_cap_refs(ci, got);
685 861
686 if (checkeof && ret >= 0) { 862 if (checkeof && ret >= 0) {
687 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); 863 int statret = ceph_do_getattr(inode,
864 CEPH_STAT_CAP_SIZE);
688 865
689 /* hit EOF or hole? */ 866 /* hit EOF or hole? */
690 if (statret == 0 && *ppos < inode->i_size) { 867 if (statret == 0 && iocb->ki_pos < inode->i_size &&
691 dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); 868 ret < len) {
869 dout("sync_read hit hole, ppos %lld < size %lld"
870 ", reading more\n", iocb->ki_pos,
871 inode->i_size);
872
692 read += ret; 873 read += ret;
693 base += ret;
694 len -= ret; 874 len -= ret;
695 checkeof = 0; 875 checkeof = 0;
696 goto again; 876 goto again;
697 } 877 }
698 } 878 }
879
699 if (ret >= 0) 880 if (ret >= 0)
700 ret += read; 881 ret += read;
701 882
@@ -772,11 +953,13 @@ retry_snap:
772 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 953 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
773 954
774 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 955 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
775 (iocb->ki_filp->f_flags & O_DIRECT) || 956 (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
776 (fi->flags & CEPH_F_SYNC)) {
777 mutex_unlock(&inode->i_mutex); 957 mutex_unlock(&inode->i_mutex);
778 written = ceph_sync_write(file, iov->iov_base, count, 958 if (file->f_flags & O_DIRECT)
779 pos, &iocb->ki_pos); 959 written = ceph_sync_direct_write(iocb, iov,
960 nr_segs, count);
961 else
962 written = ceph_sync_write(iocb, iov, nr_segs, count);
780 if (written == -EOLDSNAPC) { 963 if (written == -EOLDSNAPC) {
781 dout("aio_write %p %llx.%llx %llu~%u" 964 dout("aio_write %p %llx.%llx %llu~%u"
782 "got EOLDSNAPC, retrying\n", 965 "got EOLDSNAPC, retrying\n",
@@ -1018,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode,
1018 loff_t offset, loff_t length) 1201 loff_t offset, loff_t length)
1019{ 1202{
1020 struct ceph_file_info *fi = file->private_data; 1203 struct ceph_file_info *fi = file->private_data;
1021 struct inode *inode = file->f_dentry->d_inode; 1204 struct inode *inode = file_inode(file);
1022 struct ceph_inode_info *ci = ceph_inode(inode); 1205 struct ceph_inode_info *ci = ceph_inode(inode);
1023 struct ceph_osd_client *osdc = 1206 struct ceph_osd_client *osdc =
1024 &ceph_inode_to_client(inode)->client->osdc; 1207 &ceph_inode_to_client(inode)->client->osdc;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 278fd2891288..6fc10a7d7c59 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = {
95 .getxattr = ceph_getxattr, 95 .getxattr = ceph_getxattr,
96 .listxattr = ceph_listxattr, 96 .listxattr = ceph_listxattr,
97 .removexattr = ceph_removexattr, 97 .removexattr = ceph_removexattr,
98 .get_acl = ceph_get_acl,
98}; 99};
99 100
100 101
@@ -335,12 +336,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
335 ci->i_hold_caps_min = 0; 336 ci->i_hold_caps_min = 0;
336 ci->i_hold_caps_max = 0; 337 ci->i_hold_caps_max = 0;
337 INIT_LIST_HEAD(&ci->i_cap_delay_list); 338 INIT_LIST_HEAD(&ci->i_cap_delay_list);
338 ci->i_cap_exporting_mds = 0;
339 ci->i_cap_exporting_mseq = 0;
340 ci->i_cap_exporting_issued = 0;
341 INIT_LIST_HEAD(&ci->i_cap_snaps); 339 INIT_LIST_HEAD(&ci->i_cap_snaps);
342 ci->i_head_snapc = NULL; 340 ci->i_head_snapc = NULL;
343 ci->i_snap_caps = 0; 341 ci->i_snap_caps = 0;
342 ci->i_cap_exporting_issued = 0;
344 343
345 for (i = 0; i < CEPH_FILE_MODE_NUM; i++) 344 for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
346 ci->i_nr_by_mode[i] = 0; 345 ci->i_nr_by_mode[i] = 0;
@@ -436,6 +435,16 @@ void ceph_destroy_inode(struct inode *inode)
436 call_rcu(&inode->i_rcu, ceph_i_callback); 435 call_rcu(&inode->i_rcu, ceph_i_callback);
437} 436}
438 437
438int ceph_drop_inode(struct inode *inode)
439{
440 /*
441 * Positve dentry and corresponding inode are always accompanied
442 * in MDS reply. So no need to keep inode in the cache after
443 * dropping all its aliases.
444 */
445 return 1;
446}
447
439/* 448/*
440 * Helpers to fill in size, ctime, mtime, and atime. We have to be 449 * Helpers to fill in size, ctime, mtime, and atime. We have to be
441 * careful because either the client or MDS may have more up to date 450 * careful because either the client or MDS may have more up to date
@@ -670,6 +679,7 @@ static int fill_inode(struct inode *inode,
670 memcpy(ci->i_xattrs.blob->vec.iov_base, 679 memcpy(ci->i_xattrs.blob->vec.iov_base,
671 iinfo->xattr_data, iinfo->xattr_len); 680 iinfo->xattr_data, iinfo->xattr_len);
672 ci->i_xattrs.version = le64_to_cpu(info->xattr_version); 681 ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
682 ceph_forget_all_cached_acls(inode);
673 xattr_blob = NULL; 683 xattr_blob = NULL;
674 } 684 }
675 685
@@ -1454,7 +1464,8 @@ static void ceph_invalidate_work(struct work_struct *work)
1454 dout("invalidate_pages %p gen %d revoking %d\n", inode, 1464 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1455 ci->i_rdcache_gen, ci->i_rdcache_revoking); 1465 ci->i_rdcache_gen, ci->i_rdcache_revoking);
1456 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 1466 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1457 /* nevermind! */ 1467 if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1468 check = 1;
1458 spin_unlock(&ci->i_ceph_lock); 1469 spin_unlock(&ci->i_ceph_lock);
1459 mutex_unlock(&ci->i_truncate_mutex); 1470 mutex_unlock(&ci->i_truncate_mutex);
1460 goto out; 1471 goto out;
@@ -1475,13 +1486,14 @@ static void ceph_invalidate_work(struct work_struct *work)
1475 dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", 1486 dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
1476 inode, orig_gen, ci->i_rdcache_gen, 1487 inode, orig_gen, ci->i_rdcache_gen,
1477 ci->i_rdcache_revoking); 1488 ci->i_rdcache_revoking);
1489 if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
1490 check = 1;
1478 } 1491 }
1479 spin_unlock(&ci->i_ceph_lock); 1492 spin_unlock(&ci->i_ceph_lock);
1480 mutex_unlock(&ci->i_truncate_mutex); 1493 mutex_unlock(&ci->i_truncate_mutex);
1481 1494out:
1482 if (check) 1495 if (check)
1483 ceph_check_caps(ci, 0, NULL); 1496 ceph_check_caps(ci, 0, NULL);
1484out:
1485 iput(inode); 1497 iput(inode);
1486} 1498}
1487 1499
@@ -1602,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = {
1602 .getxattr = ceph_getxattr, 1614 .getxattr = ceph_getxattr,
1603 .listxattr = ceph_listxattr, 1615 .listxattr = ceph_listxattr,
1604 .removexattr = ceph_removexattr, 1616 .removexattr = ceph_removexattr,
1617 .get_acl = ceph_get_acl,
1605}; 1618};
1606 1619
1607/* 1620/*
@@ -1675,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1675 dirtied |= CEPH_CAP_AUTH_EXCL; 1688 dirtied |= CEPH_CAP_AUTH_EXCL;
1676 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || 1689 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1677 attr->ia_mode != inode->i_mode) { 1690 attr->ia_mode != inode->i_mode) {
1691 inode->i_mode = attr->ia_mode;
1678 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode); 1692 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
1679 mask |= CEPH_SETATTR_MODE; 1693 mask |= CEPH_SETATTR_MODE;
1680 release |= CEPH_CAP_AUTH_SHARED; 1694 release |= CEPH_CAP_AUTH_SHARED;
@@ -1790,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1790 if (inode_dirty_flags) 1804 if (inode_dirty_flags)
1791 __mark_inode_dirty(inode, inode_dirty_flags); 1805 __mark_inode_dirty(inode, inode_dirty_flags);
1792 1806
1807 if (ia_valid & ATTR_MODE) {
1808 err = ceph_acl_chmod(dentry, inode);
1809 if (err)
1810 goto out_put;
1811 }
1812
1793 if (mask) { 1813 if (mask) {
1794 req->r_inode = inode; 1814 req->r_inode = inode;
1795 ihold(inode); 1815 ihold(inode);
@@ -1809,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1809 return err; 1829 return err;
1810out: 1830out:
1811 spin_unlock(&ci->i_ceph_lock); 1831 spin_unlock(&ci->i_ceph_lock);
1832out_put:
1812 ceph_mdsc_put_request(req); 1833 ceph_mdsc_put_request(req);
1813 return err; 1834 return err;
1814} 1835}
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 669622fd1ae3..dc66c9e023e4 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -183,6 +183,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
183 struct ceph_inode_info *ci = ceph_inode(inode); 183 struct ceph_inode_info *ci = ceph_inode(inode);
184 struct ceph_osd_client *osdc = 184 struct ceph_osd_client *osdc =
185 &ceph_sb_to_client(inode->i_sb)->client->osdc; 185 &ceph_sb_to_client(inode->i_sb)->client->osdc;
186 struct ceph_object_locator oloc;
187 struct ceph_object_id oid;
186 u64 len = 1, olen; 188 u64 len = 1, olen;
187 u64 tmp; 189 u64 tmp;
188 struct ceph_pg pgid; 190 struct ceph_pg pgid;
@@ -211,8 +213,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
211 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", 213 snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
212 ceph_ino(inode), dl.object_no); 214 ceph_ino(inode), dl.object_no);
213 215
214 r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, 216 oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
215 ceph_file_layout_pg_pool(ci->i_layout)); 217 ceph_oid_set_name(&oid, dl.object_name);
218
219 r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid);
216 if (r < 0) { 220 if (r < 0) {
217 up_read(&osdc->map_sem); 221 up_read(&osdc->map_sem);
218 return r; 222 return r;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index d90861f45210..f4f050a69a48 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -63,7 +63,7 @@ static const struct ceph_connection_operations mds_con_ops;
63 */ 63 */
64static int parse_reply_info_in(void **p, void *end, 64static int parse_reply_info_in(void **p, void *end,
65 struct ceph_mds_reply_info_in *info, 65 struct ceph_mds_reply_info_in *info,
66 int features) 66 u64 features)
67{ 67{
68 int err = -EIO; 68 int err = -EIO;
69 69
@@ -98,7 +98,7 @@ bad:
98 */ 98 */
99static int parse_reply_info_trace(void **p, void *end, 99static int parse_reply_info_trace(void **p, void *end,
100 struct ceph_mds_reply_info_parsed *info, 100 struct ceph_mds_reply_info_parsed *info,
101 int features) 101 u64 features)
102{ 102{
103 int err; 103 int err;
104 104
@@ -145,7 +145,7 @@ out_bad:
145 */ 145 */
146static int parse_reply_info_dir(void **p, void *end, 146static int parse_reply_info_dir(void **p, void *end,
147 struct ceph_mds_reply_info_parsed *info, 147 struct ceph_mds_reply_info_parsed *info,
148 int features) 148 u64 features)
149{ 149{
150 u32 num, i = 0; 150 u32 num, i = 0;
151 int err; 151 int err;
@@ -217,7 +217,7 @@ out_bad:
217 */ 217 */
218static int parse_reply_info_filelock(void **p, void *end, 218static int parse_reply_info_filelock(void **p, void *end,
219 struct ceph_mds_reply_info_parsed *info, 219 struct ceph_mds_reply_info_parsed *info,
220 int features) 220 u64 features)
221{ 221{
222 if (*p + sizeof(*info->filelock_reply) > end) 222 if (*p + sizeof(*info->filelock_reply) > end)
223 goto bad; 223 goto bad;
@@ -238,7 +238,7 @@ bad:
238 */ 238 */
239static int parse_reply_info_create(void **p, void *end, 239static int parse_reply_info_create(void **p, void *end,
240 struct ceph_mds_reply_info_parsed *info, 240 struct ceph_mds_reply_info_parsed *info,
241 int features) 241 u64 features)
242{ 242{
243 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 243 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
244 if (*p == end) { 244 if (*p == end) {
@@ -262,7 +262,7 @@ bad:
262 */ 262 */
263static int parse_reply_info_extra(void **p, void *end, 263static int parse_reply_info_extra(void **p, void *end,
264 struct ceph_mds_reply_info_parsed *info, 264 struct ceph_mds_reply_info_parsed *info,
265 int features) 265 u64 features)
266{ 266{
267 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 267 if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
268 return parse_reply_info_filelock(p, end, info, features); 268 return parse_reply_info_filelock(p, end, info, features);
@@ -280,7 +280,7 @@ static int parse_reply_info_extra(void **p, void *end,
280 */ 280 */
281static int parse_reply_info(struct ceph_msg *msg, 281static int parse_reply_info(struct ceph_msg *msg,
282 struct ceph_mds_reply_info_parsed *info, 282 struct ceph_mds_reply_info_parsed *info,
283 int features) 283 u64 features)
284{ 284{
285 void *p, *end; 285 void *p, *end;
286 u32 len; 286 u32 len;
@@ -713,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
713 struct dentry *dn = get_nonsnap_parent(parent); 713 struct dentry *dn = get_nonsnap_parent(parent);
714 inode = dn->d_inode; 714 inode = dn->d_inode;
715 dout("__choose_mds using nonsnap parent %p\n", inode); 715 dout("__choose_mds using nonsnap parent %p\n", inode);
716 } else if (req->r_dentry->d_inode) { 716 } else {
717 /* dentry target */ 717 /* dentry target */
718 inode = req->r_dentry->d_inode; 718 inode = req->r_dentry->d_inode;
719 } else { 719 if (!inode || mode == USE_AUTH_MDS) {
720 /* dir + name */ 720 /* dir + name */
721 inode = dir; 721 inode = dir;
722 hash = ceph_dentry_hash(dir, req->r_dentry); 722 hash = ceph_dentry_hash(dir, req->r_dentry);
723 is_hash = true; 723 is_hash = true;
724 }
724 } 725 }
725 } 726 }
726 727
@@ -846,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc,
846 * 847 *
847 * called under mdsc->mutex 848 * called under mdsc->mutex
848 */ 849 */
850static struct ceph_mds_session *
851__open_export_target_session(struct ceph_mds_client *mdsc, int target)
852{
853 struct ceph_mds_session *session;
854
855 session = __ceph_lookup_mds_session(mdsc, target);
856 if (!session) {
857 session = register_session(mdsc, target);
858 if (IS_ERR(session))
859 return session;
860 }
861 if (session->s_state == CEPH_MDS_SESSION_NEW ||
862 session->s_state == CEPH_MDS_SESSION_CLOSING)
863 __open_session(mdsc, session);
864
865 return session;
866}
867
868struct ceph_mds_session *
869ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
870{
871 struct ceph_mds_session *session;
872
873 dout("open_export_target_session to mds%d\n", target);
874
875 mutex_lock(&mdsc->mutex);
876 session = __open_export_target_session(mdsc, target);
877 mutex_unlock(&mdsc->mutex);
878
879 return session;
880}
881
849static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 882static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
850 struct ceph_mds_session *session) 883 struct ceph_mds_session *session)
851{ 884{
852 struct ceph_mds_info *mi; 885 struct ceph_mds_info *mi;
853 struct ceph_mds_session *ts; 886 struct ceph_mds_session *ts;
854 int i, mds = session->s_mds; 887 int i, mds = session->s_mds;
855 int target;
856 888
857 if (mds >= mdsc->mdsmap->m_max_mds) 889 if (mds >= mdsc->mdsmap->m_max_mds)
858 return; 890 return;
891
859 mi = &mdsc->mdsmap->m_info[mds]; 892 mi = &mdsc->mdsmap->m_info[mds];
860 dout("open_export_target_sessions for mds%d (%d targets)\n", 893 dout("open_export_target_sessions for mds%d (%d targets)\n",
861 session->s_mds, mi->num_export_targets); 894 session->s_mds, mi->num_export_targets);
862 895
863 for (i = 0; i < mi->num_export_targets; i++) { 896 for (i = 0; i < mi->num_export_targets; i++) {
864 target = mi->export_targets[i]; 897 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
865 ts = __ceph_lookup_mds_session(mdsc, target); 898 if (!IS_ERR(ts))
866 if (!ts) { 899 ceph_put_mds_session(ts);
867 ts = register_session(mdsc, target);
868 if (IS_ERR(ts))
869 return;
870 }
871 if (session->s_state == CEPH_MDS_SESSION_NEW ||
872 session->s_state == CEPH_MDS_SESSION_CLOSING)
873 __open_session(mdsc, session);
874 else
875 dout(" mds%d target mds%d %p is %s\n", session->s_mds,
876 i, ts, session_state_name(ts->s_state));
877 ceph_put_mds_session(ts);
878 } 900 }
879} 901}
880 902
@@ -1136,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
1136 return 0; 1158 return 0;
1137} 1159}
1138 1160
1161static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1162 struct ceph_mds_session *session, u64 seq)
1163{
1164 struct ceph_msg *msg;
1165
1166 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1167 session->s_mds, session_state_name(session->s_state), seq);
1168 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1169 if (!msg)
1170 return -ENOMEM;
1171 ceph_con_send(&session->s_con, msg);
1172 return 0;
1173}
1174
1175
1139/* 1176/*
1140 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1177 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1141 * 1178 *
@@ -1214,7 +1251,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1214{ 1251{
1215 struct ceph_mds_session *session = arg; 1252 struct ceph_mds_session *session = arg;
1216 struct ceph_inode_info *ci = ceph_inode(inode); 1253 struct ceph_inode_info *ci = ceph_inode(inode);
1217 int used, oissued, mine; 1254 int used, wanted, oissued, mine;
1218 1255
1219 if (session->s_trim_caps <= 0) 1256 if (session->s_trim_caps <= 0)
1220 return -1; 1257 return -1;
@@ -1222,14 +1259,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1222 spin_lock(&ci->i_ceph_lock); 1259 spin_lock(&ci->i_ceph_lock);
1223 mine = cap->issued | cap->implemented; 1260 mine = cap->issued | cap->implemented;
1224 used = __ceph_caps_used(ci); 1261 used = __ceph_caps_used(ci);
1262 wanted = __ceph_caps_file_wanted(ci);
1225 oissued = __ceph_caps_issued_other(ci, cap); 1263 oissued = __ceph_caps_issued_other(ci, cap);
1226 1264
1227 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1265 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1228 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1266 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1229 ceph_cap_string(used)); 1267 ceph_cap_string(used), ceph_cap_string(wanted));
1230 if (ci->i_dirty_caps) 1268 if (cap == ci->i_auth_cap) {
1231 goto out; /* dirty caps */ 1269 if (ci->i_dirty_caps | ci->i_flushing_caps)
1232 if ((used & ~oissued) & mine) 1270 goto out;
1271 if ((used | wanted) & CEPH_CAP_ANY_WR)
1272 goto out;
1273 }
1274 if ((used | wanted) & ~oissued & mine)
1233 goto out; /* we need these caps */ 1275 goto out; /* we need these caps */
1234 1276
1235 session->s_trim_caps--; 1277 session->s_trim_caps--;
@@ -2156,26 +2198,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2156 */ 2198 */
2157 if (result == -ESTALE) { 2199 if (result == -ESTALE) {
2158 dout("got ESTALE on request %llu", req->r_tid); 2200 dout("got ESTALE on request %llu", req->r_tid);
2159 if (!req->r_inode) { 2201 if (req->r_direct_mode != USE_AUTH_MDS) {
2160 /* do nothing; not an authority problem */
2161 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2162 dout("not using auth, setting for that now"); 2202 dout("not using auth, setting for that now");
2163 req->r_direct_mode = USE_AUTH_MDS; 2203 req->r_direct_mode = USE_AUTH_MDS;
2164 __do_request(mdsc, req); 2204 __do_request(mdsc, req);
2165 mutex_unlock(&mdsc->mutex); 2205 mutex_unlock(&mdsc->mutex);
2166 goto out; 2206 goto out;
2167 } else { 2207 } else {
2168 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2208 int mds = __choose_mds(mdsc, req);
2169 struct ceph_cap *cap = NULL; 2209 if (mds >= 0 && mds != req->r_session->s_mds) {
2170 2210 dout("but auth changed, so resending");
2171 if (req->r_session)
2172 cap = ceph_get_cap_for_mds(ci,
2173 req->r_session->s_mds);
2174
2175 dout("already using auth");
2176 if ((!cap || cap != ci->i_auth_cap) ||
2177 (cap->mseq != req->r_sent_on_mseq)) {
2178 dout("but cap changed, so resending");
2179 __do_request(mdsc, req); 2211 __do_request(mdsc, req);
2180 mutex_unlock(&mdsc->mutex); 2212 mutex_unlock(&mdsc->mutex);
2181 goto out; 2213 goto out;
@@ -2400,6 +2432,10 @@ static void handle_session(struct ceph_mds_session *session,
2400 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2432 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2401 break; 2433 break;
2402 2434
2435 case CEPH_SESSION_FLUSHMSG:
2436 send_flushmsg_ack(mdsc, session, seq);
2437 break;
2438
2403 default: 2439 default:
2404 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2440 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2405 WARN_ON(1); 2441 WARN_ON(1);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 4c053d099ae4..68288917c737 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -383,6 +383,8 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
383extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, 383extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
384 struct ceph_msg *msg); 384 struct ceph_msg *msg);
385 385
386extern struct ceph_mds_session *
387ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
386extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 388extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
387 struct ceph_mds_session *session); 389 struct ceph_mds_session *session);
388 390
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index 89fa4a940a0f..4440f447fd3f 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -41,6 +41,8 @@ const char *ceph_session_op_name(int op)
41 case CEPH_SESSION_RENEWCAPS: return "renewcaps"; 41 case CEPH_SESSION_RENEWCAPS: return "renewcaps";
42 case CEPH_SESSION_STALE: return "stale"; 42 case CEPH_SESSION_STALE: return "stale";
43 case CEPH_SESSION_RECALL_STATE: return "recall_state"; 43 case CEPH_SESSION_RECALL_STATE: return "recall_state";
44 case CEPH_SESSION_FLUSHMSG: return "flushmsg";
45 case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
44 } 46 }
45 return "???"; 47 return "???";
46} 48}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 6a0951e43044..2df963f1cf5a 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -490,10 +490,10 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
490 struct ceph_options *opt) 490 struct ceph_options *opt)
491{ 491{
492 struct ceph_fs_client *fsc; 492 struct ceph_fs_client *fsc;
493 const unsigned supported_features = 493 const u64 supported_features =
494 CEPH_FEATURE_FLOCK | 494 CEPH_FEATURE_FLOCK |
495 CEPH_FEATURE_DIRLAYOUTHASH; 495 CEPH_FEATURE_DIRLAYOUTHASH;
496 const unsigned required_features = 0; 496 const u64 required_features = 0;
497 int page_count; 497 int page_count;
498 size_t size; 498 size_t size;
499 int err = -ENOMEM; 499 int err = -ENOMEM;
@@ -686,6 +686,7 @@ static const struct super_operations ceph_super_ops = {
686 .alloc_inode = ceph_alloc_inode, 686 .alloc_inode = ceph_alloc_inode,
687 .destroy_inode = ceph_destroy_inode, 687 .destroy_inode = ceph_destroy_inode,
688 .write_inode = ceph_write_inode, 688 .write_inode = ceph_write_inode,
689 .drop_inode = ceph_drop_inode,
689 .sync_fs = ceph_sync_fs, 690 .sync_fs = ceph_sync_fs,
690 .put_super = ceph_put_super, 691 .put_super = ceph_put_super,
691 .show_options = ceph_show_options, 692 .show_options = ceph_show_options,
@@ -818,7 +819,11 @@ static int ceph_set_super(struct super_block *s, void *data)
818 819
819 s->s_flags = fsc->mount_options->sb_flags; 820 s->s_flags = fsc->mount_options->sb_flags;
820 s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */ 821 s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */
822#ifdef CONFIG_CEPH_FS_POSIX_ACL
823 s->s_flags |= MS_POSIXACL;
824#endif
821 825
826 s->s_xattr = ceph_xattr_handlers;
822 s->s_fs_info = fsc; 827 s->s_fs_info = fsc;
823 fsc->sb = s; 828 fsc->sb = s;
824 829
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ef4ac38bb614..c299f7d19bf3 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -287,14 +287,12 @@ struct ceph_inode_info {
287 unsigned long i_hold_caps_min; /* jiffies */ 287 unsigned long i_hold_caps_min; /* jiffies */
288 unsigned long i_hold_caps_max; /* jiffies */ 288 unsigned long i_hold_caps_max; /* jiffies */
289 struct list_head i_cap_delay_list; /* for delayed cap release to mds */ 289 struct list_head i_cap_delay_list; /* for delayed cap release to mds */
290 int i_cap_exporting_mds; /* to handle cap migration between */
291 unsigned i_cap_exporting_mseq; /* mds's. */
292 unsigned i_cap_exporting_issued;
293 struct ceph_cap_reservation i_cap_migration_resv; 290 struct ceph_cap_reservation i_cap_migration_resv;
294 struct list_head i_cap_snaps; /* snapped state pending flush to mds */ 291 struct list_head i_cap_snaps; /* snapped state pending flush to mds */
295 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or 292 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
296 dirty|flushing caps */ 293 dirty|flushing caps */
297 unsigned i_snap_caps; /* cap bits for snapped files */ 294 unsigned i_snap_caps; /* cap bits for snapped files */
295 unsigned i_cap_exporting_issued;
298 296
299 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ 297 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
300 298
@@ -335,7 +333,6 @@ struct ceph_inode_info {
335 u32 i_fscache_gen; /* sequence, for delayed fscache validate */ 333 u32 i_fscache_gen; /* sequence, for delayed fscache validate */
336 struct work_struct i_revalidate_work; 334 struct work_struct i_revalidate_work;
337#endif 335#endif
338
339 struct inode vfs_inode; /* at end */ 336 struct inode vfs_inode; /* at end */
340}; 337};
341 338
@@ -529,6 +526,8 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci)
529} 526}
530extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); 527extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask);
531 528
529extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
530 struct ceph_cap *ocap, int mask);
532extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask); 531extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask);
533extern int __ceph_caps_used(struct ceph_inode_info *ci); 532extern int __ceph_caps_used(struct ceph_inode_info *ci);
534 533
@@ -691,6 +690,7 @@ extern const struct inode_operations ceph_file_iops;
691 690
692extern struct inode *ceph_alloc_inode(struct super_block *sb); 691extern struct inode *ceph_alloc_inode(struct super_block *sb);
693extern void ceph_destroy_inode(struct inode *inode); 692extern void ceph_destroy_inode(struct inode *inode);
693extern int ceph_drop_inode(struct inode *inode);
694 694
695extern struct inode *ceph_get_inode(struct super_block *sb, 695extern struct inode *ceph_get_inode(struct super_block *sb,
696 struct ceph_vino vino); 696 struct ceph_vino vino);
@@ -724,6 +724,9 @@ extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
724/* xattr.c */ 724/* xattr.c */
725extern int ceph_setxattr(struct dentry *, const char *, const void *, 725extern int ceph_setxattr(struct dentry *, const char *, const void *,
726 size_t, int); 726 size_t, int);
727int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int);
728ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
729int __ceph_removexattr(struct dentry *, const char *);
727extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t); 730extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
728extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); 731extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
729extern int ceph_removexattr(struct dentry *, const char *); 732extern int ceph_removexattr(struct dentry *, const char *);
@@ -732,6 +735,39 @@ extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
732extern void __init ceph_xattr_init(void); 735extern void __init ceph_xattr_init(void);
733extern void ceph_xattr_exit(void); 736extern void ceph_xattr_exit(void);
734 737
738/* acl.c */
739extern const struct xattr_handler ceph_xattr_acl_access_handler;
740extern const struct xattr_handler ceph_xattr_acl_default_handler;
741extern const struct xattr_handler *ceph_xattr_handlers[];
742
743#ifdef CONFIG_CEPH_FS_POSIX_ACL
744
745struct posix_acl *ceph_get_acl(struct inode *, int);
746int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
747int ceph_acl_chmod(struct dentry *, struct inode *);
748void ceph_forget_all_cached_acls(struct inode *inode);
749
750#else
751
752#define ceph_get_acl NULL
753
754static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
755 struct inode *dir)
756{
757 return 0;
758}
759
760static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
761{
762 return 0;
763}
764
765static inline void ceph_forget_all_cached_acls(struct inode *inode)
766{
767}
768
769#endif
770
735/* caps.c */ 771/* caps.c */
736extern const char *ceph_cap_string(int c); 772extern const char *ceph_cap_string(int c);
737extern void ceph_handle_caps(struct ceph_mds_session *session, 773extern void ceph_handle_caps(struct ceph_mds_session *session,
@@ -744,6 +780,7 @@ extern int ceph_add_cap(struct inode *inode,
744extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); 780extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
745extern void ceph_put_cap(struct ceph_mds_client *mdsc, 781extern void ceph_put_cap(struct ceph_mds_client *mdsc,
746 struct ceph_cap *cap); 782 struct ceph_cap *cap);
783extern int ceph_is_any_caps(struct inode *inode);
747 784
748extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, 785extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
749 u64 cap_id, u32 migrate_seq, u32 issue_seq); 786 u64 cap_id, u32 migrate_seq, u32 issue_seq);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index be661d8f532a..c7581f3733c1 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -11,11 +11,24 @@
11#define XATTR_CEPH_PREFIX "ceph." 11#define XATTR_CEPH_PREFIX "ceph."
12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1) 12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1)
13 13
14/*
15 * List of handlers for synthetic system.* attributes. Other
16 * attributes are handled directly.
17 */
18const struct xattr_handler *ceph_xattr_handlers[] = {
19#ifdef CONFIG_CEPH_FS_POSIX_ACL
20 &ceph_xattr_acl_access_handler,
21 &ceph_xattr_acl_default_handler,
22#endif
23 NULL,
24};
25
14static bool ceph_is_valid_xattr(const char *name) 26static bool ceph_is_valid_xattr(const char *name)
15{ 27{
16 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || 28 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
17 !strncmp(name, XATTR_SECURITY_PREFIX, 29 !strncmp(name, XATTR_SECURITY_PREFIX,
18 XATTR_SECURITY_PREFIX_LEN) || 30 XATTR_SECURITY_PREFIX_LEN) ||
31 !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) ||
19 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || 32 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
20 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); 33 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
21} 34}
@@ -663,10 +676,9 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
663 } 676 }
664} 677}
665 678
666ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, 679ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
667 size_t size) 680 size_t size)
668{ 681{
669 struct inode *inode = dentry->d_inode;
670 struct ceph_inode_info *ci = ceph_inode(inode); 682 struct ceph_inode_info *ci = ceph_inode(inode);
671 int err; 683 int err;
672 struct ceph_inode_xattr *xattr; 684 struct ceph_inode_xattr *xattr;
@@ -675,7 +687,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
675 if (!ceph_is_valid_xattr(name)) 687 if (!ceph_is_valid_xattr(name))
676 return -ENODATA; 688 return -ENODATA;
677 689
678
679 /* let's see if a virtual xattr was requested */ 690 /* let's see if a virtual xattr was requested */
680 vxattr = ceph_match_vxattr(inode, name); 691 vxattr = ceph_match_vxattr(inode, name);
681 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { 692 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
@@ -725,6 +736,15 @@ out:
725 return err; 736 return err;
726} 737}
727 738
739ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
740 size_t size)
741{
742 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
743 return generic_getxattr(dentry, name, value, size);
744
745 return __ceph_getxattr(dentry->d_inode, name, value, size);
746}
747
728ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) 748ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
729{ 749{
730 struct inode *inode = dentry->d_inode; 750 struct inode *inode = dentry->d_inode;
@@ -863,8 +883,8 @@ out:
863 return err; 883 return err;
864} 884}
865 885
866int ceph_setxattr(struct dentry *dentry, const char *name, 886int __ceph_setxattr(struct dentry *dentry, const char *name,
867 const void *value, size_t size, int flags) 887 const void *value, size_t size, int flags)
868{ 888{
869 struct inode *inode = dentry->d_inode; 889 struct inode *inode = dentry->d_inode;
870 struct ceph_vxattr *vxattr; 890 struct ceph_vxattr *vxattr;
@@ -879,9 +899,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
879 struct ceph_inode_xattr *xattr = NULL; 899 struct ceph_inode_xattr *xattr = NULL;
880 int required_blob_size; 900 int required_blob_size;
881 901
882 if (ceph_snap(inode) != CEPH_NOSNAP)
883 return -EROFS;
884
885 if (!ceph_is_valid_xattr(name)) 902 if (!ceph_is_valid_xattr(name))
886 return -EOPNOTSUPP; 903 return -EOPNOTSUPP;
887 904
@@ -958,6 +975,18 @@ out:
958 return err; 975 return err;
959} 976}
960 977
978int ceph_setxattr(struct dentry *dentry, const char *name,
979 const void *value, size_t size, int flags)
980{
981 if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
982 return -EROFS;
983
984 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
985 return generic_setxattr(dentry, name, value, size, flags);
986
987 return __ceph_setxattr(dentry, name, value, size, flags);
988}
989
961static int ceph_send_removexattr(struct dentry *dentry, const char *name) 990static int ceph_send_removexattr(struct dentry *dentry, const char *name)
962{ 991{
963 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 992 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
@@ -984,7 +1013,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
984 return err; 1013 return err;
985} 1014}
986 1015
987int ceph_removexattr(struct dentry *dentry, const char *name) 1016int __ceph_removexattr(struct dentry *dentry, const char *name)
988{ 1017{
989 struct inode *inode = dentry->d_inode; 1018 struct inode *inode = dentry->d_inode;
990 struct ceph_vxattr *vxattr; 1019 struct ceph_vxattr *vxattr;
@@ -994,9 +1023,6 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
994 int required_blob_size; 1023 int required_blob_size;
995 int dirty; 1024 int dirty;
996 1025
997 if (ceph_snap(inode) != CEPH_NOSNAP)
998 return -EROFS;
999
1000 if (!ceph_is_valid_xattr(name)) 1026 if (!ceph_is_valid_xattr(name))
1001 return -EOPNOTSUPP; 1027 return -EOPNOTSUPP;
1002 1028
@@ -1053,3 +1079,13 @@ out:
1053 return err; 1079 return err;
1054} 1080}
1055 1081
1082int ceph_removexattr(struct dentry *dentry, const char *name)
1083{
1084 if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
1085 return -EROFS;
1086
1087 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
1088 return generic_removexattr(dentry, name);
1089
1090 return __ceph_removexattr(dentry, name);
1091}