aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorYan, Zheng <zyan@redhat.com>2014-09-16 08:35:17 -0400
committerSage Weil <sage@redhat.com>2014-10-14 15:56:49 -0400
commitb1ee94aa593abd03634bc3887b8e189840e42c12 (patch)
tree59f8d66fcc03a21a2f5364cd1b1d418705e45bdc /fs
parent25e6bae356502cde283f1804111b44e6fad20fc2 (diff)
ceph: include the initial ACL in create/mkdir/mknod MDS requests
Current code set new file/directory's initial ACL in a non-atomic manner. Client first sends request to MDS to create new file/directory, then set the initial ACL after the new file/directory is successfully created. The fix is include the initial ACL in create/mkdir/mknod MDS requests. So MDS can handle creating file/directory and setting the initial ACL in one request. Signed-off-by: Yan, Zheng <zyan@redhat.com> Reviewed-by: Sage Weil <sage@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/acl.c125
-rw-r--r--fs/ceph/dir.c41
-rw-r--r--fs/ceph/file.c27
-rw-r--r--fs/ceph/super.h24
4 files changed, 170 insertions, 47 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index cebf2ebefb55..5bd853ba44ff 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -169,36 +169,109 @@ out:
169 return ret; 169 return ret;
170} 170}
171 171
172int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) 172int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
173 struct ceph_acls_info *info)
173{ 174{
174 struct posix_acl *default_acl, *acl; 175 struct posix_acl *acl, *default_acl;
175 umode_t new_mode = inode->i_mode; 176 size_t val_size1 = 0, val_size2 = 0;
176 int error; 177 struct ceph_pagelist *pagelist = NULL;
177 178 void *tmp_buf = NULL;
178 error = posix_acl_create(dir, &new_mode, &default_acl, &acl); 179 int err;
179 if (error) 180
180 return error; 181 err = posix_acl_create(dir, mode, &default_acl, &acl);
181 182 if (err)
182 if (!default_acl && !acl) { 183 return err;
183 cache_no_acl(inode); 184
184 if (new_mode != inode->i_mode) { 185 if (acl) {
185 struct iattr newattrs = { 186 int ret = posix_acl_equiv_mode(acl, mode);
186 .ia_mode = new_mode, 187 if (ret < 0)
187 .ia_valid = ATTR_MODE, 188 goto out_err;
188 }; 189 if (ret == 0) {
189 error = ceph_setattr(dentry, &newattrs); 190 posix_acl_release(acl);
191 acl = NULL;
190 } 192 }
191 return error;
192 } 193 }
193 194
194 if (default_acl) { 195 if (!default_acl && !acl)
195 error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT); 196 return 0;
196 posix_acl_release(default_acl); 197
197 } 198 if (acl)
199 val_size1 = posix_acl_xattr_size(acl->a_count);
200 if (default_acl)
201 val_size2 = posix_acl_xattr_size(default_acl->a_count);
202
203 err = -ENOMEM;
204 tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
205 if (!tmp_buf)
206 goto out_err;
207 pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
208 if (!pagelist)
209 goto out_err;
210 ceph_pagelist_init(pagelist);
211
212 err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
213 if (err)
214 goto out_err;
215
216 ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
217
198 if (acl) { 218 if (acl) {
199 if (!error) 219 size_t len = strlen(POSIX_ACL_XATTR_ACCESS);
200 error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS); 220 err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
201 posix_acl_release(acl); 221 if (err)
222 goto out_err;
223 ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS,
224 len);
225 err = posix_acl_to_xattr(&init_user_ns, acl,
226 tmp_buf, val_size1);
227 if (err < 0)
228 goto out_err;
229 ceph_pagelist_encode_32(pagelist, val_size1);
230 ceph_pagelist_append(pagelist, tmp_buf, val_size1);
202 } 231 }
203 return error; 232 if (default_acl) {
233 size_t len = strlen(POSIX_ACL_XATTR_DEFAULT);
234 err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
235 if (err)
236 goto out_err;
237 err = ceph_pagelist_encode_string(pagelist,
238 POSIX_ACL_XATTR_DEFAULT, len);
239 err = posix_acl_to_xattr(&init_user_ns, default_acl,
240 tmp_buf, val_size2);
241 if (err < 0)
242 goto out_err;
243 ceph_pagelist_encode_32(pagelist, val_size2);
244 ceph_pagelist_append(pagelist, tmp_buf, val_size2);
245 }
246
247 kfree(tmp_buf);
248
249 info->acl = acl;
250 info->default_acl = default_acl;
251 info->pagelist = pagelist;
252 return 0;
253
254out_err:
255 posix_acl_release(acl);
256 posix_acl_release(default_acl);
257 kfree(tmp_buf);
258 if (pagelist)
259 ceph_pagelist_release(pagelist);
260 return err;
261}
262
263void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info)
264{
265 if (!inode)
266 return;
267 ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
268 ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl);
269}
270
271void ceph_release_acls_info(struct ceph_acls_info *info)
272{
273 posix_acl_release(info->acl);
274 posix_acl_release(info->default_acl);
275 if (info->pagelist)
276 ceph_pagelist_release(info->pagelist);
204} 277}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index c29d6ae68874..26be84929439 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
682 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 682 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
683 struct ceph_mds_client *mdsc = fsc->mdsc; 683 struct ceph_mds_client *mdsc = fsc->mdsc;
684 struct ceph_mds_request *req; 684 struct ceph_mds_request *req;
685 struct ceph_acls_info acls = {};
685 int err; 686 int err;
686 687
687 if (ceph_snap(dir) != CEPH_NOSNAP) 688 if (ceph_snap(dir) != CEPH_NOSNAP)
688 return -EROFS; 689 return -EROFS;
689 690
691 err = ceph_pre_init_acls(dir, &mode, &acls);
692 if (err < 0)
693 return err;
694
690 dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n", 695 dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
691 dir, dentry, mode, rdev); 696 dir, dentry, mode, rdev);
692 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); 697 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
693 if (IS_ERR(req)) { 698 if (IS_ERR(req)) {
694 d_drop(dentry); 699 err = PTR_ERR(req);
695 return PTR_ERR(req); 700 goto out;
696 } 701 }
697 req->r_dentry = dget(dentry); 702 req->r_dentry = dget(dentry);
698 req->r_num_caps = 2; 703 req->r_num_caps = 2;
@@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
701 req->r_args.mknod.rdev = cpu_to_le32(rdev); 706 req->r_args.mknod.rdev = cpu_to_le32(rdev);
702 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 707 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
703 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 708 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
709 if (acls.pagelist) {
710 req->r_pagelist = acls.pagelist;
711 acls.pagelist = NULL;
712 }
704 err = ceph_mdsc_do_request(mdsc, dir, req); 713 err = ceph_mdsc_do_request(mdsc, dir, req);
705 if (!err && !req->r_reply_info.head->is_dentry) 714 if (!err && !req->r_reply_info.head->is_dentry)
706 err = ceph_handle_notrace_create(dir, dentry); 715 err = ceph_handle_notrace_create(dir, dentry);
707 ceph_mdsc_put_request(req); 716 ceph_mdsc_put_request(req);
708 717out:
709 if (!err) 718 if (!err)
710 ceph_init_acl(dentry, dentry->d_inode, dir); 719 ceph_init_inode_acls(dentry->d_inode, &acls);
711 else 720 else
712 d_drop(dentry); 721 d_drop(dentry);
722 ceph_release_acls_info(&acls);
713 return err; 723 return err;
714} 724}
715 725
@@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
733 dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); 743 dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
734 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); 744 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
735 if (IS_ERR(req)) { 745 if (IS_ERR(req)) {
736 d_drop(dentry); 746 err = PTR_ERR(req);
737 return PTR_ERR(req); 747 goto out;
738 } 748 }
739 req->r_dentry = dget(dentry); 749 req->r_dentry = dget(dentry);
740 req->r_num_caps = 2; 750 req->r_num_caps = 2;
@@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
746 if (!err && !req->r_reply_info.head->is_dentry) 756 if (!err && !req->r_reply_info.head->is_dentry)
747 err = ceph_handle_notrace_create(dir, dentry); 757 err = ceph_handle_notrace_create(dir, dentry);
748 ceph_mdsc_put_request(req); 758 ceph_mdsc_put_request(req);
749 if (!err) 759out:
750 ceph_init_acl(dentry, dentry->d_inode, dir); 760 if (err)
751 else
752 d_drop(dentry); 761 d_drop(dentry);
753 return err; 762 return err;
754} 763}
@@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
758 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 767 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
759 struct ceph_mds_client *mdsc = fsc->mdsc; 768 struct ceph_mds_client *mdsc = fsc->mdsc;
760 struct ceph_mds_request *req; 769 struct ceph_mds_request *req;
770 struct ceph_acls_info acls = {};
761 int err = -EROFS; 771 int err = -EROFS;
762 int op; 772 int op;
763 773
@@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
772 } else { 782 } else {
773 goto out; 783 goto out;
774 } 784 }
785
786 mode |= S_IFDIR;
787 err = ceph_pre_init_acls(dir, &mode, &acls);
788 if (err < 0)
789 goto out;
790
775 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 791 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
776 if (IS_ERR(req)) { 792 if (IS_ERR(req)) {
777 err = PTR_ERR(req); 793 err = PTR_ERR(req);
@@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
784 req->r_args.mkdir.mode = cpu_to_le32(mode); 800 req->r_args.mkdir.mode = cpu_to_le32(mode);
785 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 801 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
786 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 802 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
803 if (acls.pagelist) {
804 req->r_pagelist = acls.pagelist;
805 acls.pagelist = NULL;
806 }
787 err = ceph_mdsc_do_request(mdsc, dir, req); 807 err = ceph_mdsc_do_request(mdsc, dir, req);
788 if (!err && !req->r_reply_info.head->is_dentry) 808 if (!err && !req->r_reply_info.head->is_dentry)
789 err = ceph_handle_notrace_create(dir, dentry); 809 err = ceph_handle_notrace_create(dir, dentry);
790 ceph_mdsc_put_request(req); 810 ceph_mdsc_put_request(req);
791out: 811out:
792 if (!err) 812 if (!err)
793 ceph_init_acl(dentry, dentry->d_inode, dir); 813 ceph_init_inode_acls(dentry->d_inode, &acls);
794 else 814 else
795 d_drop(dentry); 815 d_drop(dentry);
816 ceph_release_acls_info(&acls);
796 return err; 817 return err;
797} 818}
798 819
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d190650811c4..d7e0da8366e6 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
235 struct ceph_mds_client *mdsc = fsc->mdsc; 235 struct ceph_mds_client *mdsc = fsc->mdsc;
236 struct ceph_mds_request *req; 236 struct ceph_mds_request *req;
237 struct dentry *dn; 237 struct dentry *dn;
238 struct ceph_acls_info acls = {};
238 int err; 239 int err;
239 240
240 dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n", 241 dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
@@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
248 if (err < 0) 249 if (err < 0)
249 return err; 250 return err;
250 251
252 if (flags & O_CREAT) {
253 err = ceph_pre_init_acls(dir, &mode, &acls);
254 if (err < 0)
255 return err;
256 }
257
251 /* do the open */ 258 /* do the open */
252 req = prepare_open_request(dir->i_sb, flags, mode); 259 req = prepare_open_request(dir->i_sb, flags, mode);
253 if (IS_ERR(req)) 260 if (IS_ERR(req)) {
254 return PTR_ERR(req); 261 err = PTR_ERR(req);
262 goto out_acl;
263 }
255 req->r_dentry = dget(dentry); 264 req->r_dentry = dget(dentry);
256 req->r_num_caps = 2; 265 req->r_num_caps = 2;
257 if (flags & O_CREAT) { 266 if (flags & O_CREAT) {
258 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 267 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
259 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 268 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
269 if (acls.pagelist) {
270 req->r_pagelist = acls.pagelist;
271 acls.pagelist = NULL;
272 }
260 } 273 }
261 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 274 req->r_locked_dir = dir; /* caller holds dir->i_mutex */
262 err = ceph_mdsc_do_request(mdsc, 275 err = ceph_mdsc_do_request(mdsc,
263 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 276 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
264 req); 277 req);
265 if (err) 278 if (err)
266 goto out_err; 279 goto out_req;
267 280
268 err = ceph_handle_snapdir(req, dentry, err); 281 err = ceph_handle_snapdir(req, dentry, err);
269 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 282 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
@@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
278 dn = NULL; 291 dn = NULL;
279 } 292 }
280 if (err) 293 if (err)
281 goto out_err; 294 goto out_req;
282 if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) { 295 if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
283 /* make vfs retry on splice, ENOENT, or symlink */ 296 /* make vfs retry on splice, ENOENT, or symlink */
284 dout("atomic_open finish_no_open on dn %p\n", dn); 297 dout("atomic_open finish_no_open on dn %p\n", dn);
@@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
286 } else { 299 } else {
287 dout("atomic_open finish_open on dn %p\n", dn); 300 dout("atomic_open finish_open on dn %p\n", dn);
288 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { 301 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
289 ceph_init_acl(dentry, dentry->d_inode, dir); 302 ceph_init_inode_acls(dentry->d_inode, &acls);
290 *opened |= FILE_CREATED; 303 *opened |= FILE_CREATED;
291 } 304 }
292 err = finish_open(file, dentry, ceph_open, opened); 305 err = finish_open(file, dentry, ceph_open, opened);
293 } 306 }
294out_err: 307out_req:
295 if (!req->r_err && req->r_target_inode) 308 if (!req->r_err && req->r_target_inode)
296 ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); 309 ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
297 ceph_mdsc_put_request(req); 310 ceph_mdsc_put_request(req);
311out_acl:
312 ceph_release_acls_info(&acls);
298 dout("atomic_open result=%d\n", err); 313 dout("atomic_open result=%d\n", err);
299 return err; 314 return err;
300} 315}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index bbb44cdcf1ab..f62a09863ff1 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -733,15 +733,23 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
733extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); 733extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
734extern void __init ceph_xattr_init(void); 734extern void __init ceph_xattr_init(void);
735extern void ceph_xattr_exit(void); 735extern void ceph_xattr_exit(void);
736extern const struct xattr_handler *ceph_xattr_handlers[];
736 737
737/* acl.c */ 738/* acl.c */
738extern const struct xattr_handler *ceph_xattr_handlers[]; 739struct ceph_acls_info {
740 void *default_acl;
741 void *acl;
742 struct ceph_pagelist *pagelist;
743};
739 744
740#ifdef CONFIG_CEPH_FS_POSIX_ACL 745#ifdef CONFIG_CEPH_FS_POSIX_ACL
741 746
742struct posix_acl *ceph_get_acl(struct inode *, int); 747struct posix_acl *ceph_get_acl(struct inode *, int);
743int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type); 748int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type);
744int ceph_init_acl(struct dentry *, struct inode *, struct inode *); 749int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
750 struct ceph_acls_info *info);
751void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info);
752void ceph_release_acls_info(struct ceph_acls_info *info);
745 753
746static inline void ceph_forget_all_cached_acls(struct inode *inode) 754static inline void ceph_forget_all_cached_acls(struct inode *inode)
747{ 755{
@@ -753,12 +761,18 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
753#define ceph_get_acl NULL 761#define ceph_get_acl NULL
754#define ceph_set_acl NULL 762#define ceph_set_acl NULL
755 763
756static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, 764static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
757 struct inode *dir) 765 struct ceph_acls_info *info)
758{ 766{
759 return 0; 767 return 0;
760} 768}
761 769static inline void ceph_init_inode_acls(struct inode *inode,
770 struct ceph_acls_info *info)
771{
772}
773static inline void ceph_release_acls_info(struct ceph_acls_info *info)
774{
775}
762static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) 776static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
763{ 777{
764 return 0; 778 return 0;