author	Linus Torvalds <torvalds@linux-foundation.org>	2014-12-17 19:03:12 -0500
committer	Linus Torvalds <torvalds@linux-foundation.org>	2014-12-17 19:03:12 -0500
commit	57666509b70030a9483d13222bfec8eec5db07df (patch)
tree	1e0021c2aabc2ce8832e8c816e2aa94b0b77a323 /fs
parent	87c31b39abcb6fb6bd7d111200c9627a594bf6a9 (diff)
parent	0aeff37abada9f8c08d2b10481a43d3ae406c823 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil:
 "The big item here is support for inline data for CephFS and for
  message signatures from Zheng.  There are also several bug fixes,
  including interrupted flock request handling, 0-length xattrs, mksnap,
  cached readdir results, and a message version compat field.  Finally
  there are several cleanups from Ilya, Dan, and Markus.

  Note that there is another series coming soon that fixes some bugs in
  the RBD 'lingering' requests, but it isn't quite ready yet"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits)
  ceph: fix setting empty extended attribute
  ceph: fix mksnap crash
  ceph: do_sync is never initialized
  libceph: fixup includes in pagelist.h
  ceph: support inline data feature
  ceph: flush inline version
  ceph: convert inline data to normal data before data write
  ceph: sync read inline data
  ceph: fetch inline data when getting Fcr cap refs
  ceph: use getattr request to fetch inline data
  ceph: add inline data to pagecache
  ceph: parse inline data in MClientReply and MClientCaps
  libceph: specify position of extent operation
  libceph: add CREATE osd operation support
  libceph: add SETXATTR/CMPXATTR osd operations support
  rbd: don't treat CEPH_OSD_OP_DELETE as extent op
  ceph: remove unused stringification macros
  libceph: require cephx message signature by default
  ceph: introduce global empty snap context
  ceph: message versioning fixes
  ...
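For context on the "interrupted flock request handling" fix: the fs/ceph/locks.c hunk below adds ceph_lock_wait_for_completion(), which cancels a pending MDS lock request when the waiting task is signalled. The following is a minimal userspace sketch (not part of this commit) of the scenario it handles, assuming a CephFS mount at the hypothetical path /mnt/cephfs:

/* Blocking flock() interrupted by a signal -- the case the new
 * ceph_lock_wait_for_completion() path has to unwind on the MDS.
 * Hypothetical mount point; any CephFS file already locked by
 * another client will do.
 */
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/file.h>
#include <unistd.h>

static void on_alarm(int sig)
{
	(void)sig;	/* exists only to interrupt the syscall */
}

int main(void)
{
	struct sigaction sa;
	int fd = open("/mnt/cephfs/lockfile", O_CREAT | O_RDWR, 0644);

	if (fd < 0) {
		perror("open");
		return 1;
	}

	/* No SA_RESTART, so a blocked flock() fails with EINTR. */
	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = on_alarm;
	sigaction(SIGALRM, &sa, NULL);
	alarm(2);

	/* Blocks while the MDS lock request is pending; the signal
	 * interrupts it, and the client has to withdraw the request. */
	if (flock(fd, LOCK_EX) < 0 && errno == EINTR)
		printf("flock interrupted by signal\n");

	close(fd);
	return 0;
}

Before this series the client undid an interrupted lock by sending a plain unlock after -ERESTARTSYS (the hunks removed from ceph_lock() and ceph_flock() below); the new wait hook issues a dedicated CEPH_LOCK_FCNTL_INTR/CEPH_LOCK_FLOCK_INTR request instead.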
Diffstat (limited to 'fs')
-rw-r--r--	fs/ceph/addr.c	273
-rw-r--r--	fs/ceph/caps.c	132
-rw-r--r--	fs/ceph/dir.c	27
-rw-r--r--	fs/ceph/file.c	97
-rw-r--r--	fs/ceph/inode.c	59
-rw-r--r--	fs/ceph/locks.c	64
-rw-r--r--	fs/ceph/mds_client.c	41
-rw-r--r--	fs/ceph/mds_client.h	10
-rw-r--r--	fs/ceph/snap.c	37
-rw-r--r--	fs/ceph/super.c	16
-rw-r--r--	fs/ceph/super.h	55
-rw-r--r--	fs/ceph/super.h.rej	10
-rw-r--r--	fs/ceph/xattr.c	7
13 files changed, 712 insertions, 116 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 18c06bbaf136..f5013d92a7e6 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page)
192 struct ceph_osd_client *osdc = 192 struct ceph_osd_client *osdc =
193 &ceph_inode_to_client(inode)->client->osdc; 193 &ceph_inode_to_client(inode)->client->osdc;
194 int err = 0; 194 int err = 0;
195 u64 off = page_offset(page);
195 u64 len = PAGE_CACHE_SIZE; 196 u64 len = PAGE_CACHE_SIZE;
196 197
197 err = ceph_readpage_from_fscache(inode, page); 198 if (off >= i_size_read(inode)) {
199 zero_user_segment(page, err, PAGE_CACHE_SIZE);
200 SetPageUptodate(page);
201 return 0;
202 }
198 203
204 /*
205 * Uptodate inline data should have been added into page cache
206 * while getting Fcr caps.
207 */
208 if (ci->i_inline_version != CEPH_INLINE_NONE)
209 return -EINVAL;
210
211 err = ceph_readpage_from_fscache(inode, page);
199 if (err == 0) 212 if (err == 0)
200 goto out; 213 goto out;
201 214
202 dout("readpage inode %p file %p page %p index %lu\n", 215 dout("readpage inode %p file %p page %p index %lu\n",
203 inode, filp, page, page->index); 216 inode, filp, page, page->index);
204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 217 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
205 (u64) page_offset(page), &len, 218 off, &len,
206 ci->i_truncate_seq, ci->i_truncate_size, 219 ci->i_truncate_seq, ci->i_truncate_size,
207 &page, 1, 0); 220 &page, 1, 0);
208 if (err == -ENOENT) 221 if (err == -ENOENT)
@@ -319,7 +332,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
319 off, len); 332 off, len);
320 vino = ceph_vino(inode); 333 vino = ceph_vino(inode);
321 req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 334 req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
322 1, CEPH_OSD_OP_READ, 335 0, 1, CEPH_OSD_OP_READ,
323 CEPH_OSD_FLAG_READ, NULL, 336 CEPH_OSD_FLAG_READ, NULL,
324 ci->i_truncate_seq, ci->i_truncate_size, 337 ci->i_truncate_seq, ci->i_truncate_size,
325 false); 338 false);
@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
384 int rc = 0; 397 int rc = 0;
385 int max = 0; 398 int max = 0;
386 399
400 if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
401 return -EINVAL;
402
387 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, 403 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
388 &nr_pages); 404 &nr_pages);
389 405
@@ -673,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
673 int rc = 0; 689 int rc = 0;
674 unsigned wsize = 1 << inode->i_blkbits; 690 unsigned wsize = 1 << inode->i_blkbits;
675 struct ceph_osd_request *req = NULL; 691 struct ceph_osd_request *req = NULL;
676 int do_sync; 692 int do_sync = 0;
677 u64 truncate_size, snap_size; 693 u64 truncate_size, snap_size;
678 u32 truncate_seq; 694 u32 truncate_seq;
679 695
@@ -750,7 +766,6 @@ retry:
750 last_snapc = snapc; 766 last_snapc = snapc;
751 767
752 while (!done && index <= end) { 768 while (!done && index <= end) {
753 int num_ops = do_sync ? 2 : 1;
754 unsigned i; 769 unsigned i;
755 int first; 770 int first;
756 pgoff_t next; 771 pgoff_t next;
@@ -850,7 +865,8 @@ get_more_pages:
850 len = wsize; 865 len = wsize;
851 req = ceph_osdc_new_request(&fsc->client->osdc, 866 req = ceph_osdc_new_request(&fsc->client->osdc,
852 &ci->i_layout, vino, 867 &ci->i_layout, vino,
853 offset, &len, num_ops, 868 offset, &len, 0,
869 do_sync ? 2 : 1,
854 CEPH_OSD_OP_WRITE, 870 CEPH_OSD_OP_WRITE,
855 CEPH_OSD_FLAG_WRITE | 871 CEPH_OSD_FLAG_WRITE |
856 CEPH_OSD_FLAG_ONDISK, 872 CEPH_OSD_FLAG_ONDISK,
@@ -862,6 +878,9 @@ get_more_pages:
862 break; 878 break;
863 } 879 }
864 880
881 if (do_sync)
882 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
883
865 req->r_callback = writepages_finish; 884 req->r_callback = writepages_finish;
866 req->r_inode = inode; 885 req->r_inode = inode;
867 886
@@ -1204,6 +1223,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1204 struct inode *inode = file_inode(vma->vm_file); 1223 struct inode *inode = file_inode(vma->vm_file);
1205 struct ceph_inode_info *ci = ceph_inode(inode); 1224 struct ceph_inode_info *ci = ceph_inode(inode);
1206 struct ceph_file_info *fi = vma->vm_file->private_data; 1225 struct ceph_file_info *fi = vma->vm_file->private_data;
1226 struct page *pinned_page = NULL;
1207 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; 1227 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1208 int want, got, ret; 1228 int want, got, ret;
1209 1229
@@ -1215,7 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1215 want = CEPH_CAP_FILE_CACHE; 1235 want = CEPH_CAP_FILE_CACHE;
1216 while (1) { 1236 while (1) {
1217 got = 0; 1237 got = 0;
1218 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 1238 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
1239 -1, &got, &pinned_page);
1219 if (ret == 0) 1240 if (ret == 0)
1220 break; 1241 break;
1221 if (ret != -ERESTARTSYS) { 1242 if (ret != -ERESTARTSYS) {
@@ -1226,12 +1247,54 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1226 dout("filemap_fault %p %llu~%zd got cap refs on %s\n", 1247 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1227 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); 1248 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1228 1249
1229 ret = filemap_fault(vma, vmf); 1250 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
1251 ci->i_inline_version == CEPH_INLINE_NONE)
1252 ret = filemap_fault(vma, vmf);
1253 else
1254 ret = -EAGAIN;
1230 1255
1231 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", 1256 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1232 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); 1257 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1258 if (pinned_page)
1259 page_cache_release(pinned_page);
1233 ceph_put_cap_refs(ci, got); 1260 ceph_put_cap_refs(ci, got);
1234 1261
1262 if (ret != -EAGAIN)
1263 return ret;
1264
1265 /* read inline data */
1266 if (off >= PAGE_CACHE_SIZE) {
1267 /* does not support inline data > PAGE_SIZE */
1268 ret = VM_FAULT_SIGBUS;
1269 } else {
1270 int ret1;
1271 struct address_space *mapping = inode->i_mapping;
1272 struct page *page = find_or_create_page(mapping, 0,
1273 mapping_gfp_mask(mapping) &
1274 ~__GFP_FS);
1275 if (!page) {
1276 ret = VM_FAULT_OOM;
1277 goto out;
1278 }
1279 ret1 = __ceph_do_getattr(inode, page,
1280 CEPH_STAT_CAP_INLINE_DATA, true);
1281 if (ret1 < 0 || off >= i_size_read(inode)) {
1282 unlock_page(page);
1283 page_cache_release(page);
1284 ret = VM_FAULT_SIGBUS;
1285 goto out;
1286 }
1287 if (ret1 < PAGE_CACHE_SIZE)
1288 zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
1289 else
1290 flush_dcache_page(page);
1291 SetPageUptodate(page);
1292 vmf->page = page;
1293 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1294 }
1295out:
1296 dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
1297 inode, off, (size_t)PAGE_CACHE_SIZE, ret);
1235 return ret; 1298 return ret;
1236} 1299}
1237 1300
@@ -1250,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1250 size_t len; 1313 size_t len;
1251 int want, got, ret; 1314 int want, got, ret;
1252 1315
1316 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1317 struct page *locked_page = NULL;
1318 if (off == 0) {
1319 lock_page(page);
1320 locked_page = page;
1321 }
1322 ret = ceph_uninline_data(vma->vm_file, locked_page);
1323 if (locked_page)
1324 unlock_page(locked_page);
1325 if (ret < 0)
1326 return VM_FAULT_SIGBUS;
1327 }
1328
1253 if (off + PAGE_CACHE_SIZE <= size) 1329 if (off + PAGE_CACHE_SIZE <= size)
1254 len = PAGE_CACHE_SIZE; 1330 len = PAGE_CACHE_SIZE;
1255 else 1331 else
@@ -1263,7 +1339,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1263 want = CEPH_CAP_FILE_BUFFER; 1339 want = CEPH_CAP_FILE_BUFFER;
1264 while (1) { 1340 while (1) {
1265 got = 0; 1341 got = 0;
1266 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); 1342 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
1343 &got, NULL);
1267 if (ret == 0) 1344 if (ret == 0)
1268 break; 1345 break;
1269 if (ret != -ERESTARTSYS) { 1346 if (ret != -ERESTARTSYS) {
@@ -1297,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1297 ret = VM_FAULT_SIGBUS; 1374 ret = VM_FAULT_SIGBUS;
1298 } 1375 }
1299out: 1376out:
1300 if (ret != VM_FAULT_LOCKED) { 1377 if (ret != VM_FAULT_LOCKED)
1301 unlock_page(page); 1378 unlock_page(page);
1302 } else { 1379 if (ret == VM_FAULT_LOCKED ||
1380 ci->i_inline_version != CEPH_INLINE_NONE) {
1303 int dirty; 1381 int dirty;
1304 spin_lock(&ci->i_ceph_lock); 1382 spin_lock(&ci->i_ceph_lock);
1383 ci->i_inline_version = CEPH_INLINE_NONE;
1305 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1384 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1306 spin_unlock(&ci->i_ceph_lock); 1385 spin_unlock(&ci->i_ceph_lock);
1307 if (dirty) 1386 if (dirty)
@@ -1315,6 +1394,178 @@ out:
1315 return ret; 1394 return ret;
1316} 1395}
1317 1396
1397void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
1398 char *data, size_t len)
1399{
1400 struct address_space *mapping = inode->i_mapping;
1401 struct page *page;
1402
1403 if (locked_page) {
1404 page = locked_page;
1405 } else {
1406 if (i_size_read(inode) == 0)
1407 return;
1408 page = find_or_create_page(mapping, 0,
1409 mapping_gfp_mask(mapping) & ~__GFP_FS);
1410 if (!page)
1411 return;
1412 if (PageUptodate(page)) {
1413 unlock_page(page);
1414 page_cache_release(page);
1415 return;
1416 }
1417 }
1418
1419 dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n",
1420 inode, ceph_vinop(inode), len, locked_page);
1421
1422 if (len > 0) {
1423 void *kaddr = kmap_atomic(page);
1424 memcpy(kaddr, data, len);
1425 kunmap_atomic(kaddr);
1426 }
1427
1428 if (page != locked_page) {
1429 if (len < PAGE_CACHE_SIZE)
1430 zero_user_segment(page, len, PAGE_CACHE_SIZE);
1431 else
1432 flush_dcache_page(page);
1433
1434 SetPageUptodate(page);
1435 unlock_page(page);
1436 page_cache_release(page);
1437 }
1438}
1439
1440int ceph_uninline_data(struct file *filp, struct page *locked_page)
1441{
1442 struct inode *inode = file_inode(filp);
1443 struct ceph_inode_info *ci = ceph_inode(inode);
1444 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1445 struct ceph_osd_request *req;
1446 struct page *page = NULL;
1447 u64 len, inline_version;
1448 int err = 0;
1449 bool from_pagecache = false;
1450
1451 spin_lock(&ci->i_ceph_lock);
1452 inline_version = ci->i_inline_version;
1453 spin_unlock(&ci->i_ceph_lock);
1454
1455 dout("uninline_data %p %llx.%llx inline_version %llu\n",
1456 inode, ceph_vinop(inode), inline_version);
1457
1458 if (inline_version == 1 || /* initial version, no data */
1459 inline_version == CEPH_INLINE_NONE)
1460 goto out;
1461
1462 if (locked_page) {
1463 page = locked_page;
1464 WARN_ON(!PageUptodate(page));
1465 } else if (ceph_caps_issued(ci) &
1466 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
1467 page = find_get_page(inode->i_mapping, 0);
1468 if (page) {
1469 if (PageUptodate(page)) {
1470 from_pagecache = true;
1471 lock_page(page);
1472 } else {
1473 page_cache_release(page);
1474 page = NULL;
1475 }
1476 }
1477 }
1478
1479 if (page) {
1480 len = i_size_read(inode);
1481 if (len > PAGE_CACHE_SIZE)
1482 len = PAGE_CACHE_SIZE;
1483 } else {
1484 page = __page_cache_alloc(GFP_NOFS);
1485 if (!page) {
1486 err = -ENOMEM;
1487 goto out;
1488 }
1489 err = __ceph_do_getattr(inode, page,
1490 CEPH_STAT_CAP_INLINE_DATA, true);
1491 if (err < 0) {
1492 /* no inline data */
1493 if (err == -ENODATA)
1494 err = 0;
1495 goto out;
1496 }
1497 len = err;
1498 }
1499
1500 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1501 ceph_vino(inode), 0, &len, 0, 1,
1502 CEPH_OSD_OP_CREATE,
1503 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1504 ci->i_snap_realm->cached_context,
1505 0, 0, false);
1506 if (IS_ERR(req)) {
1507 err = PTR_ERR(req);
1508 goto out;
1509 }
1510
1511 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
1512 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1513 if (!err)
1514 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1515 ceph_osdc_put_request(req);
1516 if (err < 0)
1517 goto out;
1518
1519 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1520 ceph_vino(inode), 0, &len, 1, 3,
1521 CEPH_OSD_OP_WRITE,
1522 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1523 ci->i_snap_realm->cached_context,
1524 ci->i_truncate_seq, ci->i_truncate_size,
1525 false);
1526 if (IS_ERR(req)) {
1527 err = PTR_ERR(req);
1528 goto out;
1529 }
1530
1531 osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
1532
1533 err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
1534 "inline_version", &inline_version,
1535 sizeof(inline_version),
1536 CEPH_OSD_CMPXATTR_OP_GT,
1537 CEPH_OSD_CMPXATTR_MODE_U64);
1538 if (err)
1539 goto out_put;
1540
1541 err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
1542 "inline_version", &inline_version,
1543 sizeof(inline_version), 0, 0);
1544 if (err)
1545 goto out_put;
1546
1547 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
1548 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1549 if (!err)
1550 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1551out_put:
1552 ceph_osdc_put_request(req);
1553 if (err == -ECANCELED)
1554 err = 0;
1555out:
1556 if (page && page != locked_page) {
1557 if (from_pagecache) {
1558 unlock_page(page);
1559 page_cache_release(page);
1560 } else
1561 __free_pages(page, 0);
1562 }
1563
1564 dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
1565 inode, ceph_vinop(inode), inline_version, err);
1566 return err;
1567}
1568
1318static struct vm_operations_struct ceph_vmops = { 1569static struct vm_operations_struct ceph_vmops = {
1319 .fault = ceph_filemap_fault, 1570 .fault = ceph_filemap_fault,
1320 .page_mkwrite = ceph_page_mkwrite, 1571 .page_mkwrite = ceph_page_mkwrite,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index cefca661464b..b93c631c6c87 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
975 kuid_t uid, kgid_t gid, umode_t mode, 975 kuid_t uid, kgid_t gid, umode_t mode,
976 u64 xattr_version, 976 u64 xattr_version,
977 struct ceph_buffer *xattrs_buf, 977 struct ceph_buffer *xattrs_buf,
978 u64 follows) 978 u64 follows, bool inline_data)
979{ 979{
980 struct ceph_mds_caps *fc; 980 struct ceph_mds_caps *fc;
981 struct ceph_msg *msg; 981 struct ceph_msg *msg;
982 void *p;
983 size_t extra_len;
982 984
983 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" 985 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
984 " seq %u/%u mseq %u follows %lld size %llu/%llu" 986 " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
988 seq, issue_seq, mseq, follows, size, max_size, 990 seq, issue_seq, mseq, follows, size, max_size,
989 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); 991 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
990 992
991 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false); 993 /* flock buffer size + inline version + inline data size */
994 extra_len = 4 + 8 + 4;
995 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
996 GFP_NOFS, false);
992 if (!msg) 997 if (!msg)
993 return -ENOMEM; 998 return -ENOMEM;
994 999
@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
1020 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); 1025 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
1021 fc->mode = cpu_to_le32(mode); 1026 fc->mode = cpu_to_le32(mode);
1022 1027
1028 p = fc + 1;
1029 /* flock buffer size */
1030 ceph_encode_32(&p, 0);
1031 /* inline version */
1032 ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
1033 /* inline data size */
1034 ceph_encode_32(&p, 0);
1035
1023 fc->xattr_version = cpu_to_le64(xattr_version); 1036 fc->xattr_version = cpu_to_le64(xattr_version);
1024 if (xattrs_buf) { 1037 if (xattrs_buf) {
1025 msg->middle = ceph_buffer_get(xattrs_buf); 1038 msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1126 u64 flush_tid = 0; 1139 u64 flush_tid = 0;
1127 int i; 1140 int i;
1128 int ret; 1141 int ret;
1142 bool inline_data;
1129 1143
1130 held = cap->issued | cap->implemented; 1144 held = cap->issued | cap->implemented;
1131 revoking = cap->implemented & ~cap->issued; 1145 revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1209 xattr_version = ci->i_xattrs.version; 1223 xattr_version = ci->i_xattrs.version;
1210 } 1224 }
1211 1225
1226 inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
1227
1212 spin_unlock(&ci->i_ceph_lock); 1228 spin_unlock(&ci->i_ceph_lock);
1213 1229
1214 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, 1230 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1215 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, 1231 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
1216 size, max_size, &mtime, &atime, time_warp_seq, 1232 size, max_size, &mtime, &atime, time_warp_seq,
1217 uid, gid, mode, xattr_version, xattr_blob, 1233 uid, gid, mode, xattr_version, xattr_blob,
1218 follows); 1234 follows, inline_data);
1219 if (ret < 0) { 1235 if (ret < 0) {
1220 dout("error sending cap msg, must requeue %p\n", inode); 1236 dout("error sending cap msg, must requeue %p\n", inode);
1221 delayed = 1; 1237 delayed = 1;
@@ -1336,7 +1352,7 @@ retry:
1336 capsnap->time_warp_seq, 1352 capsnap->time_warp_seq,
1337 capsnap->uid, capsnap->gid, capsnap->mode, 1353 capsnap->uid, capsnap->gid, capsnap->mode,
1338 capsnap->xattr_version, capsnap->xattr_blob, 1354 capsnap->xattr_version, capsnap->xattr_blob,
1339 capsnap->follows); 1355 capsnap->follows, capsnap->inline_data);
1340 1356
1341 next_follows = capsnap->follows + 1; 1357 next_follows = capsnap->follows + 1;
1342 ceph_put_cap_snap(capsnap); 1358 ceph_put_cap_snap(capsnap);
@@ -2057,15 +2073,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
2057 * requested from the MDS. 2073 * requested from the MDS.
2058 */ 2074 */
2059static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2075static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2060 int *got, loff_t endoff, int *check_max, int *err) 2076 loff_t endoff, int *got, struct page **pinned_page,
2077 int *check_max, int *err)
2061{ 2078{
2062 struct inode *inode = &ci->vfs_inode; 2079 struct inode *inode = &ci->vfs_inode;
2063 int ret = 0; 2080 int ret = 0;
2064 int have, implemented; 2081 int have, implemented, _got = 0;
2065 int file_wanted; 2082 int file_wanted;
2066 2083
2067 dout("get_cap_refs %p need %s want %s\n", inode, 2084 dout("get_cap_refs %p need %s want %s\n", inode,
2068 ceph_cap_string(need), ceph_cap_string(want)); 2085 ceph_cap_string(need), ceph_cap_string(want));
2086again:
2069 spin_lock(&ci->i_ceph_lock); 2087 spin_lock(&ci->i_ceph_lock);
2070 2088
2071 /* make sure file is actually open */ 2089 /* make sure file is actually open */
@@ -2075,7 +2093,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2075 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2093 ceph_cap_string(need), ceph_cap_string(file_wanted));
2076 *err = -EBADF; 2094 *err = -EBADF;
2077 ret = 1; 2095 ret = 1;
2078 goto out; 2096 goto out_unlock;
2079 } 2097 }
2080 2098
2081 /* finish pending truncate */ 2099 /* finish pending truncate */
@@ -2095,7 +2113,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2095 *check_max = 1; 2113 *check_max = 1;
2096 ret = 1; 2114 ret = 1;
2097 } 2115 }
2098 goto out; 2116 goto out_unlock;
2099 } 2117 }
2100 /* 2118 /*
2101 * If a sync write is in progress, we must wait, so that we 2119 * If a sync write is in progress, we must wait, so that we
@@ -2103,7 +2121,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2103 */ 2121 */
2104 if (__ceph_have_pending_cap_snap(ci)) { 2122 if (__ceph_have_pending_cap_snap(ci)) {
2105 dout("get_cap_refs %p cap_snap_pending\n", inode); 2123 dout("get_cap_refs %p cap_snap_pending\n", inode);
2106 goto out; 2124 goto out_unlock;
2107 } 2125 }
2108 } 2126 }
2109 2127
@@ -2120,18 +2138,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2120 inode, ceph_cap_string(have), ceph_cap_string(not), 2138 inode, ceph_cap_string(have), ceph_cap_string(not),
2121 ceph_cap_string(revoking)); 2139 ceph_cap_string(revoking));
2122 if ((revoking & not) == 0) { 2140 if ((revoking & not) == 0) {
2123 *got = need | (have & want); 2141 _got = need | (have & want);
2124 __take_cap_refs(ci, *got); 2142 __take_cap_refs(ci, _got);
2125 ret = 1; 2143 ret = 1;
2126 } 2144 }
2127 } else { 2145 } else {
2128 dout("get_cap_refs %p have %s needed %s\n", inode, 2146 dout("get_cap_refs %p have %s needed %s\n", inode,
2129 ceph_cap_string(have), ceph_cap_string(need)); 2147 ceph_cap_string(have), ceph_cap_string(need));
2130 } 2148 }
2131out: 2149out_unlock:
2132 spin_unlock(&ci->i_ceph_lock); 2150 spin_unlock(&ci->i_ceph_lock);
2151
2152 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2153 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
2154 i_size_read(inode) > 0) {
2155 int ret1;
2156 struct page *page = find_get_page(inode->i_mapping, 0);
2157 if (page) {
2158 if (PageUptodate(page)) {
2159 *pinned_page = page;
2160 goto out;
2161 }
2162 page_cache_release(page);
2163 }
2164 /*
2165 * drop cap refs first because getattr while holding
2166 * caps refs can cause deadlock.
2167 */
2168 ceph_put_cap_refs(ci, _got);
2169 _got = 0;
2170
2171 /* getattr request will bring inline data into page cache */
2172 ret1 = __ceph_do_getattr(inode, NULL,
2173 CEPH_STAT_CAP_INLINE_DATA, true);
2174 if (ret1 >= 0) {
2175 ret = 0;
2176 goto again;
2177 }
2178 *err = ret1;
2179 ret = 1;
2180 }
2181out:
2133 dout("get_cap_refs %p ret %d got %s\n", inode, 2182 dout("get_cap_refs %p ret %d got %s\n", inode,
2134 ret, ceph_cap_string(*got)); 2183 ret, ceph_cap_string(_got));
2184 *got = _got;
2135 return ret; 2185 return ret;
2136} 2186}
2137 2187
@@ -2168,8 +2218,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
2168 * due to a small max_size, make sure we check_max_size (and possibly 2218 * due to a small max_size, make sure we check_max_size (and possibly
2169 * ask the mds) so we don't get hung up indefinitely. 2219 * ask the mds) so we don't get hung up indefinitely.
2170 */ 2220 */
2171int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, 2221int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2172 loff_t endoff) 2222 loff_t endoff, int *got, struct page **pinned_page)
2173{ 2223{
2174 int check_max, ret, err; 2224 int check_max, ret, err;
2175 2225
@@ -2179,8 +2229,8 @@ retry:
2179 check_max = 0; 2229 check_max = 0;
2180 err = 0; 2230 err = 0;
2181 ret = wait_event_interruptible(ci->i_cap_wq, 2231 ret = wait_event_interruptible(ci->i_cap_wq,
2182 try_get_cap_refs(ci, need, want, 2232 try_get_cap_refs(ci, need, want, endoff,
2183 got, endoff, 2233 got, pinned_page,
2184 &check_max, &err)); 2234 &check_max, &err));
2185 if (err) 2235 if (err)
2186 ret = err; 2236 ret = err;
@@ -2383,6 +2433,8 @@ static void invalidate_aliases(struct inode *inode)
2383static void handle_cap_grant(struct ceph_mds_client *mdsc, 2433static void handle_cap_grant(struct ceph_mds_client *mdsc,
2384 struct inode *inode, struct ceph_mds_caps *grant, 2434 struct inode *inode, struct ceph_mds_caps *grant,
2385 void *snaptrace, int snaptrace_len, 2435 void *snaptrace, int snaptrace_len,
2436 u64 inline_version,
2437 void *inline_data, int inline_len,
2386 struct ceph_buffer *xattr_buf, 2438 struct ceph_buffer *xattr_buf,
2387 struct ceph_mds_session *session, 2439 struct ceph_mds_session *session,
2388 struct ceph_cap *cap, int issued) 2440 struct ceph_cap *cap, int issued)
@@ -2403,6 +2455,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2403 bool queue_invalidate = false; 2455 bool queue_invalidate = false;
2404 bool queue_revalidate = false; 2456 bool queue_revalidate = false;
2405 bool deleted_inode = false; 2457 bool deleted_inode = false;
2458 bool fill_inline = false;
2406 2459
2407 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 2460 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
2408 inode, cap, mds, seq, ceph_cap_string(newcaps)); 2461 inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2576,6 +2629,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2576 } 2629 }
2577 BUG_ON(cap->issued & ~cap->implemented); 2630 BUG_ON(cap->issued & ~cap->implemented);
2578 2631
2632 if (inline_version > 0 && inline_version >= ci->i_inline_version) {
2633 ci->i_inline_version = inline_version;
2634 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2635 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
2636 fill_inline = true;
2637 }
2638
2579 spin_unlock(&ci->i_ceph_lock); 2639 spin_unlock(&ci->i_ceph_lock);
2580 2640
2581 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 2641 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
@@ -2589,6 +2649,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2589 wake = true; 2649 wake = true;
2590 } 2650 }
2591 2651
2652 if (fill_inline)
2653 ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
2654
2592 if (queue_trunc) { 2655 if (queue_trunc) {
2593 ceph_queue_vmtruncate(inode); 2656 ceph_queue_vmtruncate(inode);
2594 ceph_queue_revalidate(inode); 2657 ceph_queue_revalidate(inode);
@@ -2996,11 +3059,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2996 u64 cap_id; 3059 u64 cap_id;
2997 u64 size, max_size; 3060 u64 size, max_size;
2998 u64 tid; 3061 u64 tid;
3062 u64 inline_version = 0;
3063 void *inline_data = NULL;
3064 u32 inline_len = 0;
2999 void *snaptrace; 3065 void *snaptrace;
3000 size_t snaptrace_len; 3066 size_t snaptrace_len;
3001 void *flock; 3067 void *p, *end;
3002 void *end;
3003 u32 flock_len;
3004 3068
3005 dout("handle_caps from mds%d\n", mds); 3069 dout("handle_caps from mds%d\n", mds);
3006 3070
@@ -3021,30 +3085,37 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3021 3085
3022 snaptrace = h + 1; 3086 snaptrace = h + 1;
3023 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3087 snaptrace_len = le32_to_cpu(h->snap_trace_len);
3088 p = snaptrace + snaptrace_len;
3024 3089
3025 if (le16_to_cpu(msg->hdr.version) >= 2) { 3090 if (le16_to_cpu(msg->hdr.version) >= 2) {
3026 void *p = snaptrace + snaptrace_len; 3091 u32 flock_len;
3027 ceph_decode_32_safe(&p, end, flock_len, bad); 3092 ceph_decode_32_safe(&p, end, flock_len, bad);
3028 if (p + flock_len > end) 3093 if (p + flock_len > end)
3029 goto bad; 3094 goto bad;
3030 flock = p; 3095 p += flock_len;
3031 } else {
3032 flock = NULL;
3033 flock_len = 0;
3034 } 3096 }
3035 3097
3036 if (le16_to_cpu(msg->hdr.version) >= 3) { 3098 if (le16_to_cpu(msg->hdr.version) >= 3) {
3037 if (op == CEPH_CAP_OP_IMPORT) { 3099 if (op == CEPH_CAP_OP_IMPORT) {
3038 void *p = flock + flock_len;
3039 if (p + sizeof(*peer) > end) 3100 if (p + sizeof(*peer) > end)
3040 goto bad; 3101 goto bad;
3041 peer = p; 3102 peer = p;
3103 p += sizeof(*peer);
3042 } else if (op == CEPH_CAP_OP_EXPORT) { 3104 } else if (op == CEPH_CAP_OP_EXPORT) {
3043 /* recorded in unused fields */ 3105 /* recorded in unused fields */
3044 peer = (void *)&h->size; 3106 peer = (void *)&h->size;
3045 } 3107 }
3046 } 3108 }
3047 3109
3110 if (le16_to_cpu(msg->hdr.version) >= 4) {
3111 ceph_decode_64_safe(&p, end, inline_version, bad);
3112 ceph_decode_32_safe(&p, end, inline_len, bad);
3113 if (p + inline_len > end)
3114 goto bad;
3115 inline_data = p;
3116 p += inline_len;
3117 }
3118
3048 /* lookup ino */ 3119 /* lookup ino */
3049 inode = ceph_find_inode(sb, vino); 3120 inode = ceph_find_inode(sb, vino);
3050 ci = ceph_inode(inode); 3121 ci = ceph_inode(inode);
@@ -3085,6 +3156,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3085 handle_cap_import(mdsc, inode, h, peer, session, 3156 handle_cap_import(mdsc, inode, h, peer, session,
3086 &cap, &issued); 3157 &cap, &issued);
3087 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, 3158 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
3159 inline_version, inline_data, inline_len,
3088 msg->middle, session, cap, issued); 3160 msg->middle, session, cap, issued);
3089 goto done_unlocked; 3161 goto done_unlocked;
3090 } 3162 }
@@ -3105,8 +3177,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3105 case CEPH_CAP_OP_GRANT: 3177 case CEPH_CAP_OP_GRANT:
3106 __ceph_caps_issued(ci, &issued); 3178 __ceph_caps_issued(ci, &issued);
3107 issued |= __ceph_caps_dirty(ci); 3179 issued |= __ceph_caps_dirty(ci);
3108 handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, 3180 handle_cap_grant(mdsc, inode, h, NULL, 0,
3109 session, cap, issued); 3181 inline_version, inline_data, inline_len,
3182 msg->middle, session, cap, issued);
3110 goto done_unlocked; 3183 goto done_unlocked;
3111 3184
3112 case CEPH_CAP_OP_FLUSH_ACK: 3185 case CEPH_CAP_OP_FLUSH_ACK:
@@ -3137,8 +3210,7 @@ flush_cap_releases:
3137done: 3210done:
3138 mutex_unlock(&session->s_mutex); 3211 mutex_unlock(&session->s_mutex);
3139done_unlocked: 3212done_unlocked:
3140 if (inode) 3213 iput(inode);
3141 iput(inode);
3142 return; 3214 return;
3143 3215
3144bad: 3216bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 681a8537b64f..c241603764fd 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@ more:
183 spin_unlock(&parent->d_lock); 183 spin_unlock(&parent->d_lock);
184 184
185 /* make sure a dentry wasn't dropped while we didn't have parent lock */ 185 /* make sure a dentry wasn't dropped while we didn't have parent lock */
186 if (!ceph_dir_is_complete(dir)) { 186 if (!ceph_dir_is_complete_ordered(dir)) {
187 dout(" lost dir complete on %p; falling back to mds\n", dir); 187 dout(" lost dir complete on %p; falling back to mds\n", dir);
188 dput(dentry); 188 dput(dentry);
189 err = -EAGAIN; 189 err = -EAGAIN;
@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
261 261
262 /* always start with . and .. */ 262 /* always start with . and .. */
263 if (ctx->pos == 0) { 263 if (ctx->pos == 0) {
264 /* note dir version at start of readdir so we can tell
265 * if any dentries get dropped */
266 fi->dir_release_count = atomic_read(&ci->i_release_count);
267
268 dout("readdir off 0 -> '.'\n"); 264 dout("readdir off 0 -> '.'\n");
269 if (!dir_emit(ctx, ".", 1, 265 if (!dir_emit(ctx, ".", 1,
270 ceph_translate_ino(inode->i_sb, inode->i_ino), 266 ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
289 if ((ctx->pos == 2 || fi->dentry) && 285 if ((ctx->pos == 2 || fi->dentry) &&
290 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 286 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
291 ceph_snap(inode) != CEPH_SNAPDIR && 287 ceph_snap(inode) != CEPH_SNAPDIR &&
292 __ceph_dir_is_complete(ci) && 288 __ceph_dir_is_complete_ordered(ci) &&
293 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 289 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
294 u32 shared_gen = ci->i_shared_gen; 290 u32 shared_gen = ci->i_shared_gen;
295 spin_unlock(&ci->i_ceph_lock); 291 spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
312 308
313 /* proceed with a normal readdir */ 309 /* proceed with a normal readdir */
314 310
311 if (ctx->pos == 2) {
312 /* note dir version at start of readdir so we can tell
313 * if any dentries get dropped */
314 fi->dir_release_count = atomic_read(&ci->i_release_count);
315 fi->dir_ordered_count = ci->i_ordered_count;
316 }
317
315more: 318more:
316 /* do we have the correct frag content buffered? */ 319 /* do we have the correct frag content buffered? */
317 if (fi->frag != frag || fi->last_readdir == NULL) { 320 if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@ more:
446 */ 449 */
447 spin_lock(&ci->i_ceph_lock); 450 spin_lock(&ci->i_ceph_lock);
448 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { 451 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
449 dout(" marking %p complete\n", inode); 452 if (ci->i_ordered_count == fi->dir_ordered_count)
450 __ceph_dir_set_complete(ci, fi->dir_release_count); 453 dout(" marking %p complete and ordered\n", inode);
454 else
455 dout(" marking %p complete\n", inode);
456 __ceph_dir_set_complete(ci, fi->dir_release_count,
457 fi->dir_ordered_count);
451 } 458 }
452 spin_unlock(&ci->i_ceph_lock); 459 spin_unlock(&ci->i_ceph_lock);
453 460
@@ -805,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
805 acls.pagelist = NULL; 812 acls.pagelist = NULL;
806 } 813 }
807 err = ceph_mdsc_do_request(mdsc, dir, req); 814 err = ceph_mdsc_do_request(mdsc, dir, req);
808 if (!err && !req->r_reply_info.head->is_dentry) 815 if (!err &&
816 !req->r_reply_info.head->is_target &&
817 !req->r_reply_info.head->is_dentry)
809 err = ceph_handle_notrace_create(dir, dentry); 818 err = ceph_handle_notrace_create(dir, dentry);
810 ceph_mdsc_put_request(req); 819 ceph_mdsc_put_request(req);
811out: 820out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9f8e3572040e..ce74b394b49d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
333 return 0; 333 return 0;
334} 334}
335 335
336enum {
337 CHECK_EOF = 1,
338 READ_INLINE = 2,
339};
340
336/* 341/*
337 * Read a range of bytes striped over one or more objects. Iterate over 342 * Read a range of bytes striped over one or more objects. Iterate over
338 * objects we stripe over. (That's not atomic, but good enough for now.) 343 * objects we stripe over. (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@ more:
412 ret = read; 417 ret = read;
413 /* did we bounce off eof? */ 418 /* did we bounce off eof? */
414 if (pos + left > inode->i_size) 419 if (pos + left > inode->i_size)
415 *checkeof = 1; 420 *checkeof = CHECK_EOF;
416 } 421 }
417 422
418 dout("striped_read returns %d\n", ret); 423 dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
598 snapc = ci->i_snap_realm->cached_context; 603 snapc = ci->i_snap_realm->cached_context;
599 vino = ceph_vino(inode); 604 vino = ceph_vino(inode);
600 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 605 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
601 vino, pos, &len, 606 vino, pos, &len, 0,
602 2,/*include a 'startsync' command*/ 607 2,/*include a 'startsync' command*/
603 CEPH_OSD_OP_WRITE, flags, snapc, 608 CEPH_OSD_OP_WRITE, flags, snapc,
604 ci->i_truncate_seq, 609 ci->i_truncate_seq,
@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
609 break; 614 break;
610 } 615 }
611 616
617 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
618
612 n = iov_iter_get_pages_alloc(from, &pages, len, &start); 619 n = iov_iter_get_pages_alloc(from, &pages, len, &start);
613 if (unlikely(n < 0)) { 620 if (unlikely(n < 0)) {
614 ret = n; 621 ret = n;
@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
713 snapc = ci->i_snap_realm->cached_context; 720 snapc = ci->i_snap_realm->cached_context;
714 vino = ceph_vino(inode); 721 vino = ceph_vino(inode);
715 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 722 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
716 vino, pos, &len, 1, 723 vino, pos, &len, 0, 1,
717 CEPH_OSD_OP_WRITE, flags, snapc, 724 CEPH_OSD_OP_WRITE, flags, snapc,
718 ci->i_truncate_seq, 725 ci->i_truncate_seq,
719 ci->i_truncate_size, 726 ci->i_truncate_size,
@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
803 size_t len = iocb->ki_nbytes; 810 size_t len = iocb->ki_nbytes;
804 struct inode *inode = file_inode(filp); 811 struct inode *inode = file_inode(filp);
805 struct ceph_inode_info *ci = ceph_inode(inode); 812 struct ceph_inode_info *ci = ceph_inode(inode);
813 struct page *pinned_page = NULL;
806 ssize_t ret; 814 ssize_t ret;
807 int want, got = 0; 815 int want, got = 0;
808 int checkeof = 0, read = 0; 816 int retry_op = 0, read = 0;
809 817
810again: 818again:
811 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", 819 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@ again:
815 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 823 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
816 else 824 else
817 want = CEPH_CAP_FILE_CACHE; 825 want = CEPH_CAP_FILE_CACHE;
818 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 826 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
819 if (ret < 0) 827 if (ret < 0)
820 return ret; 828 return ret;
821 829
@@ -827,8 +835,12 @@ again:
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 835 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got)); 836 ceph_cap_string(got));
829 837
830 /* hmm, this isn't really async... */ 838 if (ci->i_inline_version == CEPH_INLINE_NONE) {
831 ret = ceph_sync_read(iocb, to, &checkeof); 839 /* hmm, this isn't really async... */
840 ret = ceph_sync_read(iocb, to, &retry_op);
841 } else {
842 retry_op = READ_INLINE;
843 }
832 } else { 844 } else {
833 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", 845 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
834 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 846 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,55 @@ again:
838 } 850 }
839 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 851 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
840 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 852 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
853 if (pinned_page) {
854 page_cache_release(pinned_page);
855 pinned_page = NULL;
856 }
841 ceph_put_cap_refs(ci, got); 857 ceph_put_cap_refs(ci, got);
858 if (retry_op && ret >= 0) {
859 int statret;
860 struct page *page = NULL;
861 loff_t i_size;
862 if (retry_op == READ_INLINE) {
863 page = __page_cache_alloc(GFP_NOFS);
864 if (!page)
865 return -ENOMEM;
866 }
842 867
843 if (checkeof && ret >= 0) { 868 statret = __ceph_do_getattr(inode, page,
844 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 869 CEPH_STAT_CAP_INLINE_DATA, !!page);
870 if (statret < 0) {
871 __free_page(page);
872 if (statret == -ENODATA) {
873 BUG_ON(retry_op != READ_INLINE);
874 goto again;
875 }
876 return statret;
877 }
878
879 i_size = i_size_read(inode);
880 if (retry_op == READ_INLINE) {
881 /* does not support inline data > PAGE_SIZE */
882 if (i_size > PAGE_CACHE_SIZE) {
883 ret = -EIO;
884 } else if (iocb->ki_pos < i_size) {
885 loff_t end = min_t(loff_t, i_size,
886 iocb->ki_pos + len);
887 if (statret < end)
888 zero_user_segment(page, statret, end);
889 ret = copy_page_to_iter(page,
890 iocb->ki_pos & ~PAGE_MASK,
891 end - iocb->ki_pos, to);
892 iocb->ki_pos += ret;
893 } else {
894 ret = 0;
895 }
896 __free_pages(page, 0);
897 return ret;
898 }
845 899
846 /* hit EOF or hole? */ 900 /* hit EOF or hole? */
847 if (statret == 0 && iocb->ki_pos < inode->i_size && 901 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
848 ret < len) { 902 ret < len) {
849 dout("sync_read hit hole, ppos %lld < size %lld" 903 dout("sync_read hit hole, ppos %lld < size %lld"
850 ", reading more\n", iocb->ki_pos, 904 ", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@ again:
852 906
853 read += ret; 907 read += ret;
854 len -= ret; 908 len -= ret;
855 checkeof = 0; 909 retry_op = 0;
856 goto again; 910 goto again;
857 } 911 }
858 } 912 }
@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
909 if (err) 963 if (err)
910 goto out; 964 goto out;
911 965
966 if (ci->i_inline_version != CEPH_INLINE_NONE) {
967 err = ceph_uninline_data(file, NULL);
968 if (err < 0)
969 goto out;
970 }
971
912retry_snap: 972retry_snap:
913 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) { 973 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
914 err = -ENOSPC; 974 err = -ENOSPC;
@@ -922,7 +982,8 @@ retry_snap:
922 else 982 else
923 want = CEPH_CAP_FILE_BUFFER; 983 want = CEPH_CAP_FILE_BUFFER;
924 got = 0; 984 got = 0;
925 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); 985 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
986 &got, NULL);
926 if (err < 0) 987 if (err < 0)
927 goto out; 988 goto out;
928 989
@@ -969,6 +1030,7 @@ retry_snap:
969 if (written >= 0) { 1030 if (written >= 0) {
970 int dirty; 1031 int dirty;
971 spin_lock(&ci->i_ceph_lock); 1032 spin_lock(&ci->i_ceph_lock);
1033 ci->i_inline_version = CEPH_INLINE_NONE;
972 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1034 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
973 spin_unlock(&ci->i_ceph_lock); 1035 spin_unlock(&ci->i_ceph_lock);
974 if (dirty) 1036 if (dirty)
@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
1111 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1173 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1112 ceph_vino(inode), 1174 ceph_vino(inode),
1113 offset, length, 1175 offset, length,
1114 1, op, 1176 0, 1, op,
1115 CEPH_OSD_FLAG_WRITE | 1177 CEPH_OSD_FLAG_WRITE |
1116 CEPH_OSD_FLAG_ONDISK, 1178 CEPH_OSD_FLAG_ONDISK,
1117 NULL, 0, 0, false); 1179 NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
1214 goto unlock; 1276 goto unlock;
1215 } 1277 }
1216 1278
1279 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1280 ret = ceph_uninline_data(file, NULL);
1281 if (ret < 0)
1282 goto unlock;
1283 }
1284
1217 size = i_size_read(inode); 1285 size = i_size_read(inode);
1218 if (!(mode & FALLOC_FL_KEEP_SIZE)) 1286 if (!(mode & FALLOC_FL_KEEP_SIZE))
1219 endoff = offset + length; 1287 endoff = offset + length;
@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
1223 else 1291 else
1224 want = CEPH_CAP_FILE_BUFFER; 1292 want = CEPH_CAP_FILE_BUFFER;
1225 1293
1226 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); 1294 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
1227 if (ret < 0) 1295 if (ret < 0)
1228 goto unlock; 1296 goto unlock;
1229 1297
@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
1240 1308
1241 if (!ret) { 1309 if (!ret) {
1242 spin_lock(&ci->i_ceph_lock); 1310 spin_lock(&ci->i_ceph_lock);
1311 ci->i_inline_version = CEPH_INLINE_NONE;
1243 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1312 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1244 spin_unlock(&ci->i_ceph_lock); 1313 spin_unlock(&ci->i_ceph_lock);
1245 if (dirty) 1314 if (dirty)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index a5593d51d035..f61a74115beb 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -387,8 +387,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
387 spin_lock_init(&ci->i_ceph_lock); 387 spin_lock_init(&ci->i_ceph_lock);
388 388
389 ci->i_version = 0; 389 ci->i_version = 0;
390 ci->i_inline_version = 0;
390 ci->i_time_warp_seq = 0; 391 ci->i_time_warp_seq = 0;
391 ci->i_ceph_flags = 0; 392 ci->i_ceph_flags = 0;
393 ci->i_ordered_count = 0;
392 atomic_set(&ci->i_release_count, 1); 394 atomic_set(&ci->i_release_count, 1);
393 atomic_set(&ci->i_complete_count, 0); 395 atomic_set(&ci->i_complete_count, 0);
394 ci->i_symlink = NULL; 396 ci->i_symlink = NULL;
@@ -657,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
657 * Populate an inode based on info from mds. May be called on new or 659 * Populate an inode based on info from mds. May be called on new or
658 * existing inodes. 660 * existing inodes.
659 */ 661 */
660static int fill_inode(struct inode *inode, 662static int fill_inode(struct inode *inode, struct page *locked_page,
661 struct ceph_mds_reply_info_in *iinfo, 663 struct ceph_mds_reply_info_in *iinfo,
662 struct ceph_mds_reply_dirfrag *dirinfo, 664 struct ceph_mds_reply_dirfrag *dirinfo,
663 struct ceph_mds_session *session, 665 struct ceph_mds_session *session,
@@ -675,6 +677,7 @@ static int fill_inode(struct inode *inode,
675 bool wake = false; 677 bool wake = false;
676 bool queue_trunc = false; 678 bool queue_trunc = false;
677 bool new_version = false; 679 bool new_version = false;
680 bool fill_inline = false;
678 681
679 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n", 682 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
680 inode, ceph_vinop(inode), le64_to_cpu(info->version), 683 inode, ceph_vinop(inode), le64_to_cpu(info->version),
@@ -845,7 +848,8 @@ static int fill_inode(struct inode *inode,
845 (issued & CEPH_CAP_FILE_EXCL) == 0 && 848 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
846 !__ceph_dir_is_complete(ci)) { 849 !__ceph_dir_is_complete(ci)) {
847 dout(" marking %p complete (empty)\n", inode); 850 dout(" marking %p complete (empty)\n", inode);
848 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); 851 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
852 ci->i_ordered_count);
849 } 853 }
850 854
851 /* were we issued a capability? */ 855 /* were we issued a capability? */
@@ -873,8 +877,23 @@ static int fill_inode(struct inode *inode,
873 ceph_vinop(inode)); 877 ceph_vinop(inode));
874 __ceph_get_fmode(ci, cap_fmode); 878 __ceph_get_fmode(ci, cap_fmode);
875 } 879 }
880
881 if (iinfo->inline_version > 0 &&
882 iinfo->inline_version >= ci->i_inline_version) {
883 int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
884 ci->i_inline_version = iinfo->inline_version;
885 if (ci->i_inline_version != CEPH_INLINE_NONE &&
886 (locked_page ||
887 (le32_to_cpu(info->cap.caps) & cache_caps)))
888 fill_inline = true;
889 }
890
876 spin_unlock(&ci->i_ceph_lock); 891 spin_unlock(&ci->i_ceph_lock);
877 892
893 if (fill_inline)
894 ceph_fill_inline_data(inode, locked_page,
895 iinfo->inline_data, iinfo->inline_len);
896
878 if (wake) 897 if (wake)
879 wake_up_all(&ci->i_cap_wq); 898 wake_up_all(&ci->i_cap_wq);
880 899
@@ -1062,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1062 struct inode *dir = req->r_locked_dir; 1081 struct inode *dir = req->r_locked_dir;
1063 1082
1064 if (dir) { 1083 if (dir) {
1065 err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, 1084 err = fill_inode(dir, NULL,
1085 &rinfo->diri, rinfo->dirfrag,
1066 session, req->r_request_started, -1, 1086 session, req->r_request_started, -1,
1067 &req->r_caps_reservation); 1087 &req->r_caps_reservation);
1068 if (err < 0) 1088 if (err < 0)
@@ -1132,7 +1152,7 @@ retry_lookup:
1132 } 1152 }
1133 req->r_target_inode = in; 1153 req->r_target_inode = in;
1134 1154
1135 err = fill_inode(in, &rinfo->targeti, NULL, 1155 err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
1136 session, req->r_request_started, 1156 session, req->r_request_started,
1137 (!req->r_aborted && rinfo->head->result == 0) ? 1157 (!req->r_aborted && rinfo->head->result == 0) ?
1138 req->r_fmode : -1, 1158 req->r_fmode : -1,
@@ -1204,8 +1224,8 @@ retry_lookup:
1204 ceph_invalidate_dentry_lease(dn); 1224 ceph_invalidate_dentry_lease(dn);
1205 1225
1206 /* d_move screws up sibling dentries' offsets */ 1226 /* d_move screws up sibling dentries' offsets */
1207 ceph_dir_clear_complete(dir); 1227 ceph_dir_clear_ordered(dir);
1208 ceph_dir_clear_complete(olddir); 1228 ceph_dir_clear_ordered(olddir);
1209 1229
1210 dout("dn %p gets new offset %lld\n", req->r_old_dentry, 1230 dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1211 ceph_dentry(req->r_old_dentry)->offset); 1231 ceph_dentry(req->r_old_dentry)->offset);
@@ -1217,6 +1237,7 @@ retry_lookup:
1217 if (!rinfo->head->is_target) { 1237 if (!rinfo->head->is_target) {
1218 dout("fill_trace null dentry\n"); 1238 dout("fill_trace null dentry\n");
1219 if (dn->d_inode) { 1239 if (dn->d_inode) {
1240 ceph_dir_clear_ordered(dir);
1220 dout("d_delete %p\n", dn); 1241 dout("d_delete %p\n", dn);
1221 d_delete(dn); 1242 d_delete(dn);
1222 } else { 1243 } else {
@@ -1233,7 +1254,7 @@ retry_lookup:
1233 1254
1234 /* attach proper inode */ 1255 /* attach proper inode */
1235 if (!dn->d_inode) { 1256 if (!dn->d_inode) {
1236 ceph_dir_clear_complete(dir); 1257 ceph_dir_clear_ordered(dir);
1237 ihold(in); 1258 ihold(in);
1238 dn = splice_dentry(dn, in, &have_lease); 1259 dn = splice_dentry(dn, in, &have_lease);
1239 if (IS_ERR(dn)) { 1260 if (IS_ERR(dn)) {
@@ -1263,7 +1284,7 @@ retry_lookup:
1263 BUG_ON(!dir); 1284 BUG_ON(!dir);
1264 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); 1285 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
1265 dout(" linking snapped dir %p to dn %p\n", in, dn); 1286 dout(" linking snapped dir %p to dn %p\n", in, dn);
1266 ceph_dir_clear_complete(dir); 1287 ceph_dir_clear_ordered(dir);
1267 ihold(in); 1288 ihold(in);
1268 dn = splice_dentry(dn, in, NULL); 1289 dn = splice_dentry(dn, in, NULL);
1269 if (IS_ERR(dn)) { 1290 if (IS_ERR(dn)) {
@@ -1300,7 +1321,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1300 dout("new_inode badness got %d\n", err); 1321 dout("new_inode badness got %d\n", err);
1301 continue; 1322 continue;
1302 } 1323 }
1303 rc = fill_inode(in, &rinfo->dir_in[i], NULL, session, 1324 rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
1304 req->r_request_started, -1, 1325 req->r_request_started, -1,
1305 &req->r_caps_reservation); 1326 &req->r_caps_reservation);
1306 if (rc < 0) { 1327 if (rc < 0) {
@@ -1416,7 +1437,7 @@ retry_lookup:
1416 } 1437 }
1417 } 1438 }
1418 1439
1419 if (fill_inode(in, &rinfo->dir_in[i], NULL, session, 1440 if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
1420 req->r_request_started, -1, 1441 req->r_request_started, -1,
1421 &req->r_caps_reservation) < 0) { 1442 &req->r_caps_reservation) < 0) {
1422 pr_err("fill_inode badness on %p\n", in); 1443 pr_err("fill_inode badness on %p\n", in);
@@ -1899,7 +1920,8 @@ out_put:
1899 * Verify that we have a lease on the given mask. If not, 1920 * Verify that we have a lease on the given mask. If not,
1900 * do a getattr against an mds. 1921 * do a getattr against an mds.
1901 */ 1922 */
1902int ceph_do_getattr(struct inode *inode, int mask, bool force) 1923int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
1924 int mask, bool force)
1903{ 1925{
1904 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); 1926 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
1905 struct ceph_mds_client *mdsc = fsc->mdsc; 1927 struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1911,7 +1933,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
1911 return 0; 1933 return 0;
1912 } 1934 }
1913 1935
1914 dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); 1936 dout("do_getattr inode %p mask %s mode 0%o\n",
1937 inode, ceph_cap_string(mask), inode->i_mode);
1915 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) 1938 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
1916 return 0; 1939 return 0;
1917 1940
@@ -1922,7 +1945,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
1922 ihold(inode); 1945 ihold(inode);
1923 req->r_num_caps = 1; 1946 req->r_num_caps = 1;
1924 req->r_args.getattr.mask = cpu_to_le32(mask); 1947 req->r_args.getattr.mask = cpu_to_le32(mask);
1948 req->r_locked_page = locked_page;
1925 err = ceph_mdsc_do_request(mdsc, NULL, req); 1949 err = ceph_mdsc_do_request(mdsc, NULL, req);
1950 if (locked_page && err == 0) {
1951 u64 inline_version = req->r_reply_info.targeti.inline_version;
1952 if (inline_version == 0) {
1953 /* the reply is supposed to contain inline data */
1954 err = -EINVAL;
1955 } else if (inline_version == CEPH_INLINE_NONE) {
1956 err = -ENODATA;
1957 } else {
1958 err = req->r_reply_info.targeti.inline_len;
1959 }
1960 }
1926 ceph_mdsc_put_request(req); 1961 ceph_mdsc_put_request(req);
1927 dout("do_getattr result=%d\n", err); 1962 dout("do_getattr result=%d\n", err);
1928 return err; 1963 return err;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index fbc39c47bacd..c35c5c614e38 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -9,6 +9,8 @@
 #include <linux/ceph/pagelist.h>
 
 static u64 lock_secret;
+static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
+					 struct ceph_mds_request *req);
 
 static inline u64 secure_addr(void *addr)
 {
@@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
 	u64 length = 0;
 	u64 owner;
 
+	if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
+		wait = 0;
+
 	req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
@@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
 	req->r_args.filelock_change.length = cpu_to_le64(length);
 	req->r_args.filelock_change.wait = wait;
 
+	if (wait)
+		req->r_wait_for_completion = ceph_lock_wait_for_completion;
+
 	err = ceph_mdsc_do_request(mdsc, inode, req);
 
 	if (operation == CEPH_MDS_OP_GETFILELOCK) {
@@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
 	return err;
 }
 
+static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
+					 struct ceph_mds_request *req)
+{
+	struct ceph_mds_request *intr_req;
+	struct inode *inode = req->r_inode;
+	int err, lock_type;
+
+	BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK);
+	if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL)
+		lock_type = CEPH_LOCK_FCNTL_INTR;
+	else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK)
+		lock_type = CEPH_LOCK_FLOCK_INTR;
+	else
+		BUG_ON(1);
+	BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK);
+
+	err = wait_for_completion_interruptible(&req->r_completion);
+	if (!err)
+		return 0;
+
+	dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
+	     req->r_tid);
+
+	intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
+					    USE_AUTH_MDS);
+	if (IS_ERR(intr_req))
+		return PTR_ERR(intr_req);
+
+	intr_req->r_inode = inode;
+	ihold(inode);
+	intr_req->r_num_caps = 1;
+
+	intr_req->r_args.filelock_change = req->r_args.filelock_change;
+	intr_req->r_args.filelock_change.rule = lock_type;
+	intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK;
+
+	err = ceph_mdsc_do_request(mdsc, inode, intr_req);
+	ceph_mdsc_put_request(intr_req);
+
+	if (err && err != -ERESTARTSYS)
+		return err;
+
+	wait_for_completion(&req->r_completion);
+	return 0;
+}
+
 /**
  * Attempt to set an fcntl lock.
  * For now, this just goes away to the server. Later it may be more awesome.
@@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
 				     err);
 			}
 		}
-
-	} else if (err == -ERESTARTSYS) {
-		dout("undoing lock\n");
-		ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
-				  CEPH_LOCK_UNLOCK, 0, fl);
 	}
 	return err;
 }
@@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
 					  file, CEPH_LOCK_UNLOCK, 0, fl);
 			dout("got %d on flock_lock_file_wait, undid lock", err);
 		}
-	} else if (err == -ERESTARTSYS) {
-		dout("undoing lock\n");
-		ceph_lock_message(CEPH_LOCK_FLOCK,
-				  CEPH_MDS_OP_SETFILELOCK,
-				  file, CEPH_LOCK_UNLOCK, 0, fl);
 	}
 	return err;
 }
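
With the wait callback above, a blocking SETFILELOCK request is waited on interruptibly; if a signal arrives, the client sends a matching FCNTL_INTR/FLOCK_INTR unlock to the MDS and then waits for the original reply, instead of blindly undoing the lock as the removed -ERESTARTSYS branches did. Nothing changes for applications: a plain blocking POSIX lock is enough to exercise the path. A small userspace sketch (the mount path is illustrative only):

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	struct flock fl = {
		.l_type   = F_WRLCK,	/* exclusive lock */
		.l_whence = SEEK_SET,
		.l_start  = 0,
		.l_len    = 0,		/* whole file */
	};
	int fd = open("/mnt/cephfs/shared.dat", O_RDWR);

	if (fd < 0)
		return 1;
	/* F_SETLKW blocks; if a signal interrupts the pending MDS request,
	 * the kernel now issues the *_INTR unlock and fcntl() fails with EINTR. */
	if (fcntl(fd, F_SETLKW, &fl) == -1)
		perror("fcntl(F_SETLKW)");
	close(fd);
	return 0;
}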
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a92d3f5c6c12..d2171f4a6980 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end,
 	ceph_decode_need(p, end, info->xattr_len, bad);
 	info->xattr_data = *p;
 	*p += info->xattr_len;
+
+	if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+		ceph_decode_64_safe(p, end, info->inline_version, bad);
+		ceph_decode_32_safe(p, end, info->inline_len, bad);
+		ceph_decode_need(p, end, info->inline_len, bad);
+		info->inline_data = *p;
+		*p += info->inline_len;
+	} else
+		info->inline_version = CEPH_INLINE_NONE;
+
 	return 0;
 bad:
 	return err;
@@ -524,8 +534,7 @@ void ceph_mdsc_release_request(struct kref *kref)
 	}
 	if (req->r_locked_dir)
 		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
-	if (req->r_target_inode)
-		iput(req->r_target_inode);
+	iput(req->r_target_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry)
@@ -861,8 +870,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
 	/*
 	 * Serialize client metadata into waiting buffer space, using
 	 * the format that userspace expects for map<string, string>
+	 *
+	 * ClientSession messages with metadata are v2
 	 */
-	msg->hdr.version = 2;  /* ClientSession messages with metadata are v2 */
+	msg->hdr.version = cpu_to_le16(2);
+	msg->hdr.compat_version = cpu_to_le16(1);
 
 	/* The write pointer, following the session_head structure */
 	p = msg->front.iov_base + sizeof(*h);
@@ -1066,8 +1078,7 @@ out:
 	session->s_cap_iterator = NULL;
 	spin_unlock(&session->s_cap_lock);
 
-	if (last_inode)
-		iput(last_inode);
+	iput(last_inode);
 	if (old_cap)
 		ceph_put_cap(session->s_mdsc, old_cap);
 
@@ -1874,7 +1885,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 		goto out_free2;
 	}
 
-	msg->hdr.version = 2;
+	msg->hdr.version = cpu_to_le16(2);
 	msg->hdr.tid = cpu_to_le64(req->r_tid);
 
 	head = msg->front.iov_base;
@@ -2208,6 +2219,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 				&req->r_completion, req->r_timeout);
 		if (err == 0)
 			err = -EIO;
+	} else if (req->r_wait_for_completion) {
+		err = req->r_wait_for_completion(mdsc, req);
 	} else {
 		err = wait_for_completion_killable(&req->r_completion);
 	}
@@ -3744,6 +3757,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
 	return msg;
 }
 
+static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct ceph_mds_session *s = con->private;
+	struct ceph_auth_handshake *auth = &s->s_auth;
+	return ceph_auth_sign_message(auth, msg);
+}
+
+static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct ceph_mds_session *s = con->private;
+	struct ceph_auth_handshake *auth = &s->s_auth;
+	return ceph_auth_check_message_signature(auth, msg);
+}
+
 static const struct ceph_connection_operations mds_con_ops = {
 	.get = con_get,
 	.put = con_put,
@@ -3753,6 +3780,8 @@ static const struct ceph_connection_operations mds_con_ops = {
 	.invalidate_authorizer = invalidate_authorizer,
 	.peer_reset = peer_reset,
 	.alloc_msg = mds_alloc_msg,
+	.sign_message = sign_message,
+	.check_message_signature = check_message_signature,
 };
 
 /* eof */
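
For orientation, the decode added to parse_reply_info_in() implies that each inode record in an MDS reply appends an optional inline-data blob after the xattr block once both sides have negotiated CEPH_FEATURE_MDS_INLINE_DATA; a rough sketch of that layout, inferred from the decoder above rather than from an authoritative wire specification:

	__le64 inline_version;   /* CEPH_INLINE_NONE when the file holds no inline data */
	__le32 inline_len;
	u8     inline_data[inline_len];

Older MDS daemons without the feature bit omit the fields entirely, which is why the decoder falls back to CEPH_INLINE_NONE.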
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3288359353e9..e2817d00f7d9 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in {
 	char *symlink;
 	u32 xattr_len;
 	char *xattr_data;
+	u64 inline_version;
+	u32 inline_len;
+	char *inline_data;
 };
 
 /*
@@ -166,6 +169,11 @@ struct ceph_mds_client;
  */
 typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
 					     struct ceph_mds_request *req);
+/*
+ * wait for request completion callback
+ */
+typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
+						 struct ceph_mds_request *req);
 
 /*
  * an in-flight mds request
@@ -215,6 +223,7 @@ struct ceph_mds_request {
 	int r_request_release_offset;
 	struct ceph_msg *r_reply;
 	struct ceph_mds_reply_info_parsed r_reply_info;
+	struct page *r_locked_page;
 	int r_err;
 	bool r_aborted;
 
@@ -239,6 +248,7 @@ struct ceph_mds_request {
 	struct completion r_completion;
 	struct completion r_safe_completion;
 	ceph_mds_request_callback_t r_callback;
+	ceph_mds_request_wait_callback_t r_wait_for_completion;
 	struct list_head r_unsafe_item;  /* per-session unsafe list item */
 	bool r_got_unsafe, r_got_safe, r_got_result;
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f01645a27752..ce35fbd4ba5d 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b)
 	return 0;
 }
 
+
+static struct ceph_snap_context *empty_snapc;
+
 /*
  * build the snap context for a given realm.
  */
@@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 		return 0;
 	}
 
+	if (num == 0 && realm->seq == empty_snapc->seq) {
+		ceph_get_snap_context(empty_snapc);
+		snapc = empty_snapc;
+		goto done;
+	}
+
 	/* alloc new snap context */
 	err = -ENOMEM;
 	if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
@@ -365,8 +374,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	     realm->ino, realm, snapc, snapc->seq,
 	     (unsigned int) snapc->num_snaps);
 
-	if (realm->cached_context)
-		ceph_put_snap_context(realm->cached_context);
+done:
+	ceph_put_snap_context(realm->cached_context);
 	realm->cached_context = snapc;
 	return 0;
 
@@ -466,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 		   cap_snap.  lucky us. */
 		dout("queue_cap_snap %p already pending\n", inode);
 		kfree(capsnap);
+	} else if (ci->i_snap_realm->cached_context == empty_snapc) {
+		dout("queue_cap_snap %p empty snapc\n", inode);
+		kfree(capsnap);
 	} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
 			    CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
 		struct ceph_snap_context *snapc = ci->i_head_snapc;
@@ -504,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 			capsnap->xattr_version = 0;
 		}
 
+		capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 		/* dirty page count moved from _head to this cap_snap;
 		   all subsequent writes page dirties occur _after_ this
 		   snapshot. */
@@ -590,15 +604,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
 		if (!inode)
 			continue;
 		spin_unlock(&realm->inodes_with_caps_lock);
-		if (lastinode)
-			iput(lastinode);
+		iput(lastinode);
 		lastinode = inode;
 		ceph_queue_cap_snap(ci);
 		spin_lock(&realm->inodes_with_caps_lock);
 	}
 	spin_unlock(&realm->inodes_with_caps_lock);
-	if (lastinode)
-		iput(lastinode);
+	iput(lastinode);
 
 	list_for_each_entry(child, &realm->children, child_item) {
 		dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
@@ -928,5 +940,16 @@ out:
 	return;
 }
 
+int __init ceph_snap_init(void)
+{
+	empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
+	if (!empty_snapc)
+		return -ENOMEM;
+	empty_snapc->seq = 1;
+	return 0;
+}
 
-
+void ceph_snap_exit(void)
+{
+	ceph_put_snap_context(empty_snapc);
+}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f6e12377335c..50f06cddc94b 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	struct ceph_fs_client *fsc;
 	const u64 supported_features =
 		CEPH_FEATURE_FLOCK |
-		CEPH_FEATURE_DIRLAYOUTHASH;
+		CEPH_FEATURE_DIRLAYOUTHASH |
+		CEPH_FEATURE_MDS_INLINE_DATA;
 	const u64 required_features = 0;
 	int page_count;
 	size_t size;
@@ -1017,9 +1018,6 @@ static struct file_system_type ceph_fs_type = {
 };
 MODULE_ALIAS_FS("ceph");
 
-#define _STRINGIFY(x) #x
-#define STRINGIFY(x) _STRINGIFY(x)
-
 static int __init init_ceph(void)
 {
 	int ret = init_caches();
@@ -1028,15 +1026,20 @@ static int __init init_ceph(void)
 
 	ceph_flock_init();
 	ceph_xattr_init();
+	ret = ceph_snap_init();
+	if (ret)
+		goto out_xattr;
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
-		goto out_icache;
+		goto out_snap;
 
 	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
 
 	return 0;
 
-out_icache:
+out_snap:
+	ceph_snap_exit();
+out_xattr:
 	ceph_xattr_exit();
 	destroy_caches();
 out:
@@ -1047,6 +1050,7 @@ static void __exit exit_ceph(void)
 {
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
+	ceph_snap_exit();
 	ceph_xattr_exit();
 	destroy_caches();
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507979b8..e1aa32d0759d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@ struct ceph_cap_snap {
 	u64 time_warp_seq;
 	int writing;   /* a sync write is still in progress */
 	int dirty_pages;     /* dirty pages awaiting writeback */
+	bool inline_data;
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -253,9 +254,11 @@ struct ceph_inode_info {
 	spinlock_t i_ceph_lock;
 
 	u64 i_version;
+	u64 i_inline_version;
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
+	int i_ordered_count;
 	atomic_t i_release_count;
 	atomic_t i_complete_count;
 
@@ -434,14 +437,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 /*
  * Ceph inode.
  */
-#define CEPH_I_NODELAY	 4  /* do not delay cap release */
-#define CEPH_I_FLUSH	 8  /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH	16  /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED	 1  /* dentries in dir are ordered */
+#define CEPH_I_NODELAY		 4  /* do not delay cap release */
+#define CEPH_I_FLUSH		 8  /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH		16  /* do not flush dirty caps */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-					   int release_count)
+					   int release_count, int ordered_count)
 {
 	atomic_set(&ci->i_complete_count, release_count);
+	if (ci->i_ordered_count == ordered_count)
+		ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
+	else
+		ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +463,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
 	       atomic_read(&ci->i_release_count);
 }
 
+static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
+{
+	return __ceph_dir_is_complete(ci) &&
+		(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+}
+
 static inline void ceph_dir_clear_complete(struct inode *inode)
 {
 	__ceph_dir_clear_complete(ceph_inode(inode));
 }
 
-static inline bool ceph_dir_is_complete(struct inode *inode)
+static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-	return __ceph_dir_is_complete(ceph_inode(inode));
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	spin_lock(&ci->i_ceph_lock);
+	ci->i_ordered_count++;
+	ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+	spin_unlock(&ci->i_ceph_lock);
 }
 
+static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	bool ret;
+	spin_lock(&ci->i_ceph_lock);
+	ret = __ceph_dir_is_complete_ordered(ci);
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
 
 /* find a specific frag @f */
 extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +607,7 @@ struct ceph_file_info {
 	char *last_name;       /* last entry in previous chunk */
 	struct dentry *dentry; /* next dentry (for dcache readdir) */
 	int dir_release_count;
+	int dir_ordered_count;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;
@@ -673,6 +701,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 				  struct ceph_cap_snap *capsnap);
 extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
+extern int ceph_snap_init(void);
+extern void ceph_snap_exit(void);
 
 /*
  * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -715,7 +745,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
 extern void ceph_queue_invalidate(struct inode *inode);
 extern void ceph_queue_writeback(struct inode *inode);
 
-extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
+extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
+			     int mask, bool force);
+static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
+{
+	return __ceph_do_getattr(inode, NULL, mask, force);
+}
 extern int ceph_permission(struct inode *inode, int mask);
 extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
 extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -830,7 +865,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 				      int mds, int drop, int unless);
 
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
-			 int *got, loff_t endoff);
+			 loff_t endoff, int *got, struct page **pinned_page);
 
 /* for counting open files by mode */
 static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
@@ -852,7 +887,9 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			    struct file *file, unsigned flags, umode_t mode,
 			    int *opened);
 extern int ceph_release(struct inode *inode, struct file *filp);
-
+extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+				  char *data, size_t len);
+int ceph_uninline_data(struct file *filp, struct page *locked_page);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct inode_operations ceph_dir_iops;
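
The reordered ceph_get_caps() prototype also hands back a page pinned by the inline-data path, so callers must release it once the read is done. A hedged sketch of the calling convention this implies (the helper name and the need/want/endoff choices are illustrative, not taken from this diff):

/* hypothetical read-path caller for the new ceph_get_caps() signature */
static int example_get_read_caps(struct ceph_inode_info *ci, loff_t endoff)
{
	struct page *pinned_page = NULL;
	int got = 0;
	int ret;

	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE,
			    endoff, &got, &pinned_page);
	if (ret < 0)
		return ret;

	/* ... perform the read under the caps recorded in 'got' ... */

	if (pinned_page)
		page_cache_release(pinned_page);	/* drop the inline-data page */
	ceph_put_cap_refs(ci, got);
	return 0;
}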
diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej
new file mode 100644
index 000000000000..88fe3dfadb29
--- /dev/null
+++ b/fs/ceph/super.h.rej
@@ -0,0 +1,10 @@
+--- fs/ceph/super.h
++++ fs/ceph/super.h
+@@ -254,6 +255,7 @@
+ 	spinlock_t i_ceph_lock;
+ 
+ 	u64 i_version;
++	u64 i_inline_version;
+ 	u32 i_time_warp_seq;
+ 
+ 	unsigned i_ceph_flags;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 678b0d2bbbc4..5a492caf34cb 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	struct ceph_pagelist *pagelist = NULL;
 	int err;
 
-	if (value) {
+	if (size > 0) {
 		/* copy value into pagelist */
 		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
 		if (!pagelist)
@@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 		err = ceph_pagelist_append(pagelist, value, size);
 		if (err)
 			goto out;
-	} else {
+	} else if (!value) {
 		flags |= CEPH_XATTR_REMOVE;
 	}
 
@@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
 	if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
 		return generic_setxattr(dentry, name, value, size, flags);
 
+	if (size == 0)
+		value = "";  /* empty EA, do not remove */
+
 	return __ceph_setxattr(dentry, name, value, size, flags);
 }
 
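
With the xattr changes above, a zero-length setxattr(2) now stores an empty extended attribute instead of being treated as an implicit removal, while removexattr(2) still removes it. A quick userspace check (the mount path is illustrative only):

#include <stdio.h>
#include <sys/xattr.h>

int main(void)
{
	const char *path = "/mnt/cephfs/file";

	/* stores an empty user.note xattr instead of silently removing it */
	if (setxattr(path, "user.note", "", 0, 0) != 0)
		perror("setxattr");

	/* removal remains an explicit operation */
	if (removexattr(path, "user.note") != 0)
		perror("removexattr");
	return 0;
}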