author		Linus Torvalds <torvalds@linux-foundation.org>	2013-02-28 20:43:09 -0500
committer	Linus Torvalds <torvalds@linux-foundation.org>	2013-02-28 20:43:09 -0500
commit		1cf0209c431fa7790253c532039d53b0773193aa (patch)
tree		24310eaaf4c9583988d9098f6c85a4a34970b5b9 /fs/ceph
parent		de1a2262b006220dae2561a299a6ea128c46f4fe (diff)
parent		83ca14fdd35821554058e5fd4fa7b118ee504a33 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "A few groups of patches here.  Alex has been hard at work improving
  the RBD code, laying the groundwork for understanding the new formats
  and doing layering.  Most of the infrastructure is now in place for
  the final bits that will come with the next window.

  There are a few changes to the data layout.  Jim Schutt's patch fixes
  some non-ideal CRUSH behavior, and a set of patches from me updates
  the client to speak a newer version of the protocol and implement an
  improved hashing strategy across storage nodes (when the server side
  supports it too).

  A pair of patches from Sam Lang fix the atomicity of open+create
  operations.  Several patches from Yan, Zheng fix various mds/client
  issues that turned up during multi-mds torture tests.

  A final set of patches expose file layouts via virtual xattrs, and
  allow the policies to be set on directories via xattrs as well
  (avoiding the awkward ioctl interface and providing a consistent
  interface for both kernel mount and ceph-fuse users)."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (143 commits)
  libceph: add support for HASHPSPOOL pool flag
  libceph: update osd request/reply encoding
  libceph: calculate placement based on the internal data types
  ceph: update support for PGID64, PGPOOL3, OSDENC protocol features
  ceph: update "ceph_features.h"
  libceph: decode into cpu-native ceph_pg type
  libceph: rename ceph_pg -> ceph_pg_v1
  rbd: pass length, not op for osd completions
  rbd: move rbd_osd_trivial_callback()
  libceph: use a do..while loop in con_work()
  libceph: use a flag to indicate a fault has occurred
  libceph: separate non-locked fault handling
  libceph: encapsulate connection backoff
  libceph: eliminate sparse warnings
  ceph: eliminate sparse warnings in fs code
  rbd: eliminate sparse warnings
  libceph: define connection flag helpers
  rbd: normalize dout() calls
  rbd: barriers are hard
  rbd: ignore zero-length requests
  ...
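The xattr-based layout interface mentioned above is reachable from userspace through the ordinary xattr syscalls. A minimal sketch follows; the mount point, paths, and pool name are made-up examples, and the vxattr names come from the ceph_dir_vxattrs/ceph_file_vxattrs tables in the fs/ceph/xattr.c hunks further down. Whether a particular layout change is accepted is up to the MDS.

/* Sketch only: read a file's layout and set a directory layout field
 * via the ceph.* virtual xattrs added in this merge.  Paths and the
 * pool name are hypothetical. */
#include <stdio.h>
#include <string.h>
#include <sys/xattr.h>

int main(void)
{
	char buf[256];
	ssize_t len;
	const char *pool = "data";	/* example pool name */

	/* "ceph.file.layout" is served by ceph_vxattrcb_layout() */
	len = getxattr("/mnt/ceph/somefile", "ceph.file.layout",
		       buf, sizeof(buf) - 1);
	if (len >= 0) {
		buf[len] = '\0';
		printf("layout: %s\n", buf);
	}

	/* unhandled ceph.* names are passed through to the MDS, so a
	 * setxattr on ceph.dir.layout.pool becomes a layout policy change */
	if (setxattr("/mnt/ceph/somedir", "ceph.dir.layout.pool",
		     pool, strlen(pool), 0) != 0)
		perror("setxattr");
	return 0;
}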
Diffstat (limited to 'fs/ceph')
-rw-r--r--	fs/ceph/addr.c		 38
-rw-r--r--	fs/ceph/caps.c		 32
-rw-r--r--	fs/ceph/file.c		  8
-rw-r--r--	fs/ceph/ioctl.c		  6
-rw-r--r--	fs/ceph/mds_client.c	 33
-rw-r--r--	fs/ceph/mds_client.h	  6
-rw-r--r--	fs/ceph/mdsmap.c	 12
-rw-r--r--	fs/ceph/strings.c	  4
-rw-r--r--	fs/ceph/super.c		  7
-rw-r--r--	fs/ceph/super.h		 10
-rw-r--r--	fs/ceph/xattr.c		214
11 files changed, 265 insertions, 105 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index d4f81edd9a5d..a60ea977af6f 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -236,16 +236,10 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
 	struct inode *inode = req->r_inode;
-	struct ceph_osd_reply_head *replyhead;
-	int rc, bytes;
+	int rc = req->r_result;
+	int bytes = le32_to_cpu(msg->hdr.data_len);
 	int i;
 
-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	rc = le32_to_cpu(replyhead->result);
-	bytes = le32_to_cpu(msg->hdr.data_len);
-
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
 	/* unlock all pages, zeroing any data we didn't read */
@@ -315,7 +309,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 				    NULL, 0,
 				    ci->i_truncate_seq, ci->i_truncate_size,
-				    NULL, false, 1, 0);
+				    NULL, false, 0);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
@@ -492,8 +486,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 				   &ci->i_layout, snapc,
 				   page_off, len,
 				   ci->i_truncate_seq, ci->i_truncate_size,
-				   &inode->i_mtime,
-				   &page, 1, 0, 0, true);
+				   &inode->i_mtime, &page, 1);
 	if (err < 0) {
 		dout("writepage setting page/mapping error %d %p\n", err, page);
 		SetPageError(page);
@@ -554,27 +547,18 @@ static void writepages_finish(struct ceph_osd_request *req,
 			      struct ceph_msg *msg)
 {
 	struct inode *inode = req->r_inode;
-	struct ceph_osd_reply_head *replyhead;
-	struct ceph_osd_op *op;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	unsigned wrote;
 	struct page *page;
 	int i;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
-	__s32 rc = -EIO;
-	u64 bytes = 0;
+	int rc = req->r_result;
+	u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	long writeback_stat;
 	unsigned issued = ceph_caps_issued(ci);
 
-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	op = (void *)(replyhead + 1);
-	rc = le32_to_cpu(replyhead->result);
-	bytes = le64_to_cpu(op->extent.length);
-
 	if (rc >= 0) {
 		/*
 		 * Assume we wrote the pages we originally sent.  The
@@ -741,8 +725,6 @@ retry:
 		struct page *page;
 		int want;
 		u64 offset, len;
-		struct ceph_osd_request_head *reqhead;
-		struct ceph_osd_op *op;
 		long writeback_stat;
 
 		next = 0;
@@ -838,7 +820,7 @@ get_more_pages:
 					    snapc, do_sync,
 					    ci->i_truncate_seq,
 					    ci->i_truncate_size,
-					    &inode->i_mtime, true, 1, 0);
+					    &inode->i_mtime, true, 0);
 
 			if (IS_ERR(req)) {
 				rc = PTR_ERR(req);
@@ -906,10 +888,8 @@ get_more_pages:
 
 		/* revise final length, page count */
 		req->r_num_pages = locked_pages;
-		reqhead = req->r_request->front.iov_base;
-		op = (void *)(reqhead + 1);
-		op->extent.length = cpu_to_le64(len);
-		op->payload_len = cpu_to_le32(len);
+		req->r_request_ops[0].extent.length = cpu_to_le64(len);
+		req->r_request_ops[0].payload_len = cpu_to_le32(len);
 		req->r_request->hdr.data_len = cpu_to_le32(len);
 
 		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ae2be696eb5b..78e2f575247d 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -611,8 +611,16 @@ retry:
 
 	if (flags & CEPH_CAP_FLAG_AUTH)
 		ci->i_auth_cap = cap;
-	else if (ci->i_auth_cap == cap)
+	else if (ci->i_auth_cap == cap) {
 		ci->i_auth_cap = NULL;
+		spin_lock(&mdsc->cap_dirty_lock);
+		if (!list_empty(&ci->i_dirty_item)) {
+			dout(" moving %p to cap_dirty_migrating\n", inode);
+			list_move(&ci->i_dirty_item,
+				  &mdsc->cap_dirty_migrating);
+		}
+		spin_unlock(&mdsc->cap_dirty_lock);
+	}
 
 	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
 	     inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
@@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
-	int file_wanted, used;
+	int file_wanted, used, cap_used;
 	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
 	int issued, implemented, want, retain, revoking, flushing = 0;
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
@@ -1563,9 +1571,14 @@ retry_locked:
 
 		/* NOTE: no side-effects allowed, until we take s_mutex */
 
+		cap_used = used;
+		if (ci->i_auth_cap && cap != ci->i_auth_cap)
+			cap_used &= ~ci->i_auth_cap->issued;
+
 		revoking = cap->implemented & ~cap->issued;
-		dout(" mds%d cap %p issued %s implemented %s revoking %s\n",
+		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
 		     cap->mds, cap, ceph_cap_string(cap->issued),
+		     ceph_cap_string(cap_used),
 		     ceph_cap_string(cap->implemented),
 		     ceph_cap_string(revoking));
 
@@ -1593,7 +1606,7 @@ retry_locked:
 		}
 
 		/* completed revocation? going down and there are no caps? */
-		if (revoking && (revoking & used) == 0) {
+		if (revoking && (revoking & cap_used) == 0) {
 			dout("completed revocation of %s\n",
 			     ceph_cap_string(cap->implemented & ~cap->issued));
 			goto ack;
@@ -1670,8 +1683,8 @@ ack:
 		sent++;
 
 		/* __send_cap drops i_ceph_lock */
-		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want,
-				      retain, flushing, NULL);
+		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
+				      want, retain, flushing, NULL);
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
@@ -2417,7 +2430,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 		dout("mds wanted %s -> %s\n",
 		     ceph_cap_string(le32_to_cpu(grant->wanted)),
 		     ceph_cap_string(wanted));
-		grant->wanted = cpu_to_le32(wanted);
+		/* imported cap may not have correct mds_wanted */
+		if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
+			check_caps = 1;
 	}
 
 	cap->seq = seq;
@@ -2821,6 +2836,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
 	     (unsigned)seq);
 
+	if (op == CEPH_CAP_OP_IMPORT)
+		ceph_add_cap_releases(mdsc, session);
+
 	/* lookup ino */
 	inode = ceph_find_inode(sb, vino);
 	ci = ceph_inode(inode);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 11b57c2c8f15..bf338d9b67e3 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 				   req);
+	if (err)
+		goto out_err;
+
 	err = ceph_handle_snapdir(req, dentry, err);
 	if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
@@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 		err = finish_no_open(file, dn);
 	} else {
 		dout("atomic_open finish_open on dn %p\n", dn);
+		if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
+			*opened |= FILE_CREATED;
+		}
 		err = finish_open(file, dentry, ceph_open, opened);
 	}
 
@@ -535,7 +541,7 @@ more:
 				    ci->i_snap_realm->cached_context,
 				    do_sync,
 				    ci->i_truncate_seq, ci->i_truncate_size,
-				    &mtime, false, 2, page_align);
+				    &mtime, false, page_align);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index f5ed767806df..4a989345b37b 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -185,7 +185,6 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
 	u64 len = 1, olen;
 	u64 tmp;
-	struct ceph_object_layout ol;
 	struct ceph_pg pgid;
 	int r;
 
@@ -194,7 +193,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 		return -EFAULT;
 
 	down_read(&osdc->map_sem);
-	r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len,
+	r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
 					  &dl.object_no, &dl.object_offset,
 					  &olen);
 	if (r < 0)
@@ -209,10 +208,9 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 
 	snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
 		 ceph_ino(inode), dl.object_no);
-	ceph_calc_object_layout(&ol, dl.object_name, &ci->i_layout,
+	ceph_calc_object_layout(&pgid, dl.object_name, &ci->i_layout,
 				osdc->osdmap);
 
-	pgid = ol.ol_pgid;
 	dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
 	if (dl.osd >= 0) {
 		struct ceph_entity_addr *a =
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 7a3dfe0a9a80..442880d099c9 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -233,6 +233,30 @@ bad:
 }
 
 /*
+ * parse create results
+ */
+static int parse_reply_info_create(void **p, void *end,
+				   struct ceph_mds_reply_info_parsed *info,
+				   int features)
+{
+	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+		if (*p == end) {
+			info->has_create_ino = false;
+		} else {
+			info->has_create_ino = true;
+			info->ino = ceph_decode_64(p);
+		}
+	}
+
+	if (unlikely(*p != end))
+		goto bad;
+	return 0;
+
+bad:
+	return -EIO;
+}
+
+/*
  * parse extra results
  */
 static int parse_reply_info_extra(void **p, void *end,
@@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end,
 {
 	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
 		return parse_reply_info_filelock(p, end, info, features);
-	else
+	else if (info->head->op == CEPH_MDS_OP_READDIR)
 		return parse_reply_info_dir(p, end, info, features);
+	else if (info->head->op == CEPH_MDS_OP_CREATE)
+		return parse_reply_info_create(p, end, info, features);
+	else
+		return -EIO;
 }
 
 /*
@@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	mutex_lock(&req->r_fill_mutex);
 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
 	if (err == 0) {
-		if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
+		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
+				    req->r_op == CEPH_MDS_OP_LSSNAP) &&
 		    rinfo->dir_nr)
 			ceph_readdir_prepopulate(req, req->r_session);
 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index ff4188bf6199..c2a19fbbe517 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed {
 			struct ceph_mds_reply_info_in *dir_in;
 			u8 dir_complete, dir_end;
 		};
+
+		/* for create results */
+		struct {
+			bool has_create_ino;
+			u64 ino;
+		};
 	};
 
 	/* encoded blob describing snapshot contexts for certain
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 73b7d44e8a35..0d3c9240c61b 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -59,6 +59,10 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		return ERR_PTR(-ENOMEM);
 
 	ceph_decode_16_safe(p, end, version, bad);
+	if (version > 3) {
+		pr_warning("got mdsmap version %d > 3, failing", version);
+		goto bad;
+	}
 
 	ceph_decode_need(p, end, 8*sizeof(u32) + sizeof(u64), bad);
 	m->m_epoch = ceph_decode_32(p);
@@ -144,13 +148,13 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 	/* pg_pools */
 	ceph_decode_32_safe(p, end, n, bad);
 	m->m_num_data_pg_pools = n;
-	m->m_data_pg_pools = kcalloc(n, sizeof(u32), GFP_NOFS);
+	m->m_data_pg_pools = kcalloc(n, sizeof(u64), GFP_NOFS);
 	if (!m->m_data_pg_pools)
 		goto badmem;
-	ceph_decode_need(p, end, sizeof(u32)*(n+1), bad);
+	ceph_decode_need(p, end, sizeof(u64)*(n+1), bad);
 	for (i = 0; i < n; i++)
-		m->m_data_pg_pools[i] = ceph_decode_32(p);
-	m->m_cas_pg_pool = ceph_decode_32(p);
+		m->m_data_pg_pools[i] = ceph_decode_64(p);
+	m->m_cas_pg_pool = ceph_decode_64(p);
 
 	/* ok, we don't care about the rest. */
 	dout("mdsmap_decode success epoch %u\n", m->m_epoch);
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index cd5097d7c804..89fa4a940a0f 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -15,6 +15,7 @@ const char *ceph_mds_state_name(int s)
 	case CEPH_MDS_STATE_BOOT:       return "up:boot";
 	case CEPH_MDS_STATE_STANDBY:    return "up:standby";
 	case CEPH_MDS_STATE_STANDBY_REPLAY:    return "up:standby-replay";
+	case CEPH_MDS_STATE_REPLAYONCE: return "up:oneshot-replay";
 	case CEPH_MDS_STATE_CREATING:   return "up:creating";
 	case CEPH_MDS_STATE_STARTING:   return "up:starting";
 	/* up and in */
@@ -50,10 +51,13 @@ const char *ceph_mds_op_name(int op)
 	case CEPH_MDS_OP_LOOKUP:  return "lookup";
 	case CEPH_MDS_OP_LOOKUPHASH:  return "lookuphash";
 	case CEPH_MDS_OP_LOOKUPPARENT:  return "lookupparent";
+	case CEPH_MDS_OP_LOOKUPINO:  return "lookupino";
 	case CEPH_MDS_OP_GETATTR:  return "getattr";
 	case CEPH_MDS_OP_SETXATTR: return "setxattr";
 	case CEPH_MDS_OP_SETATTR: return "setattr";
 	case CEPH_MDS_OP_RMXATTR: return "rmxattr";
+	case CEPH_MDS_OP_SETLAYOUT: return "setlayou";
+	case CEPH_MDS_OP_SETDIRLAYOUT: return "setdirlayout";
 	case CEPH_MDS_OP_READDIR: return "readdir";
 	case CEPH_MDS_OP_MKNOD: return "mknod";
 	case CEPH_MDS_OP_LINK: return "link";
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index e86aa9948124..9fe17c6c2876 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -71,8 +71,14 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 	/*
 	 * express utilization in terms of large blocks to avoid
 	 * overflow on 32-bit machines.
+	 *
+	 * NOTE: for the time being, we make bsize == frsize to humor
+	 * not-yet-ancient versions of glibc that are broken.
+	 * Someday, we will probably want to report a real block
+	 * size...  whatever that may mean for a network file system!
 	 */
 	buf->f_bsize = 1 << CEPH_BLOCK_SHIFT;
+	buf->f_frsize = 1 << CEPH_BLOCK_SHIFT;
 	buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10);
 	buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
 	buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
@@ -80,7 +86,6 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 	buf->f_files = le64_to_cpu(st.num_objects);
 	buf->f_ffree = -1;
 	buf->f_namelen = NAME_MAX;
-	buf->f_frsize = PAGE_CACHE_SIZE;
 
 	/* leave fsid little-endian, regardless of host endianness */
 	fsid = *(u64 *)(&monmap->fsid) ^ *((u64 *)&monmap->fsid + 1);
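An aside on the statfs hunks above: the monitor reports space in KiB, and the client rescales those counters into CEPH_BLOCK-sized units (4 MiB once CEPH_BLOCK_SHIFT becomes 22 in the super.h hunk below) so that very large volumes still fit the statfs fields on 32-bit machines, per the comment in the code. A small standalone sketch of that arithmetic, with made-up numbers:

/* Illustrative only: how ceph_statfs() rescales KiB counters into
 * CEPH_BLOCK-sized units. */
#include <stdint.h>
#include <stdio.h>

#define CEPH_BLOCK_SHIFT 22	/* 4 MB, as in fs/ceph/super.h after this merge */

int main(void)
{
	uint64_t kb = 8ULL << 30;	/* pretend cluster size: 8 TiB, expressed in KiB */
	uint64_t f_bsize = 1ULL << CEPH_BLOCK_SHIFT;
	uint64_t f_blocks = kb >> (CEPH_BLOCK_SHIFT - 10);	/* KiB -> 4 MiB blocks */

	printf("bsize=%llu blocks=%llu total_bytes=%llu\n",
	       (unsigned long long)f_bsize,
	       (unsigned long long)f_blocks,
	       (unsigned long long)(f_blocks * f_bsize));
	return 0;
}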
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index f053bbd1886f..c7b309723dcc 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -21,7 +21,7 @@
 
 /* large granularity for statfs utilization stats to facilitate
  * large volume sizes on 32-bit machines. */
-#define CEPH_BLOCK_SHIFT   20  /* 1 MB */
+#define CEPH_BLOCK_SHIFT   22  /* 4 MB */
 #define CEPH_BLOCK         (1 << CEPH_BLOCK_SHIFT)
 
 #define CEPH_MOUNT_OPT_DIRSTAT         (1<<4) /* `cat dirname` for stats */
@@ -798,13 +798,7 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
 /* file.c */
 extern const struct file_operations ceph_file_fops;
 extern const struct address_space_operations ceph_aops;
-extern int ceph_copy_to_page_vector(struct page **pages,
-				    const char *data,
-				    loff_t off, size_t len);
-extern int ceph_copy_from_page_vector(struct page **pages,
-				      char *data,
-				      loff_t off, size_t len);
-extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
+
 extern int ceph_open(struct inode *inode, struct file *file);
 extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			    struct file *file, unsigned flags, umode_t mode,
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 2c2ae5be9902..9b6b2b6dd164 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -29,9 +29,94 @@ struct ceph_vxattr {
 	size_t name_size;	/* strlen(name) + 1 (for '\0') */
 	size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
 			      size_t size);
-	bool readonly;
+	bool readonly, hidden;
+	bool (*exists_cb)(struct ceph_inode_info *ci);
 };
 
+/* layouts */
+
+static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
+{
+	size_t s;
+	char *p = (char *)&ci->i_layout;
+
+	for (s = 0; s < sizeof(ci->i_layout); s++, p++)
+		if (*p)
+			return true;
+	return false;
+}
+
+static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
+				   size_t size)
+{
+	int ret;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
+	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+	const char *pool_name;
+
+	dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
+	down_read(&osdc->map_sem);
+	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
+	if (pool_name)
+		ret = snprintf(val, size,
+		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%s",
+		(unsigned long long)ceph_file_layout_su(ci->i_layout),
+		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
+		(unsigned long long)ceph_file_layout_object_size(ci->i_layout),
+		pool_name);
+	else
+		ret = snprintf(val, size,
+		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
+		(unsigned long long)ceph_file_layout_su(ci->i_layout),
+		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
+		(unsigned long long)ceph_file_layout_object_size(ci->i_layout),
+		(unsigned long long)pool);
+
+	up_read(&osdc->map_sem);
+	return ret;
+}
+
+static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
+					       char *val, size_t size)
+{
+	return snprintf(val, size, "%lld",
+			(unsigned long long)ceph_file_layout_su(ci->i_layout));
+}
+
+static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
+						char *val, size_t size)
+{
+	return snprintf(val, size, "%lld",
+	       (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout));
+}
+
+static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
+					       char *val, size_t size)
+{
+	return snprintf(val, size, "%lld",
+	       (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
+}
+
+static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
+					char *val, size_t size)
+{
+	int ret;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
+	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+	const char *pool_name;
+
+	down_read(&osdc->map_sem);
+	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
+	if (pool_name)
+		ret = snprintf(val, size, "%s", pool_name);
+	else
+		ret = snprintf(val, size, "%lld", (unsigned long long)pool);
+	up_read(&osdc->map_sem);
+	return ret;
+}
+
 /* directories */
 
 static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
@@ -83,17 +168,43 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
 			(long)ci->i_rctime.tv_nsec);
 }
 
-#define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name
 
-#define XATTR_NAME_CEPH(_type, _name) \
-	{ \
-		.name = CEPH_XATTR_NAME(_type, _name), \
-		.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
-		.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
-		.readonly = true, \
-	}
+#define CEPH_XATTR_NAME(_type, _name)	XATTR_CEPH_PREFIX #_type "." #_name
+#define CEPH_XATTR_NAME2(_type, _name, _name2)	\
+	XATTR_CEPH_PREFIX #_type "." #_name "." #_name2
+
+#define XATTR_NAME_CEPH(_type, _name)					\
+	{								\
+		.name = CEPH_XATTR_NAME(_type, _name),			\
+		.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)),	\
+		.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name,	\
+		.readonly = true,					\
+		.hidden = false,					\
+		.exists_cb = NULL,					\
+	}
+#define XATTR_LAYOUT_FIELD(_type, _name, _field)			\
+	{								\
+		.name = CEPH_XATTR_NAME2(_type, _name, _field),		\
+		.name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \
+		.getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field,	\
+		.readonly = false,					\
+		.hidden = true,						\
+		.exists_cb = ceph_vxattrcb_layout_exists,		\
+	}
 
 static struct ceph_vxattr ceph_dir_vxattrs[] = {
+	{
+		.name = "ceph.dir.layout",
+		.name_size = sizeof("ceph.dir.layout"),
+		.getxattr_cb = ceph_vxattrcb_layout,
+		.readonly = false,
+		.hidden = false,
+		.exists_cb = ceph_vxattrcb_layout_exists,
+	},
+	XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
+	XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
+	XATTR_LAYOUT_FIELD(dir, layout, object_size),
+	XATTR_LAYOUT_FIELD(dir, layout, pool),
 	XATTR_NAME_CEPH(dir, entries),
 	XATTR_NAME_CEPH(dir, files),
 	XATTR_NAME_CEPH(dir, subdirs),
@@ -102,35 +213,26 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
 	XATTR_NAME_CEPH(dir, rsubdirs),
 	XATTR_NAME_CEPH(dir, rbytes),
 	XATTR_NAME_CEPH(dir, rctime),
-	{ 0 }	/* Required table terminator */
+	{ .name = NULL, 0 }	/* Required table terminator */
 };
 static size_t ceph_dir_vxattrs_name_size;	/* total size of all names */
 
 /* files */
 
-static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val,
-				   size_t size)
-{
-	int ret;
-
-	ret = snprintf(val, size,
-		"chunk_bytes=%lld\nstripe_count=%lld\nobject_size=%lld\n",
-		(unsigned long long)ceph_file_layout_su(ci->i_layout),
-		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
-		(unsigned long long)ceph_file_layout_object_size(ci->i_layout));
-	return ret;
-}
-
 static struct ceph_vxattr ceph_file_vxattrs[] = {
-	XATTR_NAME_CEPH(file, layout),
-	/* The following extended attribute name is deprecated */
 	{
-		.name = XATTR_CEPH_PREFIX "layout",
-		.name_size = sizeof (XATTR_CEPH_PREFIX "layout"),
-		.getxattr_cb = ceph_vxattrcb_file_layout,
-		.readonly = true,
+		.name = "ceph.file.layout",
+		.name_size = sizeof("ceph.file.layout"),
+		.getxattr_cb = ceph_vxattrcb_layout,
+		.readonly = false,
+		.hidden = false,
+		.exists_cb = ceph_vxattrcb_layout_exists,
 	},
-	{ 0 }	/* Required table terminator */
+	XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
+	XATTR_LAYOUT_FIELD(file, layout, stripe_count),
+	XATTR_LAYOUT_FIELD(file, layout, object_size),
+	XATTR_LAYOUT_FIELD(file, layout, pool),
+	{ .name = NULL, 0 }	/* Required table terminator */
 };
 static size_t ceph_file_vxattrs_name_size;	/* total size of all names */
 
@@ -164,7 +266,8 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
 	size_t size = 0;
 
 	for (vxattr = vxattrs; vxattr->name; vxattr++)
-		size += vxattr->name_size;
+		if (!vxattr->hidden)
+			size += vxattr->name_size;
 
 	return size;
 }
@@ -572,13 +675,17 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 	if (!ceph_is_valid_xattr(name))
 		return -ENODATA;
 
-	/* let's see if a virtual xattr was requested */
-	vxattr = ceph_match_vxattr(inode, name);
-
 	spin_lock(&ci->i_ceph_lock);
 	dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
+	/* let's see if a virtual xattr was requested */
+	vxattr = ceph_match_vxattr(inode, name);
+	if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
+		err = vxattr->getxattr_cb(ci, value, size);
+		goto out;
+	}
+
 	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
 	    (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
 		goto get_xattr;
@@ -592,11 +699,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 
 	spin_lock(&ci->i_ceph_lock);
 
-	if (vxattr && vxattr->readonly) {
-		err = vxattr->getxattr_cb(ci, value, size);
-		goto out;
-	}
-
 	err = __build_xattrs(inode);
 	if (err < 0)
 		goto out;
@@ -604,11 +706,8 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 get_xattr:
 	err = -ENODATA;  /* == ENOATTR */
 	xattr = __get_xattr(ci, name);
-	if (!xattr) {
-		if (vxattr)
-			err = vxattr->getxattr_cb(ci, value, size);
+	if (!xattr)
 		goto out;
-	}
 
 	err = -ERANGE;
 	if (size && size < xattr->val_len)
@@ -664,23 +763,30 @@ list_xattr:
 	vir_namelen = ceph_vxattrs_name_size(vxattrs);
 
 	/* adding 1 byte per each variable due to the null termination */
-	namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
+	namelen = ci->i_xattrs.names_size + ci->i_xattrs.count;
 	err = -ERANGE;
-	if (size && namelen > size)
+	if (size && vir_namelen + namelen > size)
 		goto out;
 
-	err = namelen;
+	err = namelen + vir_namelen;
 	if (size == 0)
 		goto out;
 
 	names = __copy_xattr_names(ci, names);
 
 	/* virtual xattr names, too */
-	if (vxattrs)
+	err = namelen;
+	if (vxattrs) {
 		for (i = 0; vxattrs[i].name; i++) {
-			len = sprintf(names, "%s", vxattrs[i].name);
-			names += len + 1;
+			if (!vxattrs[i].hidden &&
+			    !(vxattrs[i].exists_cb &&
+			      !vxattrs[i].exists_cb(ci))) {
+				len = sprintf(names, "%s", vxattrs[i].name);
+				names += len + 1;
+				err += len + 1;
+			}
 		}
+	}
 
 out:
 	spin_unlock(&ci->i_ceph_lock);
@@ -782,6 +888,10 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
 	if (vxattr && vxattr->readonly)
 		return -EOPNOTSUPP;
 
+	/* pass any unhandled ceph.* xattrs through to the MDS */
+	if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
+		goto do_sync_unlocked;
+
 	/* preallocate memory for xattr name, value, index node */
 	err = -ENOMEM;
 	newname = kmemdup(name, name_len + 1, GFP_NOFS);
@@ -838,6 +948,7 @@ retry:
 
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
+do_sync_unlocked:
 	err = ceph_sync_setxattr(dentry, name, value, size, flags);
 out:
 	kfree(newname);
@@ -892,6 +1003,10 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
 	if (vxattr && vxattr->readonly)
 		return -EOPNOTSUPP;
 
+	/* pass any unhandled ceph.* xattrs through to the MDS */
+	if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
+		goto do_sync_unlocked;
+
 	err = -ENOMEM;
 	spin_lock(&ci->i_ceph_lock);
 retry:
@@ -931,6 +1046,7 @@ retry:
 	return err;
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
+do_sync_unlocked:
 	err = ceph_send_removexattr(dentry, name);
 out:
 	return err;