aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-05-26 17:10:32 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2016-05-26 17:10:32 -0400
commita10c38a4f385f5d7c173a263ff6bb2d36021b3bb (patch)
tree3cbaa916940b36a9fdb27c8a231e1488fbc352d6 /fs
parentea8ea737c46cffa5d0ee74309f81e55a7e5e9c2a (diff)
parente536030934aebf049fe6aaebc58dd37aeee21840 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "This changeset has a few main parts: - Ilya has finished a huge refactoring effort to sync up the client-side logic in libceph with the user-space client code, which has evolved significantly over the last couple years, with lots of additional behaviors (e.g., how requests are handled when cluster is full and transitions from full to non-full). This structure of the code is more closely aligned with userspace now such that it will be much easier to maintain going forward when behavior changes take place. There are some locking improvements bundled in as well. - Zheng adds multi-filesystem support (multiple namespaces within the same Ceph cluster) - Zheng has changed the readdir offsets and directory enumeration so that dentry offsets are hash-based and therefore stable across directory fragmentation events on the MDS. - Zheng has a smorgasbord of bug fixes across fs/ceph" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits) ceph: fix wake_up_session_cb() ceph: don't use truncate_pagecache() to invalidate read cache ceph: SetPageError() for writeback pages if writepages fails ceph: handle interrupted ceph_writepage() ceph: make ceph_update_writeable_page() uninterruptible libceph: make ceph_osdc_wait_request() uninterruptible ceph: handle -EAGAIN returned by ceph_update_writeable_page() ceph: make fault/page_mkwrite return VM_FAULT_OOM for -ENOMEM ceph: block non-fatal signals for fault/page_mkwrite ceph: make logical calculation functions return bool ceph: tolerate bad i_size for symlink inode ceph: improve fragtree change detection ceph: keep leaf frag when updating fragtree ceph: fix dir_auth check in ceph_fill_dirfrag() ceph: don't assume frag tree splits in mds reply are sorted ceph: fix inode reference leak ceph: using hash value to compose dentry offset ceph: don't forbid marking directory complete after forward seek ceph: record 'offset' for each entry of readdir result ceph: define 'end/complete' in readdir reply as bit flags ...
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/addr.c214
-rw-r--r--fs/ceph/cache.c2
-rw-r--r--fs/ceph/caps.c51
-rw-r--r--fs/ceph/debugfs.c2
-rw-r--r--fs/ceph/dir.c376
-rw-r--r--fs/ceph/file.c89
-rw-r--r--fs/ceph/inode.c159
-rw-r--r--fs/ceph/ioctl.c14
-rw-r--r--fs/ceph/mds_client.c140
-rw-r--r--fs/ceph/mds_client.h17
-rw-r--r--fs/ceph/mdsmap.c43
-rw-r--r--fs/ceph/super.c47
-rw-r--r--fs/ceph/super.h12
-rw-r--r--fs/ceph/xattr.c25
14 files changed, 773 insertions, 418 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 43098cd9602b..eeb71e5de27a 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -257,12 +257,12 @@ static int ceph_readpage(struct file *filp, struct page *page)
257/* 257/*
258 * Finish an async read(ahead) op. 258 * Finish an async read(ahead) op.
259 */ 259 */
260static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) 260static void finish_read(struct ceph_osd_request *req)
261{ 261{
262 struct inode *inode = req->r_inode; 262 struct inode *inode = req->r_inode;
263 struct ceph_osd_data *osd_data; 263 struct ceph_osd_data *osd_data;
264 int rc = req->r_result; 264 int rc = req->r_result <= 0 ? req->r_result : 0;
265 int bytes = le32_to_cpu(msg->hdr.data_len); 265 int bytes = req->r_result >= 0 ? req->r_result : 0;
266 int num_pages; 266 int num_pages;
267 int i; 267 int i;
268 268
@@ -376,8 +376,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
376 req->r_callback = finish_read; 376 req->r_callback = finish_read;
377 req->r_inode = inode; 377 req->r_inode = inode;
378 378
379 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
380
381 dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); 379 dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
382 ret = ceph_osdc_start_request(osdc, req, false); 380 ret = ceph_osdc_start_request(osdc, req, false);
383 if (ret < 0) 381 if (ret < 0)
@@ -546,11 +544,21 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
546 truncate_seq, truncate_size, 544 truncate_seq, truncate_size,
547 &inode->i_mtime, &page, 1); 545 &inode->i_mtime, &page, 1);
548 if (err < 0) { 546 if (err < 0) {
549 dout("writepage setting page/mapping error %d %p\n", err, page); 547 struct writeback_control tmp_wbc;
548 if (!wbc)
549 wbc = &tmp_wbc;
550 if (err == -ERESTARTSYS) {
551 /* killed by SIGKILL */
552 dout("writepage interrupted page %p\n", page);
553 redirty_page_for_writepage(wbc, page);
554 end_page_writeback(page);
555 goto out;
556 }
557 dout("writepage setting page/mapping error %d %p\n",
558 err, page);
550 SetPageError(page); 559 SetPageError(page);
551 mapping_set_error(&inode->i_data, err); 560 mapping_set_error(&inode->i_data, err);
552 if (wbc) 561 wbc->pages_skipped++;
553 wbc->pages_skipped++;
554 } else { 562 } else {
555 dout("writepage cleaned page %p\n", page); 563 dout("writepage cleaned page %p\n", page);
556 err = 0; /* vfs expects us to return 0 */ 564 err = 0; /* vfs expects us to return 0 */
@@ -571,12 +579,16 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
571 BUG_ON(!inode); 579 BUG_ON(!inode);
572 ihold(inode); 580 ihold(inode);
573 err = writepage_nounlock(page, wbc); 581 err = writepage_nounlock(page, wbc);
582 if (err == -ERESTARTSYS) {
583 /* direct memory reclaimer was killed by SIGKILL. return 0
584 * to prevent caller from setting mapping/page error */
585 err = 0;
586 }
574 unlock_page(page); 587 unlock_page(page);
575 iput(inode); 588 iput(inode);
576 return err; 589 return err;
577} 590}
578 591
579
580/* 592/*
581 * lame release_pages helper. release_pages() isn't exported to 593 * lame release_pages helper. release_pages() isn't exported to
582 * modules. 594 * modules.
@@ -600,8 +612,7 @@ static void ceph_release_pages(struct page **pages, int num)
600 * If we get an error, set the mapping error bit, but not the individual 612 * If we get an error, set the mapping error bit, but not the individual
601 * page error bits. 613 * page error bits.
602 */ 614 */
603static void writepages_finish(struct ceph_osd_request *req, 615static void writepages_finish(struct ceph_osd_request *req)
604 struct ceph_msg *msg)
605{ 616{
606 struct inode *inode = req->r_inode; 617 struct inode *inode = req->r_inode;
607 struct ceph_inode_info *ci = ceph_inode(inode); 618 struct ceph_inode_info *ci = ceph_inode(inode);
@@ -615,7 +626,6 @@ static void writepages_finish(struct ceph_osd_request *req,
615 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 626 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
616 bool remove_page; 627 bool remove_page;
617 628
618
619 dout("writepages_finish %p rc %d\n", inode, rc); 629 dout("writepages_finish %p rc %d\n", inode, rc);
620 if (rc < 0) 630 if (rc < 0)
621 mapping_set_error(mapping, rc); 631 mapping_set_error(mapping, rc);
@@ -650,6 +660,9 @@ static void writepages_finish(struct ceph_osd_request *req,
650 clear_bdi_congested(&fsc->backing_dev_info, 660 clear_bdi_congested(&fsc->backing_dev_info,
651 BLK_RW_ASYNC); 661 BLK_RW_ASYNC);
652 662
663 if (rc < 0)
664 SetPageError(page);
665
653 ceph_put_snap_context(page_snap_context(page)); 666 ceph_put_snap_context(page_snap_context(page));
654 page->private = 0; 667 page->private = 0;
655 ClearPagePrivate(page); 668 ClearPagePrivate(page);
@@ -718,8 +731,11 @@ static int ceph_writepages_start(struct address_space *mapping,
718 (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); 731 (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
719 732
720 if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 733 if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
721 pr_warn("writepage_start %p on forced umount\n", inode); 734 if (ci->i_wrbuffer_ref > 0) {
722 truncate_pagecache(inode, 0); 735 pr_warn_ratelimited(
736 "writepage_start %p %lld forced umount\n",
737 inode, ceph_ino(inode));
738 }
723 mapping_set_error(mapping, -EIO); 739 mapping_set_error(mapping, -EIO);
724 return -EIO; /* we're in a forced umount, don't write! */ 740 return -EIO; /* we're in a forced umount, don't write! */
725 } 741 }
@@ -1063,10 +1079,7 @@ new_request:
1063 pages = NULL; 1079 pages = NULL;
1064 } 1080 }
1065 1081
1066 vino = ceph_vino(inode); 1082 req->r_mtime = inode->i_mtime;
1067 ceph_osdc_build_request(req, offset, snapc, vino.snap,
1068 &inode->i_mtime);
1069
1070 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); 1083 rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
1071 BUG_ON(rc); 1084 BUG_ON(rc);
1072 req = NULL; 1085 req = NULL;
@@ -1099,8 +1112,7 @@ release_pvec_pages:
1099 mapping->writeback_index = index; 1112 mapping->writeback_index = index;
1100 1113
1101out: 1114out:
1102 if (req) 1115 ceph_osdc_put_request(req);
1103 ceph_osdc_put_request(req);
1104 ceph_put_snap_context(snapc); 1116 ceph_put_snap_context(snapc);
1105 dout("writepages done, rc = %d\n", rc); 1117 dout("writepages done, rc = %d\n", rc);
1106 return rc; 1118 return rc;
@@ -1134,6 +1146,7 @@ static int ceph_update_writeable_page(struct file *file,
1134 struct page *page) 1146 struct page *page)
1135{ 1147{
1136 struct inode *inode = file_inode(file); 1148 struct inode *inode = file_inode(file);
1149 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1137 struct ceph_inode_info *ci = ceph_inode(inode); 1150 struct ceph_inode_info *ci = ceph_inode(inode);
1138 loff_t page_off = pos & PAGE_MASK; 1151 loff_t page_off = pos & PAGE_MASK;
1139 int pos_in_page = pos & ~PAGE_MASK; 1152 int pos_in_page = pos & ~PAGE_MASK;
@@ -1142,6 +1155,12 @@ static int ceph_update_writeable_page(struct file *file,
1142 int r; 1155 int r;
1143 struct ceph_snap_context *snapc, *oldest; 1156 struct ceph_snap_context *snapc, *oldest;
1144 1157
1158 if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1159 dout(" page %p forced umount\n", page);
1160 unlock_page(page);
1161 return -EIO;
1162 }
1163
1145retry_locked: 1164retry_locked:
1146 /* writepages currently holds page lock, but if we change that later, */ 1165 /* writepages currently holds page lock, but if we change that later, */
1147 wait_on_page_writeback(page); 1166 wait_on_page_writeback(page);
@@ -1165,7 +1184,7 @@ retry_locked:
1165 snapc = ceph_get_snap_context(snapc); 1184 snapc = ceph_get_snap_context(snapc);
1166 unlock_page(page); 1185 unlock_page(page);
1167 ceph_queue_writeback(inode); 1186 ceph_queue_writeback(inode);
1168 r = wait_event_interruptible(ci->i_cap_wq, 1187 r = wait_event_killable(ci->i_cap_wq,
1169 context_is_writeable_or_written(inode, snapc)); 1188 context_is_writeable_or_written(inode, snapc));
1170 ceph_put_snap_context(snapc); 1189 ceph_put_snap_context(snapc);
1171 if (r == -ERESTARTSYS) 1190 if (r == -ERESTARTSYS)
@@ -1311,6 +1330,17 @@ const struct address_space_operations ceph_aops = {
1311 .direct_IO = ceph_direct_io, 1330 .direct_IO = ceph_direct_io,
1312}; 1331};
1313 1332
1333static void ceph_block_sigs(sigset_t *oldset)
1334{
1335 sigset_t mask;
1336 siginitsetinv(&mask, sigmask(SIGKILL));
1337 sigprocmask(SIG_BLOCK, &mask, oldset);
1338}
1339
1340static void ceph_restore_sigs(sigset_t *oldset)
1341{
1342 sigprocmask(SIG_SETMASK, oldset, NULL);
1343}
1314 1344
1315/* 1345/*
1316 * vm ops 1346 * vm ops
@@ -1323,6 +1353,9 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1323 struct page *pinned_page = NULL; 1353 struct page *pinned_page = NULL;
1324 loff_t off = vmf->pgoff << PAGE_SHIFT; 1354 loff_t off = vmf->pgoff << PAGE_SHIFT;
1325 int want, got, ret; 1355 int want, got, ret;
1356 sigset_t oldset;
1357
1358 ceph_block_sigs(&oldset);
1326 1359
1327 dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n", 1360 dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
1328 inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE); 1361 inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
@@ -1330,17 +1363,12 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1330 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 1363 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
1331 else 1364 else
1332 want = CEPH_CAP_FILE_CACHE; 1365 want = CEPH_CAP_FILE_CACHE;
1333 while (1) { 1366
1334 got = 0; 1367 got = 0;
1335 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, 1368 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
1336 -1, &got, &pinned_page); 1369 if (ret < 0)
1337 if (ret == 0) 1370 goto out_restore;
1338 break; 1371
1339 if (ret != -ERESTARTSYS) {
1340 WARN_ON(1);
1341 return VM_FAULT_SIGBUS;
1342 }
1343 }
1344 dout("filemap_fault %p %llu~%zd got cap refs on %s\n", 1372 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1345 inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got)); 1373 inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
1346 1374
@@ -1357,7 +1385,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1357 ceph_put_cap_refs(ci, got); 1385 ceph_put_cap_refs(ci, got);
1358 1386
1359 if (ret != -EAGAIN) 1387 if (ret != -EAGAIN)
1360 return ret; 1388 goto out_restore;
1361 1389
1362 /* read inline data */ 1390 /* read inline data */
1363 if (off >= PAGE_SIZE) { 1391 if (off >= PAGE_SIZE) {
@@ -1371,15 +1399,18 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1371 ~__GFP_FS)); 1399 ~__GFP_FS));
1372 if (!page) { 1400 if (!page) {
1373 ret = VM_FAULT_OOM; 1401 ret = VM_FAULT_OOM;
1374 goto out; 1402 goto out_inline;
1375 } 1403 }
1376 ret1 = __ceph_do_getattr(inode, page, 1404 ret1 = __ceph_do_getattr(inode, page,
1377 CEPH_STAT_CAP_INLINE_DATA, true); 1405 CEPH_STAT_CAP_INLINE_DATA, true);
1378 if (ret1 < 0 || off >= i_size_read(inode)) { 1406 if (ret1 < 0 || off >= i_size_read(inode)) {
1379 unlock_page(page); 1407 unlock_page(page);
1380 put_page(page); 1408 put_page(page);
1381 ret = VM_FAULT_SIGBUS; 1409 if (ret1 < 0)
1382 goto out; 1410 ret = ret1;
1411 else
1412 ret = VM_FAULT_SIGBUS;
1413 goto out_inline;
1383 } 1414 }
1384 if (ret1 < PAGE_SIZE) 1415 if (ret1 < PAGE_SIZE)
1385 zero_user_segment(page, ret1, PAGE_SIZE); 1416 zero_user_segment(page, ret1, PAGE_SIZE);
@@ -1388,10 +1419,15 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1388 SetPageUptodate(page); 1419 SetPageUptodate(page);
1389 vmf->page = page; 1420 vmf->page = page;
1390 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED; 1421 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1422out_inline:
1423 dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
1424 inode, off, (size_t)PAGE_SIZE, ret);
1391 } 1425 }
1392out: 1426out_restore:
1393 dout("filemap_fault %p %llu~%zd read inline data ret %d\n", 1427 ceph_restore_sigs(&oldset);
1394 inode, off, (size_t)PAGE_SIZE, ret); 1428 if (ret < 0)
1429 ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
1430
1395 return ret; 1431 return ret;
1396} 1432}
1397 1433
@@ -1409,10 +1445,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1409 loff_t size = i_size_read(inode); 1445 loff_t size = i_size_read(inode);
1410 size_t len; 1446 size_t len;
1411 int want, got, ret; 1447 int want, got, ret;
1448 sigset_t oldset;
1412 1449
1413 prealloc_cf = ceph_alloc_cap_flush(); 1450 prealloc_cf = ceph_alloc_cap_flush();
1414 if (!prealloc_cf) 1451 if (!prealloc_cf)
1415 return VM_FAULT_SIGBUS; 1452 return VM_FAULT_OOM;
1453
1454 ceph_block_sigs(&oldset);
1416 1455
1417 if (ci->i_inline_version != CEPH_INLINE_NONE) { 1456 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1418 struct page *locked_page = NULL; 1457 struct page *locked_page = NULL;
@@ -1423,10 +1462,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1423 ret = ceph_uninline_data(vma->vm_file, locked_page); 1462 ret = ceph_uninline_data(vma->vm_file, locked_page);
1424 if (locked_page) 1463 if (locked_page)
1425 unlock_page(locked_page); 1464 unlock_page(locked_page);
1426 if (ret < 0) { 1465 if (ret < 0)
1427 ret = VM_FAULT_SIGBUS;
1428 goto out_free; 1466 goto out_free;
1429 }
1430 } 1467 }
1431 1468
1432 if (off + PAGE_SIZE <= size) 1469 if (off + PAGE_SIZE <= size)
@@ -1440,45 +1477,36 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1440 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1477 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1441 else 1478 else
1442 want = CEPH_CAP_FILE_BUFFER; 1479 want = CEPH_CAP_FILE_BUFFER;
1443 while (1) { 1480
1444 got = 0; 1481 got = 0;
1445 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len, 1482 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
1446 &got, NULL); 1483 &got, NULL);
1447 if (ret == 0) 1484 if (ret < 0)
1448 break; 1485 goto out_free;
1449 if (ret != -ERESTARTSYS) { 1486
1450 WARN_ON(1);
1451 ret = VM_FAULT_SIGBUS;
1452 goto out_free;
1453 }
1454 }
1455 dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", 1487 dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
1456 inode, off, len, ceph_cap_string(got)); 1488 inode, off, len, ceph_cap_string(got));
1457 1489
1458 /* Update time before taking page lock */ 1490 /* Update time before taking page lock */
1459 file_update_time(vma->vm_file); 1491 file_update_time(vma->vm_file);
1460 1492
1461 lock_page(page); 1493 do {
1494 lock_page(page);
1462 1495
1463 ret = VM_FAULT_NOPAGE; 1496 if ((off > size) || (page->mapping != inode->i_mapping)) {
1464 if ((off > size) || 1497 unlock_page(page);
1465 (page->mapping != inode->i_mapping)) { 1498 ret = VM_FAULT_NOPAGE;
1466 unlock_page(page); 1499 break;
1467 goto out; 1500 }
1468 } 1501
1502 ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
1503 if (ret >= 0) {
1504 /* success. we'll keep the page locked. */
1505 set_page_dirty(page);
1506 ret = VM_FAULT_LOCKED;
1507 }
1508 } while (ret == -EAGAIN);
1469 1509
1470 ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
1471 if (ret >= 0) {
1472 /* success. we'll keep the page locked. */
1473 set_page_dirty(page);
1474 ret = VM_FAULT_LOCKED;
1475 } else {
1476 if (ret == -ENOMEM)
1477 ret = VM_FAULT_OOM;
1478 else
1479 ret = VM_FAULT_SIGBUS;
1480 }
1481out:
1482 if (ret == VM_FAULT_LOCKED || 1510 if (ret == VM_FAULT_LOCKED ||
1483 ci->i_inline_version != CEPH_INLINE_NONE) { 1511 ci->i_inline_version != CEPH_INLINE_NONE) {
1484 int dirty; 1512 int dirty;
@@ -1495,8 +1523,10 @@ out:
1495 inode, off, len, ceph_cap_string(got), ret); 1523 inode, off, len, ceph_cap_string(got), ret);
1496 ceph_put_cap_refs(ci, got); 1524 ceph_put_cap_refs(ci, got);
1497out_free: 1525out_free:
1526 ceph_restore_sigs(&oldset);
1498 ceph_free_cap_flush(prealloc_cf); 1527 ceph_free_cap_flush(prealloc_cf);
1499 1528 if (ret < 0)
1529 ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
1500 return ret; 1530 return ret;
1501} 1531}
1502 1532
@@ -1614,7 +1644,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1614 goto out; 1644 goto out;
1615 } 1645 }
1616 1646
1617 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); 1647 req->r_mtime = inode->i_mtime;
1618 err = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1648 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1619 if (!err) 1649 if (!err)
1620 err = ceph_osdc_wait_request(&fsc->client->osdc, req); 1650 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1657,7 +1687,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1657 goto out_put; 1687 goto out_put;
1658 } 1688 }
1659 1689
1660 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); 1690 req->r_mtime = inode->i_mtime;
1661 err = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1691 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1662 if (!err) 1692 if (!err)
1663 err = ceph_osdc_wait_request(&fsc->client->osdc, req); 1693 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1758,9 +1788,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1758 rd_req->r_flags = CEPH_OSD_FLAG_READ; 1788 rd_req->r_flags = CEPH_OSD_FLAG_READ;
1759 osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); 1789 osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
1760 rd_req->r_base_oloc.pool = pool; 1790 rd_req->r_base_oloc.pool = pool;
1761 snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name), 1791 ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
1762 "%llx.00000000", ci->i_vino.ino); 1792
1763 rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); 1793 err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
1794 if (err)
1795 goto out_unlock;
1764 1796
1765 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL, 1797 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1766 1, false, GFP_NOFS); 1798 1, false, GFP_NOFS);
@@ -1769,11 +1801,14 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1769 goto out_unlock; 1801 goto out_unlock;
1770 } 1802 }
1771 1803
1772 wr_req->r_flags = CEPH_OSD_FLAG_WRITE | 1804 wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
1773 CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
1774 osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); 1805 osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
1775 wr_req->r_base_oloc.pool = pool; 1806 ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
1776 wr_req->r_base_oid = rd_req->r_base_oid; 1807 ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);
1808
1809 err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
1810 if (err)
1811 goto out_unlock;
1777 1812
1778 /* one page should be large enough for STAT data */ 1813 /* one page should be large enough for STAT data */
1779 pages = ceph_alloc_page_vector(1, GFP_KERNEL); 1814 pages = ceph_alloc_page_vector(1, GFP_KERNEL);
@@ -1784,12 +1819,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1784 1819
1785 osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, 1820 osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
1786 0, false, true); 1821 0, false, true);
1787 ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
1788 &ci->vfs_inode.i_mtime);
1789 err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); 1822 err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
1790 1823
1791 ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, 1824 wr_req->r_mtime = ci->vfs_inode.i_mtime;
1792 &ci->vfs_inode.i_mtime);
1793 err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); 1825 err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
1794 1826
1795 if (!err) 1827 if (!err)
@@ -1823,10 +1855,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1823out_unlock: 1855out_unlock:
1824 up_write(&mdsc->pool_perm_rwsem); 1856 up_write(&mdsc->pool_perm_rwsem);
1825 1857
1826 if (rd_req) 1858 ceph_osdc_put_request(rd_req);
1827 ceph_osdc_put_request(rd_req); 1859 ceph_osdc_put_request(wr_req);
1828 if (wr_req)
1829 ceph_osdc_put_request(wr_req);
1830out: 1860out:
1831 if (!err) 1861 if (!err)
1832 err = have; 1862 err = have;
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index a351480dbabc..c052b5bf219b 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -236,7 +236,7 @@ static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int
236 unlock_page(page); 236 unlock_page(page);
237} 237}
238 238
239static inline int cache_valid(struct ceph_inode_info *ci) 239static inline bool cache_valid(struct ceph_inode_info *ci)
240{ 240{
241 return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) && 241 return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&
242 (ci->i_fscache_gen == ci->i_rdcache_gen)); 242 (ci->i_fscache_gen == ci->i_rdcache_gen));
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index cfaeef18cbca..c17b5d76d75e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1656,7 +1656,7 @@ retry_locked:
1656 */ 1656 */
1657 if ((!is_delayed || mdsc->stopping) && 1657 if ((!is_delayed || mdsc->stopping) &&
1658 !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ 1658 !S_ISDIR(inode->i_mode) && /* ignore readdir cache */
1659 ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ 1659 !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
1660 inode->i_data.nrpages && /* have cached pages */ 1660 inode->i_data.nrpages && /* have cached pages */
1661 (revoking & (CEPH_CAP_FILE_CACHE| 1661 (revoking & (CEPH_CAP_FILE_CACHE|
1662 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */ 1662 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
@@ -1698,8 +1698,8 @@ retry_locked:
1698 1698
1699 revoking = cap->implemented & ~cap->issued; 1699 revoking = cap->implemented & ~cap->issued;
1700 dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n", 1700 dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
1701 cap->mds, cap, ceph_cap_string(cap->issued), 1701 cap->mds, cap, ceph_cap_string(cap_used),
1702 ceph_cap_string(cap_used), 1702 ceph_cap_string(cap->issued),
1703 ceph_cap_string(cap->implemented), 1703 ceph_cap_string(cap->implemented),
1704 ceph_cap_string(revoking)); 1704 ceph_cap_string(revoking));
1705 1705
@@ -2317,7 +2317,7 @@ again:
2317 2317
2318 /* make sure file is actually open */ 2318 /* make sure file is actually open */
2319 file_wanted = __ceph_caps_file_wanted(ci); 2319 file_wanted = __ceph_caps_file_wanted(ci);
2320 if ((file_wanted & need) == 0) { 2320 if ((file_wanted & need) != need) {
2321 dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", 2321 dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
2322 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2322 ceph_cap_string(need), ceph_cap_string(file_wanted));
2323 *err = -EBADF; 2323 *err = -EBADF;
@@ -2412,12 +2412,26 @@ again:
2412 goto out_unlock; 2412 goto out_unlock;
2413 } 2413 }
2414 2414
2415 if (!__ceph_is_any_caps(ci) && 2415 if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
2416 ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2416 int mds_wanted;
2417 dout("get_cap_refs %p forced umount\n", inode); 2417 if (ACCESS_ONCE(mdsc->fsc->mount_state) ==
2418 *err = -EIO; 2418 CEPH_MOUNT_SHUTDOWN) {
2419 ret = 1; 2419 dout("get_cap_refs %p forced umount\n", inode);
2420 goto out_unlock; 2420 *err = -EIO;
2421 ret = 1;
2422 goto out_unlock;
2423 }
2424 mds_wanted = __ceph_caps_mds_wanted(ci);
2425 if ((mds_wanted & need) != need) {
2426 dout("get_cap_refs %p caps were dropped"
2427 " (session killed?)\n", inode);
2428 *err = -ESTALE;
2429 ret = 1;
2430 goto out_unlock;
2431 }
2432 if ((mds_wanted & file_wanted) ==
2433 (file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
2434 ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
2421 } 2435 }
2422 2436
2423 dout("get_cap_refs %p have %s needed %s\n", inode, 2437 dout("get_cap_refs %p have %s needed %s\n", inode,
@@ -2487,7 +2501,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2487 if (err == -EAGAIN) 2501 if (err == -EAGAIN)
2488 continue; 2502 continue;
2489 if (err < 0) 2503 if (err < 0)
2490 return err; 2504 ret = err;
2491 } else { 2505 } else {
2492 ret = wait_event_interruptible(ci->i_cap_wq, 2506 ret = wait_event_interruptible(ci->i_cap_wq,
2493 try_get_cap_refs(ci, need, want, endoff, 2507 try_get_cap_refs(ci, need, want, endoff,
@@ -2496,8 +2510,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2496 continue; 2510 continue;
2497 if (err < 0) 2511 if (err < 0)
2498 ret = err; 2512 ret = err;
2499 if (ret < 0) 2513 }
2500 return ret; 2514 if (ret < 0) {
2515 if (err == -ESTALE) {
2516 /* session was killed, try renew caps */
2517 ret = ceph_renew_caps(&ci->vfs_inode);
2518 if (ret == 0)
2519 continue;
2520 }
2521 return ret;
2501 } 2522 }
2502 2523
2503 if (ci->i_inline_version != CEPH_INLINE_NONE && 2524 if (ci->i_inline_version != CEPH_INLINE_NONE &&
@@ -2807,7 +2828,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2807 if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ 2828 if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
2808 ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 2829 ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
2809 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 2830 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2810 !ci->i_wrbuffer_ref) { 2831 !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
2811 if (try_nonblocking_invalidate(inode)) { 2832 if (try_nonblocking_invalidate(inode)) {
2812 /* there were locked pages.. invalidate later 2833 /* there were locked pages.. invalidate later
2813 in a separate thread. */ 2834 in a separate thread. */
@@ -3226,6 +3247,8 @@ retry:
3226 3247
3227 if (target < 0) { 3248 if (target < 0) {
3228 __ceph_remove_cap(cap, false); 3249 __ceph_remove_cap(cap, false);
3250 if (!ci->i_auth_cap)
3251 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
3229 goto out_unlock; 3252 goto out_unlock;
3230 } 3253 }
3231 3254
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 31f831471ed2..39ff678e567f 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -109,7 +109,7 @@ static int mdsc_show(struct seq_file *s, void *p)
109 path ? path : ""); 109 path ? path : "");
110 spin_unlock(&req->r_old_dentry->d_lock); 110 spin_unlock(&req->r_old_dentry->d_lock);
111 kfree(path); 111 kfree(path);
112 } else if (req->r_path2) { 112 } else if (req->r_path2 && req->r_op != CEPH_MDS_OP_SYMLINK) {
113 if (req->r_ino2.ino) 113 if (req->r_ino2.ino)
114 seq_printf(s, " #%llx/%s", req->r_ino2.ino, 114 seq_printf(s, " #%llx/%s", req->r_ino2.ino,
115 req->r_path2); 115 req->r_path2);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 3ab1192d2029..6e0fedf6713b 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -70,16 +70,42 @@ out_unlock:
70} 70}
71 71
72/* 72/*
73 * for readdir, we encode the directory frag and offset within that 73 * for f_pos for readdir:
74 * frag into f_pos. 74 * - hash order:
75 * (0xff << 52) | ((24 bits hash) << 28) |
76 * (the nth entry has hash collision);
77 * - frag+name order;
78 * ((frag value) << 28) | (the nth entry in frag);
75 */ 79 */
80#define OFFSET_BITS 28
81#define OFFSET_MASK ((1 << OFFSET_BITS) - 1)
82#define HASH_ORDER (0xffull << (OFFSET_BITS + 24))
83loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
84{
85 loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
86 if (hash_order)
87 fpos |= HASH_ORDER;
88 return fpos;
89}
90
91static bool is_hash_order(loff_t p)
92{
93 return (p & HASH_ORDER) == HASH_ORDER;
94}
95
76static unsigned fpos_frag(loff_t p) 96static unsigned fpos_frag(loff_t p)
77{ 97{
78 return p >> 32; 98 return p >> OFFSET_BITS;
79} 99}
100
101static unsigned fpos_hash(loff_t p)
102{
103 return ceph_frag_value(fpos_frag(p));
104}
105
80static unsigned fpos_off(loff_t p) 106static unsigned fpos_off(loff_t p)
81{ 107{
82 return p & 0xffffffff; 108 return p & OFFSET_MASK;
83} 109}
84 110
85static int fpos_cmp(loff_t l, loff_t r) 111static int fpos_cmp(loff_t l, loff_t r)
@@ -111,6 +137,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
111 return 0; 137 return 0;
112} 138}
113 139
140
141static struct dentry *
142__dcache_find_get_entry(struct dentry *parent, u64 idx,
143 struct ceph_readdir_cache_control *cache_ctl)
144{
145 struct inode *dir = d_inode(parent);
146 struct dentry *dentry;
147 unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
148 loff_t ptr_pos = idx * sizeof(struct dentry *);
149 pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
150
151 if (ptr_pos >= i_size_read(dir))
152 return NULL;
153
154 if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
155 ceph_readdir_cache_release(cache_ctl);
156 cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
157 if (!cache_ctl->page) {
158 dout(" page %lu not found\n", ptr_pgoff);
159 return ERR_PTR(-EAGAIN);
160 }
161 /* reading/filling the cache are serialized by
162 i_mutex, no need to use page lock */
163 unlock_page(cache_ctl->page);
164 cache_ctl->dentries = kmap(cache_ctl->page);
165 }
166
167 cache_ctl->index = idx & idx_mask;
168
169 rcu_read_lock();
170 spin_lock(&parent->d_lock);
171 /* check i_size again here, because empty directory can be
172 * marked as complete while not holding the i_mutex. */
173 if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
174 dentry = cache_ctl->dentries[cache_ctl->index];
175 else
176 dentry = NULL;
177 spin_unlock(&parent->d_lock);
178 if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
179 dentry = NULL;
180 rcu_read_unlock();
181 return dentry ? : ERR_PTR(-EAGAIN);
182}
183
114/* 184/*
115 * When possible, we try to satisfy a readdir by peeking at the 185 * When possible, we try to satisfy a readdir by peeking at the
116 * dcache. We make this work by carefully ordering dentries on 186 * dcache. We make this work by carefully ordering dentries on
@@ -130,75 +200,68 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
130 struct inode *dir = d_inode(parent); 200 struct inode *dir = d_inode(parent);
131 struct dentry *dentry, *last = NULL; 201 struct dentry *dentry, *last = NULL;
132 struct ceph_dentry_info *di; 202 struct ceph_dentry_info *di;
133 unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
134 int err = 0;
135 loff_t ptr_pos = 0;
136 struct ceph_readdir_cache_control cache_ctl = {}; 203 struct ceph_readdir_cache_control cache_ctl = {};
204 u64 idx = 0;
205 int err = 0;
137 206
138 dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos); 207 dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
208
209 /* search start position */
210 if (ctx->pos > 2) {
211 u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
212 while (count > 0) {
213 u64 step = count >> 1;
214 dentry = __dcache_find_get_entry(parent, idx + step,
215 &cache_ctl);
216 if (!dentry) {
217 /* use linear search */
218 idx = 0;
219 break;
220 }
221 if (IS_ERR(dentry)) {
222 err = PTR_ERR(dentry);
223 goto out;
224 }
225 di = ceph_dentry(dentry);
226 spin_lock(&dentry->d_lock);
227 if (fpos_cmp(di->offset, ctx->pos) < 0) {
228 idx += step + 1;
229 count -= step + 1;
230 } else {
231 count = step;
232 }
233 spin_unlock(&dentry->d_lock);
234 dput(dentry);
235 }
139 236
140 /* we can calculate cache index for the first dirfrag */ 237 dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
141 if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
142 cache_ctl.index = fpos_off(ctx->pos) - 2;
143 BUG_ON(cache_ctl.index < 0);
144 ptr_pos = cache_ctl.index * sizeof(struct dentry *);
145 } 238 }
146 239
147 while (true) {
148 pgoff_t pgoff;
149 bool emit_dentry;
150 240
151 if (ptr_pos >= i_size_read(dir)) { 241 for (;;) {
242 bool emit_dentry = false;
243 dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
244 if (!dentry) {
152 fi->flags |= CEPH_F_ATEND; 245 fi->flags |= CEPH_F_ATEND;
153 err = 0; 246 err = 0;
154 break; 247 break;
155 } 248 }
156 249 if (IS_ERR(dentry)) {
157 err = -EAGAIN; 250 err = PTR_ERR(dentry);
158 pgoff = ptr_pos >> PAGE_SHIFT; 251 goto out;
159 if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
160 ceph_readdir_cache_release(&cache_ctl);
161 cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
162 if (!cache_ctl.page) {
163 dout(" page %lu not found\n", pgoff);
164 break;
165 }
166 /* reading/filling the cache are serialized by
167 * i_mutex, no need to use page lock */
168 unlock_page(cache_ctl.page);
169 cache_ctl.dentries = kmap(cache_ctl.page);
170 } 252 }
171 253
172 rcu_read_lock();
173 spin_lock(&parent->d_lock);
174 /* check i_size again here, because empty directory can be
175 * marked as complete while not holding the i_mutex. */
176 if (ceph_dir_is_complete_ordered(dir) &&
177 ptr_pos < i_size_read(dir))
178 dentry = cache_ctl.dentries[cache_ctl.index % nsize];
179 else
180 dentry = NULL;
181 spin_unlock(&parent->d_lock);
182 if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
183 dentry = NULL;
184 rcu_read_unlock();
185 if (!dentry)
186 break;
187
188 emit_dentry = false;
189 di = ceph_dentry(dentry); 254 di = ceph_dentry(dentry);
190 spin_lock(&dentry->d_lock); 255 spin_lock(&dentry->d_lock);
191 if (di->lease_shared_gen == shared_gen && 256 if (di->lease_shared_gen == shared_gen &&
192 d_really_is_positive(dentry) && 257 d_really_is_positive(dentry) &&
193 ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
194 ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
195 fpos_cmp(ctx->pos, di->offset) <= 0) { 258 fpos_cmp(ctx->pos, di->offset) <= 0) {
196 emit_dentry = true; 259 emit_dentry = true;
197 } 260 }
198 spin_unlock(&dentry->d_lock); 261 spin_unlock(&dentry->d_lock);
199 262
200 if (emit_dentry) { 263 if (emit_dentry) {
201 dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, 264 dout(" %llx dentry %p %pd %p\n", di->offset,
202 dentry, dentry, d_inode(dentry)); 265 dentry, dentry, d_inode(dentry));
203 ctx->pos = di->offset; 266 ctx->pos = di->offset;
204 if (!dir_emit(ctx, dentry->d_name.name, 267 if (!dir_emit(ctx, dentry->d_name.name,
@@ -218,10 +281,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
218 } else { 281 } else {
219 dput(dentry); 282 dput(dentry);
220 } 283 }
221
222 cache_ctl.index++;
223 ptr_pos += sizeof(struct dentry *);
224 } 284 }
285out:
225 ceph_readdir_cache_release(&cache_ctl); 286 ceph_readdir_cache_release(&cache_ctl);
226 if (last) { 287 if (last) {
227 int ret; 288 int ret;
@@ -235,6 +296,16 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
235 return err; 296 return err;
236} 297}
237 298
299static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
300{
301 if (!fi->last_readdir)
302 return true;
303 if (is_hash_order(pos))
304 return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
305 else
306 return fi->frag != fpos_frag(pos);
307}
308
238static int ceph_readdir(struct file *file, struct dir_context *ctx) 309static int ceph_readdir(struct file *file, struct dir_context *ctx)
239{ 310{
240 struct ceph_file_info *fi = file->private_data; 311 struct ceph_file_info *fi = file->private_data;
@@ -242,13 +313,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
242 struct ceph_inode_info *ci = ceph_inode(inode); 313 struct ceph_inode_info *ci = ceph_inode(inode);
243 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 314 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
244 struct ceph_mds_client *mdsc = fsc->mdsc; 315 struct ceph_mds_client *mdsc = fsc->mdsc;
245 unsigned frag = fpos_frag(ctx->pos); 316 int i;
246 int off = fpos_off(ctx->pos);
247 int err; 317 int err;
248 u32 ftype; 318 u32 ftype;
249 struct ceph_mds_reply_info_parsed *rinfo; 319 struct ceph_mds_reply_info_parsed *rinfo;
250 320
251 dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off); 321 dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
252 if (fi->flags & CEPH_F_ATEND) 322 if (fi->flags & CEPH_F_ATEND)
253 return 0; 323 return 0;
254 324
@@ -260,7 +330,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
260 inode->i_mode >> 12)) 330 inode->i_mode >> 12))
261 return 0; 331 return 0;
262 ctx->pos = 1; 332 ctx->pos = 1;
263 off = 1;
264 } 333 }
265 if (ctx->pos == 1) { 334 if (ctx->pos == 1) {
266 ino_t ino = parent_ino(file->f_path.dentry); 335 ino_t ino = parent_ino(file->f_path.dentry);
@@ -270,7 +339,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
270 inode->i_mode >> 12)) 339 inode->i_mode >> 12))
271 return 0; 340 return 0;
272 ctx->pos = 2; 341 ctx->pos = 2;
273 off = 2;
274 } 342 }
275 343
276 /* can we use the dcache? */ 344 /* can we use the dcache? */
@@ -285,8 +353,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
285 err = __dcache_readdir(file, ctx, shared_gen); 353 err = __dcache_readdir(file, ctx, shared_gen);
286 if (err != -EAGAIN) 354 if (err != -EAGAIN)
287 return err; 355 return err;
288 frag = fpos_frag(ctx->pos);
289 off = fpos_off(ctx->pos);
290 } else { 356 } else {
291 spin_unlock(&ci->i_ceph_lock); 357 spin_unlock(&ci->i_ceph_lock);
292 } 358 }
@@ -294,8 +360,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
294 /* proceed with a normal readdir */ 360 /* proceed with a normal readdir */
295more: 361more:
296 /* do we have the correct frag content buffered? */ 362 /* do we have the correct frag content buffered? */
297 if (fi->frag != frag || fi->last_readdir == NULL) { 363 if (need_send_readdir(fi, ctx->pos)) {
298 struct ceph_mds_request *req; 364 struct ceph_mds_request *req;
365 unsigned frag;
299 int op = ceph_snap(inode) == CEPH_SNAPDIR ? 366 int op = ceph_snap(inode) == CEPH_SNAPDIR ?
300 CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; 367 CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
301 368
@@ -305,6 +372,13 @@ more:
305 fi->last_readdir = NULL; 372 fi->last_readdir = NULL;
306 } 373 }
307 374
375 if (is_hash_order(ctx->pos)) {
376 frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
377 NULL, NULL);
378 } else {
379 frag = fpos_frag(ctx->pos);
380 }
381
308 dout("readdir fetching %llx.%llx frag %x offset '%s'\n", 382 dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
309 ceph_vinop(inode), frag, fi->last_name); 383 ceph_vinop(inode), frag, fi->last_name);
310 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 384 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -331,6 +405,8 @@ more:
331 req->r_readdir_cache_idx = fi->readdir_cache_idx; 405 req->r_readdir_cache_idx = fi->readdir_cache_idx;
332 req->r_readdir_offset = fi->next_offset; 406 req->r_readdir_offset = fi->next_offset;
333 req->r_args.readdir.frag = cpu_to_le32(frag); 407 req->r_args.readdir.frag = cpu_to_le32(frag);
408 req->r_args.readdir.flags =
409 cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
334 410
335 req->r_inode = inode; 411 req->r_inode = inode;
336 ihold(inode); 412 ihold(inode);
@@ -340,22 +416,26 @@ more:
340 ceph_mdsc_put_request(req); 416 ceph_mdsc_put_request(req);
341 return err; 417 return err;
342 } 418 }
343 dout("readdir got and parsed readdir result=%d" 419 dout("readdir got and parsed readdir result=%d on "
344 " on frag %x, end=%d, complete=%d\n", err, frag, 420 "frag %x, end=%d, complete=%d, hash_order=%d\n",
421 err, frag,
345 (int)req->r_reply_info.dir_end, 422 (int)req->r_reply_info.dir_end,
346 (int)req->r_reply_info.dir_complete); 423 (int)req->r_reply_info.dir_complete,
347 424 (int)req->r_reply_info.hash_order);
348 425
349 /* note next offset and last dentry name */
350 rinfo = &req->r_reply_info; 426 rinfo = &req->r_reply_info;
351 if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { 427 if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
352 frag = le32_to_cpu(rinfo->dir_dir->frag); 428 frag = le32_to_cpu(rinfo->dir_dir->frag);
353 off = req->r_readdir_offset; 429 if (!rinfo->hash_order) {
354 fi->next_offset = off; 430 fi->next_offset = req->r_readdir_offset;
431 /* adjust ctx->pos to beginning of frag */
432 ctx->pos = ceph_make_fpos(frag,
433 fi->next_offset,
434 false);
435 }
355 } 436 }
356 437
357 fi->frag = frag; 438 fi->frag = frag;
358 fi->offset = fi->next_offset;
359 fi->last_readdir = req; 439 fi->last_readdir = req;
360 440
361 if (req->r_did_prepopulate) { 441 if (req->r_did_prepopulate) {
@@ -363,7 +443,8 @@ more:
363 if (fi->readdir_cache_idx < 0) { 443 if (fi->readdir_cache_idx < 0) {
364 /* preclude from marking dir ordered */ 444 /* preclude from marking dir ordered */
365 fi->dir_ordered_count = 0; 445 fi->dir_ordered_count = 0;
366 } else if (ceph_frag_is_leftmost(frag) && off == 2) { 446 } else if (ceph_frag_is_leftmost(frag) &&
447 fi->next_offset == 2) {
367 /* note dir version at start of readdir so 448 /* note dir version at start of readdir so
368 * we can tell if any dentries get dropped */ 449 * we can tell if any dentries get dropped */
369 fi->dir_release_count = req->r_dir_release_cnt; 450 fi->dir_release_count = req->r_dir_release_cnt;
@@ -377,65 +458,87 @@ more:
377 fi->dir_release_count = 0; 458 fi->dir_release_count = 0;
378 } 459 }
379 460
380 if (req->r_reply_info.dir_end) { 461 /* note next offset and last dentry name */
381 kfree(fi->last_name); 462 if (rinfo->dir_nr > 0) {
382 fi->last_name = NULL; 463 struct ceph_mds_reply_dir_entry *rde =
383 if (ceph_frag_is_rightmost(frag)) 464 rinfo->dir_entries + (rinfo->dir_nr-1);
384 fi->next_offset = 2; 465 unsigned next_offset = req->r_reply_info.dir_end ?
385 else 466 2 : (fpos_off(rde->offset) + 1);
386 fi->next_offset = 0; 467 err = note_last_dentry(fi, rde->name, rde->name_len,
387 } else { 468 next_offset);
388 err = note_last_dentry(fi,
389 rinfo->dir_dname[rinfo->dir_nr-1],
390 rinfo->dir_dname_len[rinfo->dir_nr-1],
391 fi->next_offset + rinfo->dir_nr);
392 if (err) 469 if (err)
393 return err; 470 return err;
471 } else if (req->r_reply_info.dir_end) {
472 fi->next_offset = 2;
473 /* keep last name */
394 } 474 }
395 } 475 }
396 476
397 rinfo = &fi->last_readdir->r_reply_info; 477 rinfo = &fi->last_readdir->r_reply_info;
398 dout("readdir frag %x num %d off %d chunkoff %d\n", frag, 478 dout("readdir frag %x num %d pos %llx chunk first %llx\n",
399 rinfo->dir_nr, off, fi->offset); 479 fi->frag, rinfo->dir_nr, ctx->pos,
400 480 rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
401 ctx->pos = ceph_make_fpos(frag, off); 481
402 while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) { 482 i = 0;
403 struct ceph_mds_reply_inode *in = 483 /* search start position */
404 rinfo->dir_in[off - fi->offset].in; 484 if (rinfo->dir_nr > 0) {
485 int step, nr = rinfo->dir_nr;
486 while (nr > 0) {
487 step = nr >> 1;
488 if (rinfo->dir_entries[i + step].offset < ctx->pos) {
489 i += step + 1;
490 nr -= step + 1;
491 } else {
492 nr = step;
493 }
494 }
495 }
496 for (; i < rinfo->dir_nr; i++) {
497 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
405 struct ceph_vino vino; 498 struct ceph_vino vino;
406 ino_t ino; 499 ino_t ino;
407 500
408 dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n", 501 BUG_ON(rde->offset < ctx->pos);
409 off, off - fi->offset, rinfo->dir_nr, ctx->pos, 502
410 rinfo->dir_dname_len[off - fi->offset], 503 ctx->pos = rde->offset;
411 rinfo->dir_dname[off - fi->offset], in); 504 dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
412 BUG_ON(!in); 505 i, rinfo->dir_nr, ctx->pos,
413 ftype = le32_to_cpu(in->mode) >> 12; 506 rde->name_len, rde->name, &rde->inode.in);
414 vino.ino = le64_to_cpu(in->ino); 507
415 vino.snap = le64_to_cpu(in->snapid); 508 BUG_ON(!rde->inode.in);
509 ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
510 vino.ino = le64_to_cpu(rde->inode.in->ino);
511 vino.snap = le64_to_cpu(rde->inode.in->snapid);
416 ino = ceph_vino_to_ino(vino); 512 ino = ceph_vino_to_ino(vino);
417 if (!dir_emit(ctx, 513
418 rinfo->dir_dname[off - fi->offset], 514 if (!dir_emit(ctx, rde->name, rde->name_len,
419 rinfo->dir_dname_len[off - fi->offset], 515 ceph_translate_ino(inode->i_sb, ino), ftype)) {
420 ceph_translate_ino(inode->i_sb, ino), ftype)) {
421 dout("filldir stopping us...\n"); 516 dout("filldir stopping us...\n");
422 return 0; 517 return 0;
423 } 518 }
424 off++;
425 ctx->pos++; 519 ctx->pos++;
426 } 520 }
427 521
428 if (fi->last_name) { 522 if (fi->next_offset > 2) {
429 ceph_mdsc_put_request(fi->last_readdir); 523 ceph_mdsc_put_request(fi->last_readdir);
430 fi->last_readdir = NULL; 524 fi->last_readdir = NULL;
431 goto more; 525 goto more;
432 } 526 }
433 527
434 /* more frags? */ 528 /* more frags? */
435 if (!ceph_frag_is_rightmost(frag)) { 529 if (!ceph_frag_is_rightmost(fi->frag)) {
436 frag = ceph_frag_next(frag); 530 unsigned frag = ceph_frag_next(fi->frag);
437 off = 0; 531 if (is_hash_order(ctx->pos)) {
438 ctx->pos = ceph_make_fpos(frag, off); 532 loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
533 fi->next_offset, true);
534 if (new_pos > ctx->pos)
535 ctx->pos = new_pos;
536 /* keep last_name */
537 } else {
538 ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
539 kfree(fi->last_name);
540 fi->last_name = NULL;
541 }
439 dout("readdir next frag is %x\n", frag); 542 dout("readdir next frag is %x\n", frag);
440 goto more; 543 goto more;
441 } 544 }
@@ -467,7 +570,7 @@ more:
467 return 0; 570 return 0;
468} 571}
469 572
470static void reset_readdir(struct ceph_file_info *fi, unsigned frag) 573static void reset_readdir(struct ceph_file_info *fi)
471{ 574{
472 if (fi->last_readdir) { 575 if (fi->last_readdir) {
473 ceph_mdsc_put_request(fi->last_readdir); 576 ceph_mdsc_put_request(fi->last_readdir);
@@ -477,18 +580,38 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
477 fi->last_name = NULL; 580 fi->last_name = NULL;
478 fi->dir_release_count = 0; 581 fi->dir_release_count = 0;
479 fi->readdir_cache_idx = -1; 582 fi->readdir_cache_idx = -1;
480 if (ceph_frag_is_leftmost(frag)) 583 fi->next_offset = 2; /* compensate for . and .. */
481 fi->next_offset = 2; /* compensate for . and .. */
482 else
483 fi->next_offset = 0;
484 fi->flags &= ~CEPH_F_ATEND; 584 fi->flags &= ~CEPH_F_ATEND;
485} 585}
486 586
587/*
588 * discard buffered readdir content on seekdir(0), or seek to new frag,
589 * or seek prior to current chunk
590 */
591static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
592{
593 struct ceph_mds_reply_info_parsed *rinfo;
594 loff_t chunk_offset;
595 if (new_pos == 0)
596 return true;
597 if (is_hash_order(new_pos)) {
598 /* no need to reset last_name for a forward seek when
599 * dentries are sorted in hash order */
600 } else if (fi->frag != fpos_frag(new_pos)) {
601 return true;
602 }
603 rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
604 if (!rinfo || !rinfo->dir_nr)
605 return true;
606 chunk_offset = rinfo->dir_entries[0].offset;
607 return new_pos < chunk_offset ||
608 is_hash_order(new_pos) != is_hash_order(chunk_offset);
609}
610
487static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) 611static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
488{ 612{
489 struct ceph_file_info *fi = file->private_data; 613 struct ceph_file_info *fi = file->private_data;
490 struct inode *inode = file->f_mapping->host; 614 struct inode *inode = file->f_mapping->host;
491 loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
492 loff_t retval; 615 loff_t retval;
493 616
494 inode_lock(inode); 617 inode_lock(inode);
@@ -505,25 +628,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
505 } 628 }
506 629
507 if (offset >= 0) { 630 if (offset >= 0) {
631 if (need_reset_readdir(fi, offset)) {
632 dout("dir_llseek dropping %p content\n", file);
633 reset_readdir(fi);
634 } else if (is_hash_order(offset) && offset > file->f_pos) {
635 /* for hash offset, we don't know if a forward seek
636 * is within same frag */
637 fi->dir_release_count = 0;
638 fi->readdir_cache_idx = -1;
639 }
640
508 if (offset != file->f_pos) { 641 if (offset != file->f_pos) {
509 file->f_pos = offset; 642 file->f_pos = offset;
510 file->f_version = 0; 643 file->f_version = 0;
511 fi->flags &= ~CEPH_F_ATEND; 644 fi->flags &= ~CEPH_F_ATEND;
512 } 645 }
513 retval = offset; 646 retval = offset;
514
515 if (offset == 0 ||
516 fpos_frag(offset) != fi->frag ||
517 fpos_off(offset) < fi->offset) {
518 /* discard buffered readdir content on seekdir(0), or
519 * seek to new frag, or seek prior to current chunk */
520 dout("dir_llseek dropping %p content\n", file);
521 reset_readdir(fi, fpos_frag(offset));
522 } else if (fpos_cmp(offset, old_offset) > 0) {
523 /* reset dir_release_count if we did a forward seek */
524 fi->dir_release_count = 0;
525 fi->readdir_cache_idx = -1;
526 }
527 } 647 }
528out: 648out:
529 inode_unlock(inode); 649 inode_unlock(inode);
@@ -591,7 +711,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
591 return dentry; 711 return dentry;
592} 712}
593 713
594static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry) 714static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
595{ 715{
596 return ceph_ino(inode) == CEPH_INO_ROOT && 716 return ceph_ino(inode) == CEPH_INO_ROOT &&
597 strncmp(dentry->d_name.name, ".ceph", 5) == 0; 717 strncmp(dentry->d_name.name, ".ceph", 5) == 0;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 4f1dc7120916..a888df6f2d71 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -192,6 +192,59 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
192} 192}
193 193
194/* 194/*
195 * try renew caps after session gets killed.
196 */
197int ceph_renew_caps(struct inode *inode)
198{
199 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
200 struct ceph_inode_info *ci = ceph_inode(inode);
201 struct ceph_mds_request *req;
202 int err, flags, wanted;
203
204 spin_lock(&ci->i_ceph_lock);
205 wanted = __ceph_caps_file_wanted(ci);
206 if (__ceph_is_any_real_caps(ci) &&
207 (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
208 int issued = __ceph_caps_issued(ci, NULL);
209 spin_unlock(&ci->i_ceph_lock);
210 dout("renew caps %p want %s issued %s updating mds_wanted\n",
211 inode, ceph_cap_string(wanted), ceph_cap_string(issued));
212 ceph_check_caps(ci, 0, NULL);
213 return 0;
214 }
215 spin_unlock(&ci->i_ceph_lock);
216
217 flags = 0;
218 if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
219 flags = O_RDWR;
220 else if (wanted & CEPH_CAP_FILE_RD)
221 flags = O_RDONLY;
222 else if (wanted & CEPH_CAP_FILE_WR)
223 flags = O_WRONLY;
224#ifdef O_LAZY
225 if (wanted & CEPH_CAP_FILE_LAZYIO)
226 flags |= O_LAZY;
227#endif
228
229 req = prepare_open_request(inode->i_sb, flags, 0);
230 if (IS_ERR(req)) {
231 err = PTR_ERR(req);
232 goto out;
233 }
234
235 req->r_inode = inode;
236 ihold(inode);
237 req->r_num_caps = 1;
238 req->r_fmode = -1;
239
240 err = ceph_mdsc_do_request(mdsc, NULL, req);
241 ceph_mdsc_put_request(req);
242out:
243 dout("renew caps %p open result=%d\n", inode, err);
244 return err < 0 ? err : 0;
245}
246
247/*
195 * If we already have the requisite capabilities, we can satisfy 248 * If we already have the requisite capabilities, we can satisfy
196 * the open request locally (no need to request new caps from the 249 * the open request locally (no need to request new caps from the
197 * MDS). We do, however, need to inform the MDS (asynchronously) 250 * MDS). We do, however, need to inform the MDS (asynchronously)
@@ -616,8 +669,7 @@ static void ceph_aio_complete(struct inode *inode,
616 kfree(aio_req); 669 kfree(aio_req);
617} 670}
618 671
619static void ceph_aio_complete_req(struct ceph_osd_request *req, 672static void ceph_aio_complete_req(struct ceph_osd_request *req)
620 struct ceph_msg *msg)
621{ 673{
622 int rc = req->r_result; 674 int rc = req->r_result;
623 struct inode *inode = req->r_inode; 675 struct inode *inode = req->r_inode;
@@ -714,14 +766,21 @@ static void ceph_aio_retry_work(struct work_struct *work)
714 req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | 766 req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
715 CEPH_OSD_FLAG_ONDISK | 767 CEPH_OSD_FLAG_ONDISK |
716 CEPH_OSD_FLAG_WRITE; 768 CEPH_OSD_FLAG_WRITE;
717 req->r_base_oloc = orig_req->r_base_oloc; 769 ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
718 req->r_base_oid = orig_req->r_base_oid; 770 ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
771
772 ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
773 if (ret) {
774 ceph_osdc_put_request(req);
775 req = orig_req;
776 goto out;
777 }
719 778
720 req->r_ops[0] = orig_req->r_ops[0]; 779 req->r_ops[0] = orig_req->r_ops[0];
721 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 780 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
722 781
723 ceph_osdc_build_request(req, req->r_ops[0].extent.offset, 782 req->r_mtime = aio_req->mtime;
724 snapc, CEPH_NOSNAP, &aio_req->mtime); 783 req->r_data_offset = req->r_ops[0].extent.offset;
725 784
726 ceph_osdc_put_request(orig_req); 785 ceph_osdc_put_request(orig_req);
727 786
@@ -733,7 +792,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
733out: 792out:
734 if (ret < 0) { 793 if (ret < 0) {
735 req->r_result = ret; 794 req->r_result = ret;
736 ceph_aio_complete_req(req, NULL); 795 ceph_aio_complete_req(req);
737 } 796 }
738 797
739 ceph_put_snap_context(snapc); 798 ceph_put_snap_context(snapc);
@@ -764,6 +823,8 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
764 list_add_tail(&req->r_unsafe_item, 823 list_add_tail(&req->r_unsafe_item,
765 &ci->i_unsafe_writes); 824 &ci->i_unsafe_writes);
766 spin_unlock(&ci->i_unsafe_lock); 825 spin_unlock(&ci->i_unsafe_lock);
826
827 complete_all(&req->r_completion);
767 } else { 828 } else {
768 spin_lock(&ci->i_unsafe_lock); 829 spin_lock(&ci->i_unsafe_lock);
769 list_del_init(&req->r_unsafe_item); 830 list_del_init(&req->r_unsafe_item);
@@ -875,14 +936,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
875 (pos+len) | (PAGE_SIZE - 1)); 936 (pos+len) | (PAGE_SIZE - 1));
876 937
877 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 938 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
939 req->r_mtime = mtime;
878 } 940 }
879 941
880
881 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, 942 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
882 false, false); 943 false, false);
883 944
884 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
885
886 if (aio_req) { 945 if (aio_req) {
887 aio_req->total_len += len; 946 aio_req->total_len += len;
888 aio_req->num_reqs++; 947 aio_req->num_reqs++;
@@ -956,7 +1015,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
956 req, false); 1015 req, false);
957 if (ret < 0) { 1016 if (ret < 0) {
958 req->r_result = ret; 1017 req->r_result = ret;
959 ceph_aio_complete_req(req, NULL); 1018 ceph_aio_complete_req(req);
960 } 1019 }
961 } 1020 }
962 return -EIOCBQUEUED; 1021 return -EIOCBQUEUED;
@@ -1067,9 +1126,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1067 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, 1126 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
1068 false, true); 1127 false, true);
1069 1128
1070 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 1129 req->r_mtime = mtime;
1071 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
1072
1073 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1130 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1074 if (!ret) 1131 if (!ret)
1075 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1132 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1524,9 +1581,7 @@ static int ceph_zero_partial_object(struct inode *inode,
1524 goto out; 1581 goto out;
1525 } 1582 }
1526 1583
1527 ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, 1584 req->r_mtime = inode->i_mtime;
1528 &inode->i_mtime);
1529
1530 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1585 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1531 if (!ret) { 1586 if (!ret) {
1532 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1587 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e669cfa9d793..f059b5997072 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -11,6 +11,7 @@
11#include <linux/xattr.h> 11#include <linux/xattr.h>
12#include <linux/posix_acl.h> 12#include <linux/posix_acl.h>
13#include <linux/random.h> 13#include <linux/random.h>
14#include <linux/sort.h>
14 15
15#include "super.h" 16#include "super.h"
16#include "mds_client.h" 17#include "mds_client.h"
@@ -254,6 +255,9 @@ static int ceph_fill_dirfrag(struct inode *inode,
254 diri_auth = ci->i_auth_cap->mds; 255 diri_auth = ci->i_auth_cap->mds;
255 spin_unlock(&ci->i_ceph_lock); 256 spin_unlock(&ci->i_ceph_lock);
256 257
258 if (mds == -1) /* CDIR_AUTH_PARENT */
259 mds = diri_auth;
260
257 mutex_lock(&ci->i_fragtree_mutex); 261 mutex_lock(&ci->i_fragtree_mutex);
258 if (ndist == 0 && mds == diri_auth) { 262 if (ndist == 0 && mds == diri_auth) {
259 /* no delegation info needed. */ 263 /* no delegation info needed. */
@@ -300,20 +304,38 @@ out:
300 return err; 304 return err;
301} 305}
302 306
307static int frag_tree_split_cmp(const void *l, const void *r)
308{
309 struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
310 struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
311 return ceph_frag_compare(ls->frag, rs->frag);
312}
313
314static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
315{
316 if (!frag)
317 return f == ceph_frag_make(0, 0);
318 if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
319 return false;
320 return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
321}
322
303static int ceph_fill_fragtree(struct inode *inode, 323static int ceph_fill_fragtree(struct inode *inode,
304 struct ceph_frag_tree_head *fragtree, 324 struct ceph_frag_tree_head *fragtree,
305 struct ceph_mds_reply_dirfrag *dirinfo) 325 struct ceph_mds_reply_dirfrag *dirinfo)
306{ 326{
307 struct ceph_inode_info *ci = ceph_inode(inode); 327 struct ceph_inode_info *ci = ceph_inode(inode);
308 struct ceph_inode_frag *frag; 328 struct ceph_inode_frag *frag, *prev_frag = NULL;
309 struct rb_node *rb_node; 329 struct rb_node *rb_node;
310 int i; 330 unsigned i, split_by, nsplits;
311 u32 id, nsplits; 331 u32 id;
312 bool update = false; 332 bool update = false;
313 333
314 mutex_lock(&ci->i_fragtree_mutex); 334 mutex_lock(&ci->i_fragtree_mutex);
315 nsplits = le32_to_cpu(fragtree->nsplits); 335 nsplits = le32_to_cpu(fragtree->nsplits);
316 if (nsplits) { 336 if (nsplits != ci->i_fragtree_nsplits) {
337 update = true;
338 } else if (nsplits) {
317 i = prandom_u32() % nsplits; 339 i = prandom_u32() % nsplits;
318 id = le32_to_cpu(fragtree->splits[i].frag); 340 id = le32_to_cpu(fragtree->splits[i].frag);
319 if (!__ceph_find_frag(ci, id)) 341 if (!__ceph_find_frag(ci, id))
@@ -332,10 +354,22 @@ static int ceph_fill_fragtree(struct inode *inode,
332 if (!update) 354 if (!update)
333 goto out_unlock; 355 goto out_unlock;
334 356
357 if (nsplits > 1) {
358 sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
359 frag_tree_split_cmp, NULL);
360 }
361
335 dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode)); 362 dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
336 rb_node = rb_first(&ci->i_fragtree); 363 rb_node = rb_first(&ci->i_fragtree);
337 for (i = 0; i < nsplits; i++) { 364 for (i = 0; i < nsplits; i++) {
338 id = le32_to_cpu(fragtree->splits[i].frag); 365 id = le32_to_cpu(fragtree->splits[i].frag);
366 split_by = le32_to_cpu(fragtree->splits[i].by);
367 if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
368 pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
369 "frag %x split by %d\n", ceph_vinop(inode),
370 i, nsplits, id, split_by);
371 continue;
372 }
339 frag = NULL; 373 frag = NULL;
340 while (rb_node) { 374 while (rb_node) {
341 frag = rb_entry(rb_node, struct ceph_inode_frag, node); 375 frag = rb_entry(rb_node, struct ceph_inode_frag, node);
@@ -347,8 +381,14 @@ static int ceph_fill_fragtree(struct inode *inode,
347 break; 381 break;
348 } 382 }
349 rb_node = rb_next(rb_node); 383 rb_node = rb_next(rb_node);
350 rb_erase(&frag->node, &ci->i_fragtree); 384 /* delete stale split/leaf node */
351 kfree(frag); 385 if (frag->split_by > 0 ||
386 !is_frag_child(frag->frag, prev_frag)) {
387 rb_erase(&frag->node, &ci->i_fragtree);
388 if (frag->split_by > 0)
389 ci->i_fragtree_nsplits--;
390 kfree(frag);
391 }
352 frag = NULL; 392 frag = NULL;
353 } 393 }
354 if (!frag) { 394 if (!frag) {
@@ -356,14 +396,23 @@ static int ceph_fill_fragtree(struct inode *inode,
356 if (IS_ERR(frag)) 396 if (IS_ERR(frag))
357 continue; 397 continue;
358 } 398 }
359 frag->split_by = le32_to_cpu(fragtree->splits[i].by); 399 if (frag->split_by == 0)
400 ci->i_fragtree_nsplits++;
401 frag->split_by = split_by;
360 dout(" frag %x split by %d\n", frag->frag, frag->split_by); 402 dout(" frag %x split by %d\n", frag->frag, frag->split_by);
403 prev_frag = frag;
361 } 404 }
362 while (rb_node) { 405 while (rb_node) {
363 frag = rb_entry(rb_node, struct ceph_inode_frag, node); 406 frag = rb_entry(rb_node, struct ceph_inode_frag, node);
364 rb_node = rb_next(rb_node); 407 rb_node = rb_next(rb_node);
365 rb_erase(&frag->node, &ci->i_fragtree); 408 /* delete stale split/leaf node */
366 kfree(frag); 409 if (frag->split_by > 0 ||
410 !is_frag_child(frag->frag, prev_frag)) {
411 rb_erase(&frag->node, &ci->i_fragtree);
412 if (frag->split_by > 0)
413 ci->i_fragtree_nsplits--;
414 kfree(frag);
415 }
367 } 416 }
368out_unlock: 417out_unlock:
369 mutex_unlock(&ci->i_fragtree_mutex); 418 mutex_unlock(&ci->i_fragtree_mutex);
@@ -513,6 +562,7 @@ void ceph_destroy_inode(struct inode *inode)
513 rb_erase(n, &ci->i_fragtree); 562 rb_erase(n, &ci->i_fragtree);
514 kfree(frag); 563 kfree(frag);
515 } 564 }
565 ci->i_fragtree_nsplits = 0;
516 566
517 __ceph_destroy_xattrs(ci); 567 __ceph_destroy_xattrs(ci);
518 if (ci->i_xattrs.blob) 568 if (ci->i_xattrs.blob)
@@ -533,6 +583,11 @@ int ceph_drop_inode(struct inode *inode)
533 return 1; 583 return 1;
534} 584}
535 585
586static inline blkcnt_t calc_inode_blocks(u64 size)
587{
588 return (size + (1<<9) - 1) >> 9;
589}
590
536/* 591/*
537 * Helpers to fill in size, ctime, mtime, and atime. We have to be 592 * Helpers to fill in size, ctime, mtime, and atime. We have to be
538 * careful because either the client or MDS may have more up to date 593 * careful because either the client or MDS may have more up to date
@@ -555,7 +610,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
555 size = 0; 610 size = 0;
556 } 611 }
557 i_size_write(inode, size); 612 i_size_write(inode, size);
558 inode->i_blocks = (size + (1<<9) - 1) >> 9; 613 inode->i_blocks = calc_inode_blocks(size);
559 ci->i_reported_size = size; 614 ci->i_reported_size = size;
560 if (truncate_seq != ci->i_truncate_seq) { 615 if (truncate_seq != ci->i_truncate_seq) {
561 dout("truncate_seq %u -> %u\n", 616 dout("truncate_seq %u -> %u\n",
@@ -814,9 +869,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
814 869
815 spin_unlock(&ci->i_ceph_lock); 870 spin_unlock(&ci->i_ceph_lock);
816 871
817 err = -EINVAL; 872 if (symlen != i_size_read(inode)) {
818 if (WARN_ON(symlen != i_size_read(inode))) 873 pr_err("fill_inode %llx.%llx BAD symlink "
819 goto out; 874 "size %lld\n", ceph_vinop(inode),
875 i_size_read(inode));
876 i_size_write(inode, symlen);
877 inode->i_blocks = calc_inode_blocks(symlen);
878 }
820 879
821 err = -ENOMEM; 880 err = -ENOMEM;
822 sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS); 881 sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
@@ -1309,12 +1368,13 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1309 int i, err = 0; 1368 int i, err = 0;
1310 1369
1311 for (i = 0; i < rinfo->dir_nr; i++) { 1370 for (i = 0; i < rinfo->dir_nr; i++) {
1371 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1312 struct ceph_vino vino; 1372 struct ceph_vino vino;
1313 struct inode *in; 1373 struct inode *in;
1314 int rc; 1374 int rc;
1315 1375
1316 vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino); 1376 vino.ino = le64_to_cpu(rde->inode.in->ino);
1317 vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid); 1377 vino.snap = le64_to_cpu(rde->inode.in->snapid);
1318 1378
1319 in = ceph_get_inode(req->r_dentry->d_sb, vino); 1379 in = ceph_get_inode(req->r_dentry->d_sb, vino);
1320 if (IS_ERR(in)) { 1380 if (IS_ERR(in)) {
@@ -1322,14 +1382,14 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1322 dout("new_inode badness got %d\n", err); 1382 dout("new_inode badness got %d\n", err);
1323 continue; 1383 continue;
1324 } 1384 }
1325 rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, 1385 rc = fill_inode(in, NULL, &rde->inode, NULL, session,
1326 req->r_request_started, -1, 1386 req->r_request_started, -1,
1327 &req->r_caps_reservation); 1387 &req->r_caps_reservation);
1328 if (rc < 0) { 1388 if (rc < 0) {
1329 pr_err("fill_inode badness on %p got %d\n", in, rc); 1389 pr_err("fill_inode badness on %p got %d\n", in, rc);
1330 err = rc; 1390 err = rc;
1331 continue;
1332 } 1391 }
1392 iput(in);
1333 } 1393 }
1334 1394
1335 return err; 1395 return err;
@@ -1387,6 +1447,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1387 struct ceph_mds_session *session) 1447 struct ceph_mds_session *session)
1388{ 1448{
1389 struct dentry *parent = req->r_dentry; 1449 struct dentry *parent = req->r_dentry;
1450 struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
1390 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 1451 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1391 struct qstr dname; 1452 struct qstr dname;
1392 struct dentry *dn; 1453 struct dentry *dn;
@@ -1394,22 +1455,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1394 int err = 0, skipped = 0, ret, i; 1455 int err = 0, skipped = 0, ret, i;
1395 struct inode *snapdir = NULL; 1456 struct inode *snapdir = NULL;
1396 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; 1457 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
1397 struct ceph_dentry_info *di;
1398 u32 frag = le32_to_cpu(rhead->args.readdir.frag); 1458 u32 frag = le32_to_cpu(rhead->args.readdir.frag);
1459 u32 last_hash = 0;
1460 u32 fpos_offset;
1399 struct ceph_readdir_cache_control cache_ctl = {}; 1461 struct ceph_readdir_cache_control cache_ctl = {};
1400 1462
1401 if (req->r_aborted) 1463 if (req->r_aborted)
1402 return readdir_prepopulate_inodes_only(req, session); 1464 return readdir_prepopulate_inodes_only(req, session);
1403 1465
1466 if (rinfo->hash_order && req->r_path2) {
1467 last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1468 req->r_path2, strlen(req->r_path2));
1469 last_hash = ceph_frag_value(last_hash);
1470 }
1471
1404 if (rinfo->dir_dir && 1472 if (rinfo->dir_dir &&
1405 le32_to_cpu(rinfo->dir_dir->frag) != frag) { 1473 le32_to_cpu(rinfo->dir_dir->frag) != frag) {
1406 dout("readdir_prepopulate got new frag %x -> %x\n", 1474 dout("readdir_prepopulate got new frag %x -> %x\n",
1407 frag, le32_to_cpu(rinfo->dir_dir->frag)); 1475 frag, le32_to_cpu(rinfo->dir_dir->frag));
1408 frag = le32_to_cpu(rinfo->dir_dir->frag); 1476 frag = le32_to_cpu(rinfo->dir_dir->frag);
1409 if (ceph_frag_is_leftmost(frag)) 1477 if (!rinfo->hash_order)
1410 req->r_readdir_offset = 2; 1478 req->r_readdir_offset = 2;
1411 else
1412 req->r_readdir_offset = 0;
1413 } 1479 }
1414 1480
1415 if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { 1481 if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
@@ -1427,24 +1493,37 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1427 if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) { 1493 if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
1428 /* note dir version at start of readdir so we can tell 1494 /* note dir version at start of readdir so we can tell
1429 * if any dentries get dropped */ 1495 * if any dentries get dropped */
1430 struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
1431 req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); 1496 req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
1432 req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count); 1497 req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
1433 req->r_readdir_cache_idx = 0; 1498 req->r_readdir_cache_idx = 0;
1434 } 1499 }
1435 1500
1436 cache_ctl.index = req->r_readdir_cache_idx; 1501 cache_ctl.index = req->r_readdir_cache_idx;
1502 fpos_offset = req->r_readdir_offset;
1437 1503
1438 /* FIXME: release caps/leases if error occurs */ 1504 /* FIXME: release caps/leases if error occurs */
1439 for (i = 0; i < rinfo->dir_nr; i++) { 1505 for (i = 0; i < rinfo->dir_nr; i++) {
1506 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
1440 struct ceph_vino vino; 1507 struct ceph_vino vino;
1441 1508
1442 dname.name = rinfo->dir_dname[i]; 1509 dname.name = rde->name;
1443 dname.len = rinfo->dir_dname_len[i]; 1510 dname.len = rde->name_len;
1444 dname.hash = full_name_hash(dname.name, dname.len); 1511 dname.hash = full_name_hash(dname.name, dname.len);
1445 1512
1446 vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino); 1513 vino.ino = le64_to_cpu(rde->inode.in->ino);
1447 vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid); 1514 vino.snap = le64_to_cpu(rde->inode.in->snapid);
1515
1516 if (rinfo->hash_order) {
1517 u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1518 rde->name, rde->name_len);
1519 hash = ceph_frag_value(hash);
1520 if (hash != last_hash)
1521 fpos_offset = 2;
1522 last_hash = hash;
1523 rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
1524 } else {
1525 rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
1526 }
1448 1527
1449retry_lookup: 1528retry_lookup:
1450 dn = d_lookup(parent, &dname); 1529 dn = d_lookup(parent, &dname);
@@ -1490,7 +1569,7 @@ retry_lookup:
1490 } 1569 }
1491 } 1570 }
1492 1571
1493 ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, 1572 ret = fill_inode(in, NULL, &rde->inode, NULL, session,
1494 req->r_request_started, -1, 1573 req->r_request_started, -1,
1495 &req->r_caps_reservation); 1574 &req->r_caps_reservation);
1496 if (ret < 0) { 1575 if (ret < 0) {
@@ -1523,11 +1602,9 @@ retry_lookup:
1523 dn = realdn; 1602 dn = realdn;
1524 } 1603 }
1525 1604
1526 di = dn->d_fsdata; 1605 ceph_dentry(dn)->offset = rde->offset;
1527 di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
1528 1606
1529 update_dentry_lease(dn, rinfo->dir_dlease[i], 1607 update_dentry_lease(dn, rde->lease, req->r_session,
1530 req->r_session,
1531 req->r_request_started); 1608 req->r_request_started);
1532 1609
1533 if (err == 0 && skipped == 0 && cache_ctl.index >= 0) { 1610 if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
@@ -1562,7 +1639,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
1562 spin_lock(&ci->i_ceph_lock); 1639 spin_lock(&ci->i_ceph_lock);
1563 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); 1640 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1564 i_size_write(inode, size); 1641 i_size_write(inode, size);
1565 inode->i_blocks = (size + (1 << 9) - 1) >> 9; 1642 inode->i_blocks = calc_inode_blocks(size);
1566 1643
1567 /* tell the MDS if we are approaching max_size */ 1644 /* tell the MDS if we are approaching max_size */
1568 if ((size << 1) >= ci->i_max_size && 1645 if ((size << 1) >= ci->i_max_size &&
@@ -1624,10 +1701,21 @@ static void ceph_invalidate_work(struct work_struct *work)
1624 struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, 1701 struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1625 i_pg_inv_work); 1702 i_pg_inv_work);
1626 struct inode *inode = &ci->vfs_inode; 1703 struct inode *inode = &ci->vfs_inode;
1704 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1627 u32 orig_gen; 1705 u32 orig_gen;
1628 int check = 0; 1706 int check = 0;
1629 1707
1630 mutex_lock(&ci->i_truncate_mutex); 1708 mutex_lock(&ci->i_truncate_mutex);
1709
1710 if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1711 pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
1712 inode, ceph_ino(inode));
1713 mapping_set_error(inode->i_mapping, -EIO);
1714 truncate_pagecache(inode, 0);
1715 mutex_unlock(&ci->i_truncate_mutex);
1716 goto out;
1717 }
1718
1631 spin_lock(&ci->i_ceph_lock); 1719 spin_lock(&ci->i_ceph_lock);
1632 dout("invalidate_pages %p gen %d revoking %d\n", inode, 1720 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1633 ci->i_rdcache_gen, ci->i_rdcache_revoking); 1721 ci->i_rdcache_gen, ci->i_rdcache_revoking);
@@ -1641,7 +1729,9 @@ static void ceph_invalidate_work(struct work_struct *work)
1641 orig_gen = ci->i_rdcache_gen; 1729 orig_gen = ci->i_rdcache_gen;
1642 spin_unlock(&ci->i_ceph_lock); 1730 spin_unlock(&ci->i_ceph_lock);
1643 1731
1644 truncate_pagecache(inode, 0); 1732 if (invalidate_inode_pages2(inode->i_mapping) < 0) {
1733 pr_err("invalidate_pages %p fails\n", inode);
1734 }
1645 1735
1646 spin_lock(&ci->i_ceph_lock); 1736 spin_lock(&ci->i_ceph_lock);
1647 if (orig_gen == ci->i_rdcache_gen && 1737 if (orig_gen == ci->i_rdcache_gen &&
@@ -1920,8 +2010,7 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
1920 if ((issued & CEPH_CAP_FILE_EXCL) && 2010 if ((issued & CEPH_CAP_FILE_EXCL) &&
1921 attr->ia_size > inode->i_size) { 2011 attr->ia_size > inode->i_size) {
1922 i_size_write(inode, attr->ia_size); 2012 i_size_write(inode, attr->ia_size);
1923 inode->i_blocks = 2013 inode->i_blocks = calc_inode_blocks(attr->ia_size);
1924 (attr->ia_size + (1 << 9) - 1) >> 9;
1925 inode->i_ctime = attr->ia_ctime; 2014 inode->i_ctime = attr->ia_ctime;
1926 ci->i_reported_size = attr->ia_size; 2015 ci->i_reported_size = attr->ia_size;
1927 dirtied |= CEPH_CAP_FILE_EXCL; 2016 dirtied |= CEPH_CAP_FILE_EXCL;
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index f851d8d70158..be6b1657b1af 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -193,12 +193,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
193 if (copy_from_user(&dl, arg, sizeof(dl))) 193 if (copy_from_user(&dl, arg, sizeof(dl)))
194 return -EFAULT; 194 return -EFAULT;
195 195
196 down_read(&osdc->map_sem); 196 down_read(&osdc->lock);
197 r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, 197 r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
198 &dl.object_no, &dl.object_offset, 198 &dl.object_no, &dl.object_offset,
199 &olen); 199 &olen);
200 if (r < 0) { 200 if (r < 0) {
201 up_read(&osdc->map_sem); 201 up_read(&osdc->lock);
202 return -EIO; 202 return -EIO;
203 } 203 }
204 dl.file_offset -= dl.object_offset; 204 dl.file_offset -= dl.object_offset;
@@ -213,15 +213,15 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
213 ceph_ino(inode), dl.object_no); 213 ceph_ino(inode), dl.object_no);
214 214
215 oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); 215 oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
216 ceph_oid_set_name(&oid, dl.object_name); 216 ceph_oid_printf(&oid, "%s", dl.object_name);
217 217
218 r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid); 218 r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
219 if (r < 0) { 219 if (r < 0) {
220 up_read(&osdc->map_sem); 220 up_read(&osdc->lock);
221 return r; 221 return r;
222 } 222 }
223 223
224 dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid); 224 dl.osd = ceph_pg_to_acting_primary(osdc->osdmap, &pgid);
225 if (dl.osd >= 0) { 225 if (dl.osd >= 0) {
226 struct ceph_entity_addr *a = 226 struct ceph_entity_addr *a =
227 ceph_osd_addr(osdc->osdmap, dl.osd); 227 ceph_osd_addr(osdc->osdmap, dl.osd);
@@ -230,7 +230,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
230 } else { 230 } else {
231 memset(&dl.osd_addr, 0, sizeof(dl.osd_addr)); 231 memset(&dl.osd_addr, 0, sizeof(dl.osd_addr));
232 } 232 }
233 up_read(&osdc->map_sem); 233 up_read(&osdc->lock);
234 234
235 /* send result back to user */ 235 /* send result back to user */
236 if (copy_to_user(arg, &dl, sizeof(dl))) 236 if (copy_to_user(arg, &dl, sizeof(dl)))
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 85b8517f17a0..2103b823bec0 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -181,17 +181,18 @@ static int parse_reply_info_dir(void **p, void *end,
181 181
182 ceph_decode_need(p, end, sizeof(num) + 2, bad); 182 ceph_decode_need(p, end, sizeof(num) + 2, bad);
183 num = ceph_decode_32(p); 183 num = ceph_decode_32(p);
184 info->dir_end = ceph_decode_8(p); 184 {
185 info->dir_complete = ceph_decode_8(p); 185 u16 flags = ceph_decode_16(p);
186 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
187 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
188 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
189 }
186 if (num == 0) 190 if (num == 0)
187 goto done; 191 goto done;
188 192
189 BUG_ON(!info->dir_in); 193 BUG_ON(!info->dir_entries);
190 info->dir_dname = (void *)(info->dir_in + num); 194 if ((unsigned long)(info->dir_entries + num) >
191 info->dir_dname_len = (void *)(info->dir_dname + num); 195 (unsigned long)info->dir_entries + info->dir_buf_size) {
192 info->dir_dlease = (void *)(info->dir_dname_len + num);
193 if ((unsigned long)(info->dir_dlease + num) >
194 (unsigned long)info->dir_in + info->dir_buf_size) {
195 pr_err("dir contents are larger than expected\n"); 196 pr_err("dir contents are larger than expected\n");
196 WARN_ON(1); 197 WARN_ON(1);
197 goto bad; 198 goto bad;
@@ -199,21 +200,23 @@ static int parse_reply_info_dir(void **p, void *end,
199 200
200 info->dir_nr = num; 201 info->dir_nr = num;
201 while (num) { 202 while (num) {
203 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
202 /* dentry */ 204 /* dentry */
203 ceph_decode_need(p, end, sizeof(u32)*2, bad); 205 ceph_decode_need(p, end, sizeof(u32)*2, bad);
204 info->dir_dname_len[i] = ceph_decode_32(p); 206 rde->name_len = ceph_decode_32(p);
205 ceph_decode_need(p, end, info->dir_dname_len[i], bad); 207 ceph_decode_need(p, end, rde->name_len, bad);
206 info->dir_dname[i] = *p; 208 rde->name = *p;
207 *p += info->dir_dname_len[i]; 209 *p += rde->name_len;
208 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], 210 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
209 info->dir_dname[i]); 211 rde->lease = *p;
210 info->dir_dlease[i] = *p;
211 *p += sizeof(struct ceph_mds_reply_lease); 212 *p += sizeof(struct ceph_mds_reply_lease);
212 213
213 /* inode */ 214 /* inode */
214 err = parse_reply_info_in(p, end, &info->dir_in[i], features); 215 err = parse_reply_info_in(p, end, &rde->inode, features);
215 if (err < 0) 216 if (err < 0)
216 goto out_bad; 217 goto out_bad;
218 /* ceph_readdir_prepopulate() will update it */
219 rde->offset = 0;
217 i++; 220 i++;
218 num--; 221 num--;
219 } 222 }
@@ -345,9 +348,9 @@ out_bad:
345 348
346static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 349static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
347{ 350{
348 if (!info->dir_in) 351 if (!info->dir_entries)
349 return; 352 return;
350 free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size)); 353 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
351} 354}
352 355
353 356
@@ -567,51 +570,23 @@ void ceph_mdsc_release_request(struct kref *kref)
567 kfree(req); 570 kfree(req);
568} 571}
569 572
573DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
574
570/* 575/*
571 * lookup session, bump ref if found. 576 * lookup session, bump ref if found.
572 * 577 *
573 * called under mdsc->mutex. 578 * called under mdsc->mutex.
574 */ 579 */
575static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc, 580static struct ceph_mds_request *
576 u64 tid) 581lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
577{ 582{
578 struct ceph_mds_request *req; 583 struct ceph_mds_request *req;
579 struct rb_node *n = mdsc->request_tree.rb_node;
580
581 while (n) {
582 req = rb_entry(n, struct ceph_mds_request, r_node);
583 if (tid < req->r_tid)
584 n = n->rb_left;
585 else if (tid > req->r_tid)
586 n = n->rb_right;
587 else {
588 ceph_mdsc_get_request(req);
589 return req;
590 }
591 }
592 return NULL;
593}
594 584
595static void __insert_request(struct ceph_mds_client *mdsc, 585 req = lookup_request(&mdsc->request_tree, tid);
596 struct ceph_mds_request *new) 586 if (req)
597{ 587 ceph_mdsc_get_request(req);
598 struct rb_node **p = &mdsc->request_tree.rb_node;
599 struct rb_node *parent = NULL;
600 struct ceph_mds_request *req = NULL;
601 588
602 while (*p) { 589 return req;
603 parent = *p;
604 req = rb_entry(parent, struct ceph_mds_request, r_node);
605 if (new->r_tid < req->r_tid)
606 p = &(*p)->rb_left;
607 else if (new->r_tid > req->r_tid)
608 p = &(*p)->rb_right;
609 else
610 BUG();
611 }
612
613 rb_link_node(&new->r_node, parent, p);
614 rb_insert_color(&new->r_node, &mdsc->request_tree);
615} 590}
616 591
617/* 592/*
@@ -630,7 +605,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
630 req->r_num_caps); 605 req->r_num_caps);
631 dout("__register_request %p tid %lld\n", req, req->r_tid); 606 dout("__register_request %p tid %lld\n", req, req->r_tid);
632 ceph_mdsc_get_request(req); 607 ceph_mdsc_get_request(req);
633 __insert_request(mdsc, req); 608 insert_request(&mdsc->request_tree, req);
634 609
635 req->r_uid = current_fsuid(); 610 req->r_uid = current_fsuid();
636 req->r_gid = current_fsgid(); 611 req->r_gid = current_fsgid();
@@ -663,8 +638,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
663 } 638 }
664 } 639 }
665 640
666 rb_erase(&req->r_node, &mdsc->request_tree); 641 erase_request(&mdsc->request_tree, req);
667 RB_CLEAR_NODE(&req->r_node);
668 642
669 if (req->r_unsafe_dir && req->r_got_unsafe) { 643 if (req->r_unsafe_dir && req->r_got_unsafe) {
670 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 644 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
@@ -868,12 +842,14 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
868 int metadata_bytes = 0; 842 int metadata_bytes = 0;
869 int metadata_key_count = 0; 843 int metadata_key_count = 0;
870 struct ceph_options *opt = mdsc->fsc->client->options; 844 struct ceph_options *opt = mdsc->fsc->client->options;
845 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
871 void *p; 846 void *p;
872 847
873 const char* metadata[][2] = { 848 const char* metadata[][2] = {
874 {"hostname", utsname()->nodename}, 849 {"hostname", utsname()->nodename},
875 {"kernel_version", utsname()->release}, 850 {"kernel_version", utsname()->release},
876 {"entity_id", opt->name ? opt->name : ""}, 851 {"entity_id", opt->name ? : ""},
852 {"root", fsopt->server_path ? : "/"},
877 {NULL, NULL} 853 {NULL, NULL}
878 }; 854 };
879 855
@@ -1149,9 +1125,11 @@ out:
1149static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1125static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1150 void *arg) 1126 void *arg)
1151{ 1127{
1128 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1152 struct ceph_inode_info *ci = ceph_inode(inode); 1129 struct ceph_inode_info *ci = ceph_inode(inode);
1153 LIST_HEAD(to_remove); 1130 LIST_HEAD(to_remove);
1154 int drop = 0; 1131 bool drop = false;
1132 bool invalidate = false;
1155 1133
1156 dout("removing cap %p, ci is %p, inode is %p\n", 1134 dout("removing cap %p, ci is %p, inode is %p\n",
1157 cap, ci, &ci->vfs_inode); 1135 cap, ci, &ci->vfs_inode);
@@ -1159,8 +1137,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1159 __ceph_remove_cap(cap, false); 1137 __ceph_remove_cap(cap, false);
1160 if (!ci->i_auth_cap) { 1138 if (!ci->i_auth_cap) {
1161 struct ceph_cap_flush *cf; 1139 struct ceph_cap_flush *cf;
1162 struct ceph_mds_client *mdsc = 1140 struct ceph_mds_client *mdsc = fsc->mdsc;
1163 ceph_sb_to_client(inode->i_sb)->mdsc; 1141
1142 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1143
1144 if (ci->i_wrbuffer_ref > 0 &&
1145 ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1146 invalidate = true;
1164 1147
1165 while (true) { 1148 while (true) {
1166 struct rb_node *n = rb_first(&ci->i_cap_flush_tree); 1149 struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
@@ -1183,7 +1166,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1183 inode, ceph_ino(inode)); 1166 inode, ceph_ino(inode));
1184 ci->i_dirty_caps = 0; 1167 ci->i_dirty_caps = 0;
1185 list_del_init(&ci->i_dirty_item); 1168 list_del_init(&ci->i_dirty_item);
1186 drop = 1; 1169 drop = true;
1187 } 1170 }
1188 if (!list_empty(&ci->i_flushing_item)) { 1171 if (!list_empty(&ci->i_flushing_item)) {
1189 pr_warn_ratelimited( 1172 pr_warn_ratelimited(
@@ -1193,7 +1176,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1193 ci->i_flushing_caps = 0; 1176 ci->i_flushing_caps = 0;
1194 list_del_init(&ci->i_flushing_item); 1177 list_del_init(&ci->i_flushing_item);
1195 mdsc->num_cap_flushing--; 1178 mdsc->num_cap_flushing--;
1196 drop = 1; 1179 drop = true;
1197 } 1180 }
1198 spin_unlock(&mdsc->cap_dirty_lock); 1181 spin_unlock(&mdsc->cap_dirty_lock);
1199 1182
@@ -1210,7 +1193,11 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1210 list_del(&cf->list); 1193 list_del(&cf->list);
1211 ceph_free_cap_flush(cf); 1194 ceph_free_cap_flush(cf);
1212 } 1195 }
1213 while (drop--) 1196
1197 wake_up_all(&ci->i_cap_wq);
1198 if (invalidate)
1199 ceph_queue_invalidate(inode);
1200 if (drop)
1214 iput(inode); 1201 iput(inode);
1215 return 0; 1202 return 0;
1216} 1203}
@@ -1220,12 +1207,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1220 */ 1207 */
1221static void remove_session_caps(struct ceph_mds_session *session) 1208static void remove_session_caps(struct ceph_mds_session *session)
1222{ 1209{
1210 struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1211 struct super_block *sb = fsc->sb;
1223 dout("remove_session_caps on %p\n", session); 1212 dout("remove_session_caps on %p\n", session);
1224 iterate_session_caps(session, remove_session_caps_cb, NULL); 1213 iterate_session_caps(session, remove_session_caps_cb, fsc);
1225 1214
1226 spin_lock(&session->s_cap_lock); 1215 spin_lock(&session->s_cap_lock);
1227 if (session->s_nr_caps > 0) { 1216 if (session->s_nr_caps > 0) {
1228 struct super_block *sb = session->s_mdsc->fsc->sb;
1229 struct inode *inode; 1217 struct inode *inode;
1230 struct ceph_cap *cap, *prev = NULL; 1218 struct ceph_cap *cap, *prev = NULL;
1231 struct ceph_vino vino; 1219 struct ceph_vino vino;
@@ -1270,13 +1258,13 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1270{ 1258{
1271 struct ceph_inode_info *ci = ceph_inode(inode); 1259 struct ceph_inode_info *ci = ceph_inode(inode);
1272 1260
1273 wake_up_all(&ci->i_cap_wq);
1274 if (arg) { 1261 if (arg) {
1275 spin_lock(&ci->i_ceph_lock); 1262 spin_lock(&ci->i_ceph_lock);
1276 ci->i_wanted_max_size = 0; 1263 ci->i_wanted_max_size = 0;
1277 ci->i_requested_max_size = 0; 1264 ci->i_requested_max_size = 0;
1278 spin_unlock(&ci->i_ceph_lock); 1265 spin_unlock(&ci->i_ceph_lock);
1279 } 1266 }
1267 wake_up_all(&ci->i_cap_wq);
1280 return 0; 1268 return 0;
1281} 1269}
1282 1270
@@ -1671,8 +1659,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1671 struct ceph_inode_info *ci = ceph_inode(dir); 1659 struct ceph_inode_info *ci = ceph_inode(dir);
1672 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 1660 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1673 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 1661 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1674 size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) + 1662 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
1675 sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1676 int order, num_entries; 1663 int order, num_entries;
1677 1664
1678 spin_lock(&ci->i_ceph_lock); 1665 spin_lock(&ci->i_ceph_lock);
@@ -1683,14 +1670,14 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1683 1670
1684 order = get_order(size * num_entries); 1671 order = get_order(size * num_entries);
1685 while (order >= 0) { 1672 while (order >= 0) {
1686 rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL | 1673 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
1687 __GFP_NOWARN, 1674 __GFP_NOWARN,
1688 order); 1675 order);
1689 if (rinfo->dir_in) 1676 if (rinfo->dir_entries)
1690 break; 1677 break;
1691 order--; 1678 order--;
1692 } 1679 }
1693 if (!rinfo->dir_in) 1680 if (!rinfo->dir_entries)
1694 return -ENOMEM; 1681 return -ENOMEM;
1695 1682
1696 num_entries = (PAGE_SIZE << order) / size; 1683 num_entries = (PAGE_SIZE << order) / size;
@@ -1722,6 +1709,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1722 INIT_LIST_HEAD(&req->r_unsafe_target_item); 1709 INIT_LIST_HEAD(&req->r_unsafe_target_item);
1723 req->r_fmode = -1; 1710 req->r_fmode = -1;
1724 kref_init(&req->r_kref); 1711 kref_init(&req->r_kref);
1712 RB_CLEAR_NODE(&req->r_node);
1725 INIT_LIST_HEAD(&req->r_wait); 1713 INIT_LIST_HEAD(&req->r_wait);
1726 init_completion(&req->r_completion); 1714 init_completion(&req->r_completion);
1727 init_completion(&req->r_safe_completion); 1715 init_completion(&req->r_safe_completion);
@@ -2414,7 +2402,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2414 /* get request, session */ 2402 /* get request, session */
2415 tid = le64_to_cpu(msg->hdr.tid); 2403 tid = le64_to_cpu(msg->hdr.tid);
2416 mutex_lock(&mdsc->mutex); 2404 mutex_lock(&mdsc->mutex);
2417 req = __lookup_request(mdsc, tid); 2405 req = lookup_get_request(mdsc, tid);
2418 if (!req) { 2406 if (!req) {
2419 dout("handle_reply on unknown tid %llu\n", tid); 2407 dout("handle_reply on unknown tid %llu\n", tid);
2420 mutex_unlock(&mdsc->mutex); 2408 mutex_unlock(&mdsc->mutex);
@@ -2604,7 +2592,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
2604 fwd_seq = ceph_decode_32(&p); 2592 fwd_seq = ceph_decode_32(&p);
2605 2593
2606 mutex_lock(&mdsc->mutex); 2594 mutex_lock(&mdsc->mutex);
2607 req = __lookup_request(mdsc, tid); 2595 req = lookup_get_request(mdsc, tid);
2608 if (!req) { 2596 if (!req) {
2609 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2597 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2610 goto out; /* dup reply? */ 2598 goto out; /* dup reply? */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index ee69a537dba5..e7d38aac7109 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -47,6 +47,14 @@ struct ceph_mds_reply_info_in {
47 u32 pool_ns_len; 47 u32 pool_ns_len;
48}; 48};
49 49
50struct ceph_mds_reply_dir_entry {
51 char *name;
52 u32 name_len;
53 struct ceph_mds_reply_lease *lease;
54 struct ceph_mds_reply_info_in inode;
55 loff_t offset;
56};
57
50/* 58/*
51 * parsed info about an mds reply, including information about 59 * parsed info about an mds reply, including information about
52 * either: 1) the target inode and/or its parent directory and dentry, 60 * either: 1) the target inode and/or its parent directory and dentry,
@@ -73,11 +81,10 @@ struct ceph_mds_reply_info_parsed {
73 struct ceph_mds_reply_dirfrag *dir_dir; 81 struct ceph_mds_reply_dirfrag *dir_dir;
74 size_t dir_buf_size; 82 size_t dir_buf_size;
75 int dir_nr; 83 int dir_nr;
76 char **dir_dname; 84 bool dir_complete;
77 u32 *dir_dname_len; 85 bool dir_end;
78 struct ceph_mds_reply_lease **dir_dlease; 86 bool hash_order;
79 struct ceph_mds_reply_info_in *dir_in; 87 struct ceph_mds_reply_dir_entry *dir_entries;
80 u8 dir_complete, dir_end;
81 }; 88 };
82 89
83 /* for create results */ 90 /* for create results */
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 261531e55e9d..8c3591a7fbae 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -54,16 +54,21 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
54 const void *start = *p; 54 const void *start = *p;
55 int i, j, n; 55 int i, j, n;
56 int err = -EINVAL; 56 int err = -EINVAL;
57 u16 version; 57 u8 mdsmap_v, mdsmap_cv;
58 58
59 m = kzalloc(sizeof(*m), GFP_NOFS); 59 m = kzalloc(sizeof(*m), GFP_NOFS);
60 if (m == NULL) 60 if (m == NULL)
61 return ERR_PTR(-ENOMEM); 61 return ERR_PTR(-ENOMEM);
62 62
63 ceph_decode_16_safe(p, end, version, bad); 63 ceph_decode_need(p, end, 1 + 1, bad);
64 if (version > 3) { 64 mdsmap_v = ceph_decode_8(p);
65 pr_warn("got mdsmap version %d > 3, failing", version); 65 mdsmap_cv = ceph_decode_8(p);
66 goto bad; 66 if (mdsmap_v >= 4) {
67 u32 mdsmap_len;
68 ceph_decode_32_safe(p, end, mdsmap_len, bad);
69 if (end < *p + mdsmap_len)
70 goto bad;
71 end = *p + mdsmap_len;
67 } 72 }
68 73
69 ceph_decode_need(p, end, 8*sizeof(u32) + sizeof(u64), bad); 74 ceph_decode_need(p, end, 8*sizeof(u32) + sizeof(u64), bad);
@@ -87,16 +92,29 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
87 u32 namelen; 92 u32 namelen;
88 s32 mds, inc, state; 93 s32 mds, inc, state;
89 u64 state_seq; 94 u64 state_seq;
90 u8 infoversion; 95 u8 info_v;
96 void *info_end = NULL;
91 struct ceph_entity_addr addr; 97 struct ceph_entity_addr addr;
92 u32 num_export_targets; 98 u32 num_export_targets;
93 void *pexport_targets = NULL; 99 void *pexport_targets = NULL;
94 struct ceph_timespec laggy_since; 100 struct ceph_timespec laggy_since;
95 struct ceph_mds_info *info; 101 struct ceph_mds_info *info;
96 102
97 ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad); 103 ceph_decode_need(p, end, sizeof(u64) + 1, bad);
98 global_id = ceph_decode_64(p); 104 global_id = ceph_decode_64(p);
99 infoversion = ceph_decode_8(p); 105 info_v= ceph_decode_8(p);
106 if (info_v >= 4) {
107 u32 info_len;
108 u8 info_cv;
109 ceph_decode_need(p, end, 1 + sizeof(u32), bad);
110 info_cv = ceph_decode_8(p);
111 info_len = ceph_decode_32(p);
112 info_end = *p + info_len;
113 if (info_end > end)
114 goto bad;
115 }
116
117 ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
100 *p += sizeof(u64); 118 *p += sizeof(u64);
101 namelen = ceph_decode_32(p); /* skip mds name */ 119 namelen = ceph_decode_32(p); /* skip mds name */
102 *p += namelen; 120 *p += namelen;
@@ -115,7 +133,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
115 *p += sizeof(u32); 133 *p += sizeof(u32);
116 ceph_decode_32_safe(p, end, namelen, bad); 134 ceph_decode_32_safe(p, end, namelen, bad);
117 *p += namelen; 135 *p += namelen;
118 if (infoversion >= 2) { 136 if (info_v >= 2) {
119 ceph_decode_32_safe(p, end, num_export_targets, bad); 137 ceph_decode_32_safe(p, end, num_export_targets, bad);
120 pexport_targets = *p; 138 pexport_targets = *p;
121 *p += num_export_targets * sizeof(u32); 139 *p += num_export_targets * sizeof(u32);
@@ -123,6 +141,12 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
123 num_export_targets = 0; 141 num_export_targets = 0;
124 } 142 }
125 143
144 if (info_end && *p != info_end) {
145 if (*p > info_end)
146 goto bad;
147 *p = info_end;
148 }
149
126 dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n", 150 dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n",
127 i+1, n, global_id, mds, inc, 151 i+1, n, global_id, mds, inc,
128 ceph_pr_addr(&addr.in_addr), 152 ceph_pr_addr(&addr.in_addr),
@@ -163,6 +187,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
163 m->m_cas_pg_pool = ceph_decode_64(p); 187 m->m_cas_pg_pool = ceph_decode_64(p);
164 188
165 /* ok, we don't care about the rest. */ 189 /* ok, we don't care about the rest. */
190 *p = end;
166 dout("mdsmap_decode success epoch %u\n", m->m_epoch); 191 dout("mdsmap_decode success epoch %u\n", m->m_epoch);
167 return m; 192 return m;
168 193
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f12d5e2955c2..91e02481ce06 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -108,6 +108,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
108 * mount options 108 * mount options
109 */ 109 */
110enum { 110enum {
111 Opt_mds_namespace,
111 Opt_wsize, 112 Opt_wsize,
112 Opt_rsize, 113 Opt_rsize,
113 Opt_rasize, 114 Opt_rasize,
@@ -143,6 +144,7 @@ enum {
143}; 144};
144 145
145static match_table_t fsopt_tokens = { 146static match_table_t fsopt_tokens = {
147 {Opt_mds_namespace, "mds_namespace=%d"},
146 {Opt_wsize, "wsize=%d"}, 148 {Opt_wsize, "wsize=%d"},
147 {Opt_rsize, "rsize=%d"}, 149 {Opt_rsize, "rsize=%d"},
148 {Opt_rasize, "rasize=%d"}, 150 {Opt_rasize, "rasize=%d"},
@@ -212,6 +214,9 @@ static int parse_fsopt_token(char *c, void *private)
212 break; 214 break;
213 215
214 /* misc */ 216 /* misc */
217 case Opt_mds_namespace:
218 fsopt->mds_namespace = intval;
219 break;
215 case Opt_wsize: 220 case Opt_wsize:
216 fsopt->wsize = intval; 221 fsopt->wsize = intval;
217 break; 222 break;
@@ -297,6 +302,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
297{ 302{
298 dout("destroy_mount_options %p\n", args); 303 dout("destroy_mount_options %p\n", args);
299 kfree(args->snapdir_name); 304 kfree(args->snapdir_name);
305 kfree(args->server_path);
300 kfree(args); 306 kfree(args);
301} 307}
302 308
@@ -328,14 +334,17 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
328 if (ret) 334 if (ret)
329 return ret; 335 return ret;
330 336
337 ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
338 if (ret)
339 return ret;
340
331 return ceph_compare_options(new_opt, fsc->client); 341 return ceph_compare_options(new_opt, fsc->client);
332} 342}
333 343
334static int parse_mount_options(struct ceph_mount_options **pfsopt, 344static int parse_mount_options(struct ceph_mount_options **pfsopt,
335 struct ceph_options **popt, 345 struct ceph_options **popt,
336 int flags, char *options, 346 int flags, char *options,
337 const char *dev_name, 347 const char *dev_name)
338 const char **path)
339{ 348{
340 struct ceph_mount_options *fsopt; 349 struct ceph_mount_options *fsopt;
341 const char *dev_name_end; 350 const char *dev_name_end;
@@ -367,6 +376,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
367 fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; 376 fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
368 fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; 377 fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
369 fsopt->congestion_kb = default_congestion_kb(); 378 fsopt->congestion_kb = default_congestion_kb();
379 fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
370 380
371 /* 381 /*
372 * Distinguish the server list from the path in "dev_name". 382 * Distinguish the server list from the path in "dev_name".
@@ -380,12 +390,13 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
380 */ 390 */
381 dev_name_end = strchr(dev_name, '/'); 391 dev_name_end = strchr(dev_name, '/');
382 if (dev_name_end) { 392 if (dev_name_end) {
383 /* skip over leading '/' for path */ 393 fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
384 *path = dev_name_end + 1; 394 if (!fsopt->server_path) {
395 err = -ENOMEM;
396 goto out;
397 }
385 } else { 398 } else {
386 /* path is empty */
387 dev_name_end = dev_name + strlen(dev_name); 399 dev_name_end = dev_name + strlen(dev_name);
388 *path = dev_name_end;
389 } 400 }
390 err = -EINVAL; 401 err = -EINVAL;
391 dev_name_end--; /* back up to ':' separator */ 402 dev_name_end--; /* back up to ':' separator */
@@ -395,7 +406,8 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
395 goto out; 406 goto out;
396 } 407 }
397 dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name); 408 dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
398 dout("server path '%s'\n", *path); 409 if (fsopt->server_path)
410 dout("server path '%s'\n", fsopt->server_path);
399 411
400 *popt = ceph_parse_options(options, dev_name, dev_name_end, 412 *popt = ceph_parse_options(options, dev_name, dev_name_end,
401 parse_fsopt_token, (void *)fsopt); 413 parse_fsopt_token, (void *)fsopt);
@@ -457,6 +469,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
457 seq_puts(m, ",noacl"); 469 seq_puts(m, ",noacl");
458#endif 470#endif
459 471
472 if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE)
473 seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);
460 if (fsopt->wsize) 474 if (fsopt->wsize)
461 seq_printf(m, ",wsize=%d", fsopt->wsize); 475 seq_printf(m, ",wsize=%d", fsopt->wsize);
462 if (fsopt->rsize != CEPH_RSIZE_DEFAULT) 476 if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
@@ -511,9 +525,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
511{ 525{
512 struct ceph_fs_client *fsc; 526 struct ceph_fs_client *fsc;
513 const u64 supported_features = 527 const u64 supported_features =
514 CEPH_FEATURE_FLOCK | 528 CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
515 CEPH_FEATURE_DIRLAYOUTHASH | 529 CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
516 CEPH_FEATURE_MDS_INLINE_DATA;
517 const u64 required_features = 0; 530 const u64 required_features = 0;
518 int page_count; 531 int page_count;
519 size_t size; 532 size_t size;
@@ -530,6 +543,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
530 goto fail; 543 goto fail;
531 } 544 }
532 fsc->client->extra_mon_dispatch = extra_mon_dispatch; 545 fsc->client->extra_mon_dispatch = extra_mon_dispatch;
546 fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
533 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true); 547 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
534 548
535 fsc->mount_options = fsopt; 549 fsc->mount_options = fsopt;
@@ -785,8 +799,7 @@ out:
785/* 799/*
786 * mount: join the ceph cluster, and open root directory. 800 * mount: join the ceph cluster, and open root directory.
787 */ 801 */
788static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc, 802static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
789 const char *path)
790{ 803{
791 int err; 804 int err;
792 unsigned long started = jiffies; /* note the start time */ 805 unsigned long started = jiffies; /* note the start time */
@@ -815,11 +828,12 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
815 goto fail; 828 goto fail;
816 } 829 }
817 830
818 if (path[0] == 0) { 831 if (!fsc->mount_options->server_path) {
819 root = fsc->sb->s_root; 832 root = fsc->sb->s_root;
820 dget(root); 833 dget(root);
821 } else { 834 } else {
822 dout("mount opening base mountpoint\n"); 835 const char *path = fsc->mount_options->server_path + 1;
836 dout("mount opening path %s\n", path);
823 root = open_root_dentry(fsc, path, started); 837 root = open_root_dentry(fsc, path, started);
824 if (IS_ERR(root)) { 838 if (IS_ERR(root)) {
825 err = PTR_ERR(root); 839 err = PTR_ERR(root);
@@ -935,7 +949,6 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
935 struct dentry *res; 949 struct dentry *res;
936 int err; 950 int err;
937 int (*compare_super)(struct super_block *, void *) = ceph_compare_super; 951 int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
938 const char *path = NULL;
939 struct ceph_mount_options *fsopt = NULL; 952 struct ceph_mount_options *fsopt = NULL;
940 struct ceph_options *opt = NULL; 953 struct ceph_options *opt = NULL;
941 954
@@ -944,7 +957,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
944#ifdef CONFIG_CEPH_FS_POSIX_ACL 957#ifdef CONFIG_CEPH_FS_POSIX_ACL
945 flags |= MS_POSIXACL; 958 flags |= MS_POSIXACL;
946#endif 959#endif
947 err = parse_mount_options(&fsopt, &opt, flags, data, dev_name, &path); 960 err = parse_mount_options(&fsopt, &opt, flags, data, dev_name);
948 if (err < 0) { 961 if (err < 0) {
949 res = ERR_PTR(err); 962 res = ERR_PTR(err);
950 goto out_final; 963 goto out_final;
@@ -987,7 +1000,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
987 } 1000 }
988 } 1001 }
989 1002
990 res = ceph_real_mount(fsc, path); 1003 res = ceph_real_mount(fsc);
991 if (IS_ERR(res)) 1004 if (IS_ERR(res))
992 goto out_splat; 1005 goto out_splat;
993 dout("root %p inode %p ino %llx.%llx\n", res, 1006 dout("root %p inode %p ino %llx.%llx\n", res,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 7b99eb756477..0130a8592191 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -62,6 +62,7 @@ struct ceph_mount_options {
62 int cap_release_safety; 62 int cap_release_safety;
63 int max_readdir; /* max readdir result (entires) */ 63 int max_readdir; /* max readdir result (entires) */
64 int max_readdir_bytes; /* max readdir result (bytes) */ 64 int max_readdir_bytes; /* max readdir result (bytes) */
65 int mds_namespace;
65 66
66 /* 67 /*
67 * everything above this point can be memcmp'd; everything below 68 * everything above this point can be memcmp'd; everything below
@@ -69,6 +70,7 @@ struct ceph_mount_options {
69 */ 70 */
70 71
71 char *snapdir_name; /* default ".snap" */ 72 char *snapdir_name; /* default ".snap" */
73 char *server_path; /* default "/" */
72}; 74};
73 75
74struct ceph_fs_client { 76struct ceph_fs_client {
@@ -295,6 +297,7 @@ struct ceph_inode_info {
295 u64 i_files, i_subdirs; 297 u64 i_files, i_subdirs;
296 298
297 struct rb_root i_fragtree; 299 struct rb_root i_fragtree;
300 int i_fragtree_nsplits;
298 struct mutex i_fragtree_mutex; 301 struct mutex i_fragtree_mutex;
299 302
300 struct ceph_inode_xattrs_info i_xattrs; 303 struct ceph_inode_xattrs_info i_xattrs;
@@ -469,6 +472,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
469#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */ 472#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
470#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ 473#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
471#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */ 474#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
475#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
472 476
473static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, 477static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
474 long long release_count, 478 long long release_count,
@@ -537,11 +541,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
537 return (struct ceph_dentry_info *)dentry->d_fsdata; 541 return (struct ceph_dentry_info *)dentry->d_fsdata;
538} 542}
539 543
540static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
541{
542 return ((loff_t)frag << 32) | (loff_t)off;
543}
544
545/* 544/*
546 * caps helpers 545 * caps helpers
547 */ 546 */
@@ -632,7 +631,6 @@ struct ceph_file_info {
632 struct ceph_mds_request *last_readdir; 631 struct ceph_mds_request *last_readdir;
633 632
634 /* readdir: position within a frag */ 633 /* readdir: position within a frag */
635 unsigned offset; /* offset of last chunk, adjusted for . and .. */
636 unsigned next_offset; /* offset of next chunk (last_name's + 1) */ 634 unsigned next_offset; /* offset of next chunk (last_name's + 1) */
637 char *last_name; /* last entry in previous chunk */ 635 char *last_name; /* last entry in previous chunk */
638 long long dir_release_count; 636 long long dir_release_count;
@@ -927,6 +925,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
927/* file.c */ 925/* file.c */
928extern const struct file_operations ceph_file_fops; 926extern const struct file_operations ceph_file_fops;
929 927
928extern int ceph_renew_caps(struct inode *inode);
930extern int ceph_open(struct inode *inode, struct file *file); 929extern int ceph_open(struct inode *inode, struct file *file);
931extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, 930extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
932 struct file *file, unsigned flags, umode_t mode, 931 struct file *file, unsigned flags, umode_t mode,
@@ -942,6 +941,7 @@ extern const struct inode_operations ceph_snapdir_iops;
942extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, 941extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
943 ceph_snapdir_dentry_ops; 942 ceph_snapdir_dentry_ops;
944 943
944extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
945extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry); 945extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
946extern int ceph_handle_snapdir(struct ceph_mds_request *req, 946extern int ceph_handle_snapdir(struct ceph_mds_request *req,
947 struct dentry *dentry, int err); 947 struct dentry *dentry, int err);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 0d66722c6a52..dacc1bd85629 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -77,7 +77,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
77 char buf[128]; 77 char buf[128];
78 78
79 dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); 79 dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
80 down_read(&osdc->map_sem); 80 down_read(&osdc->lock);
81 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); 81 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
82 if (pool_name) { 82 if (pool_name) {
83 size_t len = strlen(pool_name); 83 size_t len = strlen(pool_name);
@@ -109,7 +109,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
109 ret = -ERANGE; 109 ret = -ERANGE;
110 } 110 }
111 } 111 }
112 up_read(&osdc->map_sem); 112 up_read(&osdc->lock);
113 return ret; 113 return ret;
114} 114}
115 115
@@ -143,13 +143,13 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
143 s64 pool = ceph_file_layout_pg_pool(ci->i_layout); 143 s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
144 const char *pool_name; 144 const char *pool_name;
145 145
146 down_read(&osdc->map_sem); 146 down_read(&osdc->lock);
147 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); 147 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
148 if (pool_name) 148 if (pool_name)
149 ret = snprintf(val, size, "%s", pool_name); 149 ret = snprintf(val, size, "%s", pool_name);
150 else 150 else
151 ret = snprintf(val, size, "%lld", (unsigned long long)pool); 151 ret = snprintf(val, size, "%lld", (unsigned long long)pool);
152 up_read(&osdc->map_sem); 152 up_read(&osdc->lock);
153 return ret; 153 return ret;
154} 154}
155 155
@@ -862,6 +862,7 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
862 struct ceph_mds_request *req; 862 struct ceph_mds_request *req;
863 struct ceph_mds_client *mdsc = fsc->mdsc; 863 struct ceph_mds_client *mdsc = fsc->mdsc;
864 struct ceph_pagelist *pagelist = NULL; 864 struct ceph_pagelist *pagelist = NULL;
865 int op = CEPH_MDS_OP_SETXATTR;
865 int err; 866 int err;
866 867
867 if (size > 0) { 868 if (size > 0) {
@@ -875,20 +876,21 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
875 if (err) 876 if (err)
876 goto out; 877 goto out;
877 } else if (!value) { 878 } else if (!value) {
878 flags |= CEPH_XATTR_REMOVE; 879 if (flags & CEPH_XATTR_REPLACE)
880 op = CEPH_MDS_OP_RMXATTR;
881 else
882 flags |= CEPH_XATTR_REMOVE;
879 } 883 }
880 884
881 dout("setxattr value=%.*s\n", (int)size, value); 885 dout("setxattr value=%.*s\n", (int)size, value);
882 886
883 /* do request */ 887 /* do request */
884 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR, 888 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
885 USE_AUTH_MDS);
886 if (IS_ERR(req)) { 889 if (IS_ERR(req)) {
887 err = PTR_ERR(req); 890 err = PTR_ERR(req);
888 goto out; 891 goto out;
889 } 892 }
890 893
891 req->r_args.setxattr.flags = cpu_to_le32(flags);
892 req->r_path2 = kstrdup(name, GFP_NOFS); 894 req->r_path2 = kstrdup(name, GFP_NOFS);
893 if (!req->r_path2) { 895 if (!req->r_path2) {
894 ceph_mdsc_put_request(req); 896 ceph_mdsc_put_request(req);
@@ -896,8 +898,11 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
896 goto out; 898 goto out;
897 } 899 }
898 900
899 req->r_pagelist = pagelist; 901 if (op == CEPH_MDS_OP_SETXATTR) {
900 pagelist = NULL; 902 req->r_args.setxattr.flags = cpu_to_le32(flags);
903 req->r_pagelist = pagelist;
904 pagelist = NULL;
905 }
901 906
902 req->r_inode = inode; 907 req->r_inode = inode;
903 ihold(inode); 908 ihold(inode);