summary refs log tree commit diff stats
path: root/fs
diff options
context:
space:
mode:
Diffstat (limited to 'fs')
-rw-r--r-- fs/ceph/addr.c 273
-rw-r--r-- fs/ceph/caps.c 132
-rw-r--r-- fs/ceph/dir.c 27
-rw-r--r-- fs/ceph/file.c 97
-rw-r--r-- fs/ceph/inode.c 59
-rw-r--r-- fs/ceph/locks.c 64
-rw-r--r-- fs/ceph/mds_client.c 41
-rw-r--r-- fs/ceph/mds_client.h 10
-rw-r--r-- fs/ceph/snap.c 37
-rw-r--r-- fs/ceph/super.c 16
-rw-r--r-- fs/ceph/super.h 55
-rw-r--r-- fs/ceph/super.h.rej 10
-rw-r--r-- fs/ceph/xattr.c 7
13 files changed, 712 insertions, 116 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 18c06bbaf136..f5013d92a7e6 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page)
192 struct ceph_osd_client *osdc = 192 struct ceph_osd_client *osdc =
193 &ceph_inode_to_client(inode)->client->osdc; 193 &ceph_inode_to_client(inode)->client->osdc;
194 int err = 0; 194 int err = 0;
195 u64 off = page_offset(page);
195 u64 len = PAGE_CACHE_SIZE; 196 u64 len = PAGE_CACHE_SIZE;
196 197
197 err = ceph_readpage_from_fscache(inode, page); 198 if (off >= i_size_read(inode)) {
199 zero_user_segment(page, err, PAGE_CACHE_SIZE);
200 SetPageUptodate(page);
201 return 0;
202 }
198 203
204 /*
205 * Uptodate inline data should have been added into page cache
206 * while getting Fcr caps.
207 */
208 if (ci->i_inline_version != CEPH_INLINE_NONE)
209 return -EINVAL;
210
211 err = ceph_readpage_from_fscache(inode, page);
199 if (err == 0) 212 if (err == 0)
200 goto out; 213 goto out;
201 214
202 dout("readpage inode %p file %p page %p index %lu\n", 215 dout("readpage inode %p file %p page %p index %lu\n",
203 inode, filp, page, page->index); 216 inode, filp, page, page->index);
204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 217 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
205 (u64) page_offset(page), &len, 218 off, &len,
206 ci->i_truncate_seq, ci->i_truncate_size, 219 ci->i_truncate_seq, ci->i_truncate_size,
207 &page, 1, 0); 220 &page, 1, 0);
208 if (err == -ENOENT) 221 if (err == -ENOENT)
@@ -319,7 +332,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
319 off, len); 332 off, len);
320 vino = ceph_vino(inode); 333 vino = ceph_vino(inode);
321 req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 334 req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
322 1, CEPH_OSD_OP_READ, 335 0, 1, CEPH_OSD_OP_READ,
323 CEPH_OSD_FLAG_READ, NULL, 336 CEPH_OSD_FLAG_READ, NULL,
324 ci->i_truncate_seq, ci->i_truncate_size, 337 ci->i_truncate_seq, ci->i_truncate_size,
325 false); 338 false);
@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
384 int rc = 0; 397 int rc = 0;
385 int max = 0; 398 int max = 0;
386 399
400 if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
401 return -EINVAL;
402
387 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, 403 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
388 &nr_pages); 404 &nr_pages);
389 405
@@ -673,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
673 int rc = 0; 689 int rc = 0;
674 unsigned wsize = 1 << inode->i_blkbits; 690 unsigned wsize = 1 << inode->i_blkbits;
675 struct ceph_osd_request *req = NULL; 691 struct ceph_osd_request *req = NULL;
676 int do_sync; 692 int do_sync = 0;
677 u64 truncate_size, snap_size; 693 u64 truncate_size, snap_size;
678 u32 truncate_seq; 694 u32 truncate_seq;
679 695
@@ -750,7 +766,6 @@ retry:
750 last_snapc = snapc; 766 last_snapc = snapc;
751 767
752 while (!done && index <= end) { 768 while (!done && index <= end) {
753 int num_ops = do_sync ? 2 : 1;
754 unsigned i; 769 unsigned i;
755 int first; 770 int first;
756 pgoff_t next; 771 pgoff_t next;
@@ -850,7 +865,8 @@ get_more_pages:
850 len = wsize; 865 len = wsize;
851 req = ceph_osdc_new_request(&fsc->client->osdc, 866 req = ceph_osdc_new_request(&fsc->client->osdc,
852 &ci->i_layout, vino, 867 &ci->i_layout, vino,
853 offset, &len, num_ops, 868 offset, &len, 0,
869 do_sync ? 2 : 1,
854 CEPH_OSD_OP_WRITE, 870 CEPH_OSD_OP_WRITE,
855 CEPH_OSD_FLAG_WRITE | 871 CEPH_OSD_FLAG_WRITE |
856 CEPH_OSD_FLAG_ONDISK, 872 CEPH_OSD_FLAG_ONDISK,
@@ -862,6 +878,9 @@ get_more_pages:
862 break; 878 break;
863 } 879 }
864 880
881 if (do_sync)
882 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
883
865 req->r_callback = writepages_finish; 884 req->r_callback = writepages_finish;
866 req->r_inode = inode; 885 req->r_inode = inode;
867 886
@@ -1204,6 +1223,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1204 struct inode *inode = file_inode(vma->vm_file); 1223 struct inode *inode = file_inode(vma->vm_file);
1205 struct ceph_inode_info *ci = ceph_inode(inode); 1224 struct ceph_inode_info *ci = ceph_inode(inode);
1206 struct ceph_file_info *fi = vma->vm_file->private_data; 1225 struct ceph_file_info *fi = vma->vm_file->private_data;
1226 struct page *pinned_page = NULL;
1207 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; 1227 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1208 int want, got, ret; 1228 int want, got, ret;
1209 1229
@@ -1215,7 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1215 want = CEPH_CAP_FILE_CACHE; 1235 want = CEPH_CAP_FILE_CACHE;
1216 while (1) { 1236 while (1) {
1217 got = 0; 1237 got = 0;
1218 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 1238 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
1239 -1, &got, &pinned_page);
1219 if (ret == 0) 1240 if (ret == 0)
1220 break; 1241 break;
1221 if (ret != -ERESTARTSYS) { 1242 if (ret != -ERESTARTSYS) {
@@ -1226,12 +1247,54 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1226 dout("filemap_fault %p %llu~%zd got cap refs on %s\n", 1247 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1227 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); 1248 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1228 1249
1229 ret = filemap_fault(vma, vmf); 1250 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
1251 ci->i_inline_version == CEPH_INLINE_NONE)
1252 ret = filemap_fault(vma, vmf);
1253 else
1254 ret = -EAGAIN;
1230 1255
1231 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", 1256 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1232 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); 1257 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1258 if (pinned_page)
1259 page_cache_release(pinned_page);
1233 ceph_put_cap_refs(ci, got); 1260 ceph_put_cap_refs(ci, got);
1234 1261
1262 if (ret != -EAGAIN)
1263 return ret;
1264
1265 /* read inline data */
1266 if (off >= PAGE_CACHE_SIZE) {
1267 /* does not support inline data > PAGE_SIZE */
1268 ret = VM_FAULT_SIGBUS;
1269 } else {
1270 int ret1;
1271 struct address_space *mapping = inode->i_mapping;
1272 struct page *page = find_or_create_page(mapping, 0,
1273 mapping_gfp_mask(mapping) &
1274 ~__GFP_FS);
1275 if (!page) {
1276 ret = VM_FAULT_OOM;
1277 goto out;
1278 }
1279 ret1 = __ceph_do_getattr(inode, page,
1280 CEPH_STAT_CAP_INLINE_DATA, true);
1281 if (ret1 < 0 || off >= i_size_read(inode)) {
1282 unlock_page(page);
1283 page_cache_release(page);
1284 ret = VM_FAULT_SIGBUS;
1285 goto out;
1286 }
1287 if (ret1 < PAGE_CACHE_SIZE)
1288 zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
1289 else
1290 flush_dcache_page(page);
1291 SetPageUptodate(page);
1292 vmf->page = page;
1293 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1294 }
1295out:
1296 dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
1297 inode, off, (size_t)PAGE_CACHE_SIZE, ret);
1235 return ret; 1298 return ret;
1236} 1299}
1237 1300
@@ -1250,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1250 size_t len; 1313 size_t len;
1251 int want, got, ret; 1314 int want, got, ret;
1252 1315
1316 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1317 struct page *locked_page = NULL;
1318 if (off == 0) {
1319 lock_page(page);
1320 locked_page = page;
1321 }
1322 ret = ceph_uninline_data(vma->vm_file, locked_page);
1323 if (locked_page)
1324 unlock_page(locked_page);
1325 if (ret < 0)
1326 return VM_FAULT_SIGBUS;
1327 }
1328
1253 if (off + PAGE_CACHE_SIZE <= size) 1329 if (off + PAGE_CACHE_SIZE <= size)
1254 len = PAGE_CACHE_SIZE; 1330 len = PAGE_CACHE_SIZE;
1255 else 1331 else
@@ -1263,7 +1339,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1263 want = CEPH_CAP_FILE_BUFFER; 1339 want = CEPH_CAP_FILE_BUFFER;
1264 while (1) { 1340 while (1) {
1265 got = 0; 1341 got = 0;
1266 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); 1342 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
1343 &got, NULL);
1267 if (ret == 0) 1344 if (ret == 0)
1268 break; 1345 break;
1269 if (ret != -ERESTARTSYS) { 1346 if (ret != -ERESTARTSYS) {
@@ -1297,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1297 ret = VM_FAULT_SIGBUS; 1374 ret = VM_FAULT_SIGBUS;
1298 } 1375 }
1299out: 1376out:
1300 if (ret != VM_FAULT_LOCKED) { 1377 if (ret != VM_FAULT_LOCKED)
1301 unlock_page(page); 1378 unlock_page(page);
1302 } else { 1379 if (ret == VM_FAULT_LOCKED ||
1380 ci->i_inline_version != CEPH_INLINE_NONE) {
1303 int dirty; 1381 int dirty;
1304 spin_lock(&ci->i_ceph_lock); 1382 spin_lock(&ci->i_ceph_lock);
1383 ci->i_inline_version = CEPH_INLINE_NONE;
1305 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1384 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1306 spin_unlock(&ci->i_ceph_lock); 1385 spin_unlock(&ci->i_ceph_lock);
1307 if (dirty) 1386 if (dirty)
@@ -1315,6 +1394,178 @@ out:
1315 return ret; 1394 return ret;
1316} 1395}
1317 1396
1397void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
1398 char *data, size_t len)
1399{
1400 struct address_space *mapping = inode->i_mapping;
1401 struct page *page;
1402
1403 if (locked_page) {
1404 page = locked_page;
1405 } else {
1406 if (i_size_read(inode) == 0)
1407 return;
1408 page = find_or_create_page(mapping, 0,
1409 mapping_gfp_mask(mapping) & ~__GFP_FS);
1410 if (!page)
1411 return;
1412 if (PageUptodate(page)) {
1413 unlock_page(page);
1414 page_cache_release(page);
1415 return;
1416 }
1417 }
1418
1419 dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n",
1420 inode, ceph_vinop(inode), len, locked_page);
1421
1422 if (len > 0) {
1423 void *kaddr = kmap_atomic(page);
1424 memcpy(kaddr, data, len);
1425 kunmap_atomic(kaddr);
1426 }
1427
1428 if (page != locked_page) {
1429 if (len < PAGE_CACHE_SIZE)
1430 zero_user_segment(page, len, PAGE_CACHE_SIZE);
1431 else
1432 flush_dcache_page(page);
1433
1434 SetPageUptodate(page);
1435 unlock_page(page);
1436 page_cache_release(page);
1437 }
1438}
1439
1440int ceph_uninline_data(struct file *filp, struct page *locked_page)
1441{
1442 struct inode *inode = file_inode(filp);
1443 struct ceph_inode_info *ci = ceph_inode(inode);
1444 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1445 struct ceph_osd_request *req;
1446 struct page *page = NULL;
1447 u64 len, inline_version;
1448 int err = 0;
1449 bool from_pagecache = false;
1450
1451 spin_lock(&ci->i_ceph_lock);
1452 inline_version = ci->i_inline_version;
1453 spin_unlock(&ci->i_ceph_lock);
1454
1455 dout("uninline_data %p %llx.%llx inline_version %llu\n",
1456 inode, ceph_vinop(inode), inline_version);
1457
1458 if (inline_version == 1 || /* initial version, no data */
1459 inline_version == CEPH_INLINE_NONE)
1460 goto out;
1461
1462 if (locked_page) {
1463 page = locked_page;
1464 WARN_ON(!PageUptodate(page));
1465 } else if (ceph_caps_issued(ci) &
1466 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
1467 page = find_get_page(inode->i_mapping, 0);
1468 if (page) {
1469 if (PageUptodate(page)) {
1470 from_pagecache = true;
1471 lock_page(page);
1472 } else {
1473 page_cache_release(page);
1474 page = NULL;
1475 }
1476 }
1477 }
1478
1479 if (page) {
1480 len = i_size_read(inode);
1481 if (len > PAGE_CACHE_SIZE)
1482 len = PAGE_CACHE_SIZE;
1483 } else {
1484 page = __page_cache_alloc(GFP_NOFS);
1485 if (!page) {
1486 err = -ENOMEM;
1487 goto out;
1488 }
1489 err = __ceph_do_getattr(inode, page,
1490 CEPH_STAT_CAP_INLINE_DATA, true);
1491 if (err < 0) {
1492 /* no inline data */
1493 if (err == -ENODATA)
1494 err = 0;
1495 goto out;
1496 }
1497 len = err;
1498 }
1499
1500 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1501 ceph_vino(inode), 0, &len, 0, 1,
1502 CEPH_OSD_OP_CREATE,
1503 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1504 ci->i_snap_realm->cached_context,
1505 0, 0, false);
1506 if (IS_ERR(req)) {
1507 err = PTR_ERR(req);
1508 goto out;
1509 }
1510
1511 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
1512 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1513 if (!err)
1514 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1515 ceph_osdc_put_request(req);
1516 if (err < 0)
1517 goto out;
1518
1519 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1520 ceph_vino(inode), 0, &len, 1, 3,
1521 CEPH_OSD_OP_WRITE,
1522 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1523 ci->i_snap_realm->cached_context,
1524 ci->i_truncate_seq, ci->i_truncate_size,
1525 false);
1526 if (IS_ERR(req)) {
1527 err = PTR_ERR(req);
1528 goto out;
1529 }
1530
1531 osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
1532
1533 err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
1534 "inline_version", &inline_version,
1535 sizeof(inline_version),
1536 CEPH_OSD_CMPXATTR_OP_GT,
1537 CEPH_OSD_CMPXATTR_MODE_U64);
1538 if (err)
1539 goto out_put;
1540
1541 err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
1542 "inline_version", &inline_version,
1543 sizeof(inline_version), 0, 0);
1544 if (err)
1545 goto out_put;
1546
1547 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
1548 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1549 if (!err)
1550 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1551out_put:
1552 ceph_osdc_put_request(req);
1553 if (err == -ECANCELED)
1554 err = 0;
1555out:
1556 if (page && page != locked_page) {
1557 if (from_pagecache) {
1558 unlock_page(page);
1559 page_cache_release(page);
1560 } else
1561 __free_pages(page, 0);
1562 }
1563
1564 dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
1565 inode, ceph_vinop(inode), inline_version, err);
1566 return err;
1567}
1568
1318static struct vm_operations_struct ceph_vmops = { 1569static struct vm_operations_struct ceph_vmops = {
1319 .fault = ceph_filemap_fault, 1570 .fault = ceph_filemap_fault,
1320 .page_mkwrite = ceph_page_mkwrite, 1571 .page_mkwrite = ceph_page_mkwrite,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index cefca661464b..b93c631c6c87 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
975 kuid_t uid, kgid_t gid, umode_t mode, 975 kuid_t uid, kgid_t gid, umode_t mode,
976 u64 xattr_version, 976 u64 xattr_version,
977 struct ceph_buffer *xattrs_buf, 977 struct ceph_buffer *xattrs_buf,
978 u64 follows) 978 u64 follows, bool inline_data)
979{ 979{
980 struct ceph_mds_caps *fc; 980 struct ceph_mds_caps *fc;
981 struct ceph_msg *msg; 981 struct ceph_msg *msg;
982 void *p;
983 size_t extra_len;
982 984
983 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" 985 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
984 " seq %u/%u mseq %u follows %lld size %llu/%llu" 986 " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
988 seq, issue_seq, mseq, follows, size, max_size, 990 seq, issue_seq, mseq, follows, size, max_size,
989 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); 991 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
990 992
991 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false); 993 /* flock buffer size + inline version + inline data size */
994 extra_len = 4 + 8 + 4;
995 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
996 GFP_NOFS, false);
992 if (!msg) 997 if (!msg)
993 return -ENOMEM; 998 return -ENOMEM;
994 999
@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
1020 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); 1025 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
1021 fc->mode = cpu_to_le32(mode); 1026 fc->mode = cpu_to_le32(mode);
1022 1027
1028 p = fc + 1;
1029 /* flock buffer size */
1030 ceph_encode_32(&p, 0);
1031 /* inline version */
1032 ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
1033 /* inline data size */
1034 ceph_encode_32(&p, 0);
1035
1023 fc->xattr_version = cpu_to_le64(xattr_version); 1036 fc->xattr_version = cpu_to_le64(xattr_version);
1024 if (xattrs_buf) { 1037 if (xattrs_buf) {
1025 msg->middle = ceph_buffer_get(xattrs_buf); 1038 msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1126 u64 flush_tid = 0; 1139 u64 flush_tid = 0;
1127 int i; 1140 int i;
1128 int ret; 1141 int ret;
1142 bool inline_data;
1129 1143
1130 held = cap->issued | cap->implemented; 1144 held = cap->issued | cap->implemented;
1131 revoking = cap->implemented & ~cap->issued; 1145 revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1209 xattr_version = ci->i_xattrs.version; 1223 xattr_version = ci->i_xattrs.version;
1210 } 1224 }
1211 1225
1226 inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
1227
1212 spin_unlock(&ci->i_ceph_lock); 1228 spin_unlock(&ci->i_ceph_lock);
1213 1229
1214 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, 1230 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1215 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, 1231 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
1216 size, max_size, &mtime, &atime, time_warp_seq, 1232 size, max_size, &mtime, &atime, time_warp_seq,
1217 uid, gid, mode, xattr_version, xattr_blob, 1233 uid, gid, mode, xattr_version, xattr_blob,
1218 follows); 1234 follows, inline_data);
1219 if (ret < 0) { 1235 if (ret < 0) {
1220 dout("error sending cap msg, must requeue %p\n", inode); 1236 dout("error sending cap msg, must requeue %p\n", inode);
1221 delayed = 1; 1237 delayed = 1;
@@ -1336,7 +1352,7 @@ retry:
1336 capsnap->time_warp_seq, 1352 capsnap->time_warp_seq,
1337 capsnap->uid, capsnap->gid, capsnap->mode, 1353 capsnap->uid, capsnap->gid, capsnap->mode,
1338 capsnap->xattr_version, capsnap->xattr_blob, 1354 capsnap->xattr_version, capsnap->xattr_blob,
1339 capsnap->follows); 1355 capsnap->follows, capsnap->inline_data);
1340 1356
1341 next_follows = capsnap->follows + 1; 1357 next_follows = capsnap->follows + 1;
1342 ceph_put_cap_snap(capsnap); 1358 ceph_put_cap_snap(capsnap);
@@ -2057,15 +2073,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
2057 * requested from the MDS. 2073 * requested from the MDS.
2058 */ 2074 */
2059static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2075static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2060 int *got, loff_t endoff, int *check_max, int *err) 2076 loff_t endoff, int *got, struct page **pinned_page,
2077 int *check_max, int *err)
2061{ 2078{
2062 struct inode *inode = &ci->vfs_inode; 2079 struct inode *inode = &ci->vfs_inode;
2063 int ret = 0; 2080 int ret = 0;
2064 int have, implemented; 2081 int have, implemented, _got = 0;
2065 int file_wanted; 2082 int file_wanted;
2066 2083
2067 dout("get_cap_refs %p need %s want %s\n", inode, 2084 dout("get_cap_refs %p need %s want %s\n", inode,
2068 ceph_cap_string(need), ceph_cap_string(want)); 2085 ceph_cap_string(need), ceph_cap_string(want));
2086again:
2069 spin_lock(&ci->i_ceph_lock); 2087 spin_lock(&ci->i_ceph_lock);
2070 2088
2071 /* make sure file is actually open */ 2089 /* make sure file is actually open */
@@ -2075,7 +2093,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2075 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2093 ceph_cap_string(need), ceph_cap_string(file_wanted));
2076 *err = -EBADF; 2094 *err = -EBADF;
2077 ret = 1; 2095 ret = 1;
2078 goto out; 2096 goto out_unlock;
2079 } 2097 }
2080 2098
2081 /* finish pending truncate */ 2099 /* finish pending truncate */
@@ -2095,7 +2113,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2095 *check_max = 1; 2113 *check_max = 1;
2096 ret = 1; 2114 ret = 1;
2097 } 2115 }
2098 goto out; 2116 goto out_unlock;
2099 } 2117 }
2100 /* 2118 /*
2101 * If a sync write is in progress, we must wait, so that we 2119 * If a sync write is in progress, we must wait, so that we
@@ -2103,7 +2121,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2103 */ 2121 */
2104 if (__ceph_have_pending_cap_snap(ci)) { 2122 if (__ceph_have_pending_cap_snap(ci)) {
2105 dout("get_cap_refs %p cap_snap_pending\n", inode); 2123 dout("get_cap_refs %p cap_snap_pending\n", inode);
2106 goto out; 2124 goto out_unlock;
2107 } 2125 }
2108 } 2126 }
2109 2127
@@ -2120,18 +2138,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2120 inode, ceph_cap_string(have), ceph_cap_string(not), 2138 inode, ceph_cap_string(have), ceph_cap_string(not),
2121 ceph_cap_string(revoking)); 2139 ceph_cap_string(revoking));
2122 if ((revoking & not) == 0) { 2140 if ((revoking & not) == 0) {
2123 *got = need | (have & want); 2141 _got = need | (have & want);
2124 __take_cap_refs(ci, *got); 2142 __take_cap_refs(ci, _got);
2125 ret = 1; 2143 ret = 1;
2126 } 2144 }
2127 } else { 2145 } else {
2128 dout("get_cap_refs %p have %s needed %s\n", inode, 2146 dout("get_cap_refs %p have %s needed %s\n", inode,
2129 ceph_cap_string(have), ceph_cap_string(need)); 2147 ceph_cap_string(have), ceph_cap_string(need));
2130 } 2148 }
2131out: 2149out_unlock:
2132 spin_unlock(&ci->i_ceph_lock); 2150 spin_unlock(&ci->i_ceph_lock);
2151
2152 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2153 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
2154 i_size_read(inode) > 0) {
2155 int ret1;
2156 struct page *page = find_get_page(inode->i_mapping, 0);
2157 if (page) {
2158 if (PageUptodate(page)) {
2159 *pinned_page = page;
2160 goto out;
2161 }
2162 page_cache_release(page);
2163 }
2164 /*
2165 * drop cap refs first because getattr while holding
2166 * caps refs can cause deadlock.
2167 */
2168 ceph_put_cap_refs(ci, _got);
2169 _got = 0;
2170
2171 /* getattr request will bring inline data into page cache */
2172 ret1 = __ceph_do_getattr(inode, NULL,
2173 CEPH_STAT_CAP_INLINE_DATA, true);
2174 if (ret1 >= 0) {
2175 ret = 0;
2176 goto again;
2177 }
2178 *err = ret1;
2179 ret = 1;
2180 }
2181out:
2133 dout("get_cap_refs %p ret %d got %s\n", inode, 2182 dout("get_cap_refs %p ret %d got %s\n", inode,
2134 ret, ceph_cap_string(*got)); 2183 ret, ceph_cap_string(_got));
2184 *got = _got;
2135 return ret; 2185 return ret;
2136} 2186}
2137 2187
@@ -2168,8 +2218,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
2168 * due to a small max_size, make sure we check_max_size (and possibly 2218 * due to a small max_size, make sure we check_max_size (and possibly
2169 * ask the mds) so we don't get hung up indefinitely. 2219 * ask the mds) so we don't get hung up indefinitely.
2170 */ 2220 */
2171int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, 2221int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2172 loff_t endoff) 2222 loff_t endoff, int *got, struct page **pinned_page)
2173{ 2223{
2174 int check_max, ret, err; 2224 int check_max, ret, err;
2175 2225
@@ -2179,8 +2229,8 @@ retry:
2179 check_max = 0; 2229 check_max = 0;
2180 err = 0; 2230 err = 0;
2181 ret = wait_event_interruptible(ci->i_cap_wq, 2231 ret = wait_event_interruptible(ci->i_cap_wq,
2182 try_get_cap_refs(ci, need, want, 2232 try_get_cap_refs(ci, need, want, endoff,
2183 got, endoff, 2233 got, pinned_page,
2184 &check_max, &err)); 2234 &check_max, &err));
2185 if (err) 2235 if (err)
2186 ret = err; 2236 ret = err;
@@ -2383,6 +2433,8 @@ static void invalidate_aliases(struct inode *inode)
2383static void handle_cap_grant(struct ceph_mds_client *mdsc, 2433static void handle_cap_grant(struct ceph_mds_client *mdsc,
2384 struct inode *inode, struct ceph_mds_caps *grant, 2434 struct inode *inode, struct ceph_mds_caps *grant,
2385 void *snaptrace, int snaptrace_len, 2435 void *snaptrace, int snaptrace_len,
2436 u64 inline_version,
2437 void *inline_data, int inline_len,
2386 struct ceph_buffer *xattr_buf, 2438 struct ceph_buffer *xattr_buf,
2387 struct ceph_mds_session *session, 2439 struct ceph_mds_session *session,
2388 struct ceph_cap *cap, int issued) 2440 struct ceph_cap *cap, int issued)
@@ -2403,6 +2455,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2403 bool queue_invalidate = false; 2455 bool queue_invalidate = false;
2404 bool queue_revalidate = false; 2456 bool queue_revalidate = false;
2405 bool deleted_inode = false; 2457 bool deleted_inode = false;
2458 bool fill_inline = false;
2406 2459
2407 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 2460 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
2408 inode, cap, mds, seq, ceph_cap_string(newcaps)); 2461 inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2576,6 +2629,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2576 } 2629 }
2577 BUG_ON(cap->issued & ~cap->implemented); 2630 BUG_ON(cap->issued & ~cap->implemented);
2578 2631
2632 if (inline_version > 0 && inline_version >= ci->i_inline_version) {
2633 ci->i_inline_version = inline_version;
2634 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2635 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
2636 fill_inline = true;
2637 }
2638
2579 spin_unlock(&ci->i_ceph_lock); 2639 spin_unlock(&ci->i_ceph_lock);
2580 2640
2581 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 2641 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
@@ -2589,6 +2649,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2589 wake = true; 2649 wake = true;
2590 } 2650 }
2591 2651
2652 if (fill_inline)
2653 ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
2654
2592 if (queue_trunc) { 2655 if (queue_trunc) {
2593 ceph_queue_vmtruncate(inode); 2656 ceph_queue_vmtruncate(inode);
2594 ceph_queue_revalidate(inode); 2657 ceph_queue_revalidate(inode);
@@ -2996,11 +3059,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2996 u64 cap_id; 3059 u64 cap_id;
2997 u64 size, max_size; 3060 u64 size, max_size;
2998 u64 tid; 3061 u64 tid;
3062 u64 inline_version = 0;
3063 void *inline_data = NULL;
3064 u32 inline_len = 0;
2999 void *snaptrace; 3065 void *snaptrace;
3000 size_t snaptrace_len; 3066 size_t snaptrace_len;
3001 void *flock; 3067 void *p, *end;
3002 void *end;
3003 u32 flock_len;
3004 3068
3005 dout("handle_caps from mds%d\n", mds); 3069 dout("handle_caps from mds%d\n", mds);
3006 3070
@@ -3021,30 +3085,37 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3021 3085
3022 snaptrace = h + 1; 3086 snaptrace = h + 1;
3023 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3087 snaptrace_len = le32_to_cpu(h->snap_trace_len);
3088 p = snaptrace + snaptrace_len;
3024 3089
3025 if (le16_to_cpu(msg->hdr.version) >= 2) { 3090 if (le16_to_cpu(msg->hdr.version) >= 2) {
3026 void *p = snaptrace + snaptrace_len; 3091 u32 flock_len;
3027 ceph_decode_32_safe(&p, end, flock_len, bad); 3092 ceph_decode_32_safe(&p, end, flock_len, bad);
3028 if (p + flock_len > end) 3093 if (p + flock_len > end)
3029 goto bad; 3094 goto bad;
3030 flock = p; 3095 p += flock_len;
3031 } else {
3032 flock = NULL;
3033 flock_len = 0;
3034 } 3096 }
3035 3097
3036 if (le16_to_cpu(msg->hdr.version) >= 3) { 3098 if (le16_to_cpu(msg->hdr.version) >= 3) {
3037 if (op == CEPH_CAP_OP_IMPORT) { 3099 if (op == CEPH_CAP_OP_IMPORT) {
3038 void *p = flock + flock_len;
3039 if (p + sizeof(*peer) > end) 3100 if (p + sizeof(*peer) > end)
3040 goto bad; 3101 goto bad;
3041 peer = p; 3102 peer = p;
3103 p += sizeof(*peer);
3042 } else if (op == CEPH_CAP_OP_EXPORT) { 3104 } else if (op == CEPH_CAP_OP_EXPORT) {
3043 /* recorded in unused fields */ 3105 /* recorded in unused fields */
3044 peer = (void *)&h->size; 3106 peer = (void *)&h->size;
3045 } 3107 }
3046 } 3108 }
3047 3109
3110 if (le16_to_cpu(msg->hdr.version) >= 4) {
3111 ceph_decode_64_safe(&p, end, inline_version, bad);
3112 ceph_decode_32_safe(&p, end, inline_len, bad);
3113 if (p + inline_len > end)
3114 goto bad;
3115 inline_data = p;
3116 p += inline_len;
3117 }
3118
3048 /* lookup ino */ 3119 /* lookup ino */
3049 inode = ceph_find_inode(sb, vino); 3120 inode = ceph_find_inode(sb, vino);
3050 ci = ceph_inode(inode); 3121 ci = ceph_inode(inode);
@@ -3085,6 +3156,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3085 handle_cap_import(mdsc, inode, h, peer, session, 3156 handle_cap_import(mdsc, inode, h, peer, session,
3086 &cap, &issued); 3157 &cap, &issued);
3087 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, 3158 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
3159 inline_version, inline_data, inline_len,
3088 msg->middle, session, cap, issued); 3160 msg->middle, session, cap, issued);
3089 goto done_unlocked; 3161 goto done_unlocked;
3090 } 3162 }
@@ -3105,8 +3177,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3105 case CEPH_CAP_OP_GRANT: 3177 case CEPH_CAP_OP_GRANT:
3106 __ceph_caps_issued(ci, &issued); 3178 __ceph_caps_issued(ci, &issued);
3107 issued |= __ceph_caps_dirty(ci); 3179 issued |= __ceph_caps_dirty(ci);
3108 handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, 3180 handle_cap_grant(mdsc, inode, h, NULL, 0,
3109 session, cap, issued); 3181 inline_version, inline_data, inline_len,
3182 msg->middle, session, cap, issued);
3110 goto done_unlocked; 3183 goto done_unlocked;
3111 3184
3112 case CEPH_CAP_OP_FLUSH_ACK: 3185 case CEPH_CAP_OP_FLUSH_ACK:
@@ -3137,8 +3210,7 @@ flush_cap_releases:
3137done: 3210done:
3138 mutex_unlock(&session->s_mutex); 3211 mutex_unlock(&session->s_mutex);
3139done_unlocked: 3212done_unlocked:
3140 if (inode) 3213 iput(inode);
3141 iput(inode);
3142 return; 3214 return;
3143 3215
3144bad: 3216bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 681a8537b64f..c241603764fd 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@ more:
183 spin_unlock(&parent->d_lock); 183 spin_unlock(&parent->d_lock);
184 184
185 /* make sure a dentry wasn't dropped while we didn't have parent lock */ 185 /* make sure a dentry wasn't dropped while we didn't have parent lock */
186 if (!ceph_dir_is_complete(dir)) { 186 if (!ceph_dir_is_complete_ordered(dir)) {
187 dout(" lost dir complete on %p; falling back to mds\n", dir); 187 dout(" lost dir complete on %p; falling back to mds\n", dir);
188 dput(dentry); 188 dput(dentry);
189 err = -EAGAIN; 189 err = -EAGAIN;
@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
261 261
262 /* always start with . and .. */ 262 /* always start with . and .. */
263 if (ctx->pos == 0) { 263 if (ctx->pos == 0) {
264 /* note dir version at start of readdir so we can tell
265 * if any dentries get dropped */
266 fi->dir_release_count = atomic_read(&ci->i_release_count);
267
268 dout("readdir off 0 -> '.'\n"); 264 dout("readdir off 0 -> '.'\n");
269 if (!dir_emit(ctx, ".", 1, 265 if (!dir_emit(ctx, ".", 1,
270 ceph_translate_ino(inode->i_sb, inode->i_ino), 266 ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
289 if ((ctx->pos == 2 || fi->dentry) && 285 if ((ctx->pos == 2 || fi->dentry) &&
290 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 286 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
291 ceph_snap(inode) != CEPH_SNAPDIR && 287 ceph_snap(inode) != CEPH_SNAPDIR &&
292 __ceph_dir_is_complete(ci) && 288 __ceph_dir_is_complete_ordered(ci) &&
293 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 289 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
294 u32 shared_gen = ci->i_shared_gen; 290 u32 shared_gen = ci->i_shared_gen;
295 spin_unlock(&ci->i_ceph_lock); 291 spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
312 308
313 /* proceed with a normal readdir */ 309 /* proceed with a normal readdir */
314 310
311 if (ctx->pos == 2) {
312 /* note dir version at start of readdir so we can tell
313 * if any dentries get dropped */
314 fi->dir_release_count = atomic_read(&ci->i_release_count);
315 fi->dir_ordered_count = ci->i_ordered_count;
316 }
317
315more: 318more:
316 /* do we have the correct frag content buffered? */ 319 /* do we have the correct frag content buffered? */
317 if (fi->frag != frag || fi->last_readdir == NULL) { 320 if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@ more:
446 */ 449 */
447 spin_lock(&ci->i_ceph_lock); 450 spin_lock(&ci->i_ceph_lock);
448 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { 451 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
449 dout(" marking %p complete\n", inode); 452 if (ci->i_ordered_count == fi->dir_ordered_count)
450 __ceph_dir_set_complete(ci, fi->dir_release_count); 453 dout(" marking %p complete and ordered\n", inode);
454 else
455 dout(" marking %p complete\n", inode);
456 __ceph_dir_set_complete(ci, fi->dir_release_count,
457 fi->dir_ordered_count);
451 } 458 }
452 spin_unlock(&ci->i_ceph_lock); 459 spin_unlock(&ci->i_ceph_lock);
453 460
@@ -805,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
805 acls.pagelist = NULL; 812 acls.pagelist = NULL;
806 } 813 }
807 err = ceph_mdsc_do_request(mdsc, dir, req); 814 err = ceph_mdsc_do_request(mdsc, dir, req);
808 if (!err && !req->r_reply_info.head->is_dentry) 815 if (!err &&
816 !req->r_reply_info.head->is_target &&
817 !req->r_reply_info.head->is_dentry)
809 err = ceph_handle_notrace_create(dir, dentry); 818 err = ceph_handle_notrace_create(dir, dentry);
810 ceph_mdsc_put_request(req); 819 ceph_mdsc_put_request(req);
811out: 820out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9f8e3572040e..ce74b394b49d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
333 return 0; 333 return 0;
334} 334}
335 335
336enum {
337 CHECK_EOF = 1,
338 READ_INLINE = 2,
339};
340
336/* 341/*
337 * Read a range of bytes striped over one or more objects. Iterate over 342 * Read a range of bytes striped over one or more objects. Iterate over
338 * objects we stripe over. (That's not atomic, but good enough for now.) 343 * objects we stripe over. (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@ more:
412 ret = read; 417 ret = read;
413 /* did we bounce off eof? */ 418 /* did we bounce off eof? */
414 if (pos + left > inode->i_size) 419 if (pos + left > inode->i_size)
415 *checkeof = 1; 420 *checkeof = CHECK_EOF;
416 } 421 }
417 422
418 dout("striped_read returns %d\n", ret); 423 dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
598 snapc = ci->i_snap_realm->cached_context; 603 snapc = ci->i_snap_realm->cached_context;
599 vino = ceph_vino(inode); 604 vino = ceph_vino(inode);
600 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 605 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
601 vino, pos, &len, 606 vino, pos, &len, 0,
602 2,/*include a 'startsync' command*/ 607 2,/*include a 'startsync' command*/
603 CEPH_OSD_OP_WRITE, flags, snapc, 608 CEPH_OSD_OP_WRITE, flags, snapc,
604 ci->i_truncate_seq, 609 ci->i_truncate_seq,
@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
609 break; 614 break;
610 } 615 }
611 616
617 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
618
612 n = iov_iter_get_pages_alloc(from, &pages, len, &start); 619 n = iov_iter_get_pages_alloc(from, &pages, len, &start);
613 if (unlikely(n < 0)) { 620 if (unlikely(n < 0)) {
614 ret = n; 621 ret = n;
@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
713 snapc = ci->i_snap_realm->cached_context; 720 snapc = ci->i_snap_realm->cached_context;
714 vino = ceph_vino(inode); 721 vino = ceph_vino(inode);
715 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 722 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
716 vino, pos, &len, 1, 723 vino, pos, &len, 0, 1,
717 CEPH_OSD_OP_WRITE, flags, snapc, 724 CEPH_OSD_OP_WRITE, flags, snapc,
718 ci->i_truncate_seq, 725 ci->i_truncate_seq,
719 ci->i_truncate_size, 726 ci->i_truncate_size,
@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
803 size_t len = iocb->ki_nbytes; 810 size_t len = iocb->ki_nbytes;
804 struct inode *inode = file_inode(filp); 811 struct inode *inode = file_inode(filp);
805 struct ceph_inode_info *ci = ceph_inode(inode); 812 struct ceph_inode_info *ci = ceph_inode(inode);
813 struct page *pinned_page = NULL;
806 ssize_t ret; 814 ssize_t ret;
807 int want, got = 0; 815 int want, got = 0;
808 int checkeof = 0, read = 0; 816 int retry_op = 0, read = 0;
809 817
810again: 818again:
811 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", 819 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@ again:
815 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 823 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
816 else 824 else
817 want = CEPH_CAP_FILE_CACHE; 825 want = CEPH_CAP_FILE_CACHE;
818 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 826 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
819 if (ret < 0) 827 if (ret < 0)
820 return ret; 828 return ret;
821 829
@@ -827,8 +835,12 @@ again:
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 835 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got)); 836 ceph_cap_string(got));
829 837
830 /* hmm, this isn't really async... */ 838 if (ci->i_inline_version == CEPH_INLINE_NONE) {
831 ret = ceph_sync_read(iocb, to, &checkeof); 839 /* hmm, this isn't really async... */
840 ret = ceph_sync_read(iocb, to, &retry_op);
841 } else {
842 retry_op = READ_INLINE;
843 }
832 } else { 844 } else {
833 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", 845 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
834 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 846 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,55 @@ again:
838 } 850 }
839 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 851 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
840 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 852 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
853 if (pinned_page) {
854 page_cache_release(pinned_page);
855 pinned_page = NULL;
856 }
841 ceph_put_cap_refs(ci, got); 857 ceph_put_cap_refs(ci, got);
858 if (retry_op && ret >= 0) {
859 int statret;
860 struct page *page = NULL;
861 loff_t i_size;
862 if (retry_op == READ_INLINE) {
863 page = __page_cache_alloc(GFP_NOFS);
864 if (!page)
865 return -ENOMEM;
866 }
842 867
843 if (checkeof && ret >= 0) { 868 statret = __ceph_do_getattr(inode, page,
844 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 869 CEPH_STAT_CAP_INLINE_DATA, !!page);
870 if (statret < 0) {
871 __free_page(page);
872 if (statret == -ENODATA) {
873 BUG_ON(retry_op != READ_INLINE);
874 goto again;
875 }
876 return statret;
877 }
878
879 i_size = i_size_read(inode);
880 if (retry_op == READ_INLINE) {
881 /* does not support inline data > PAGE_SIZE */
882 if (i_size > PAGE_CACHE_SIZE) {
883 ret = -EIO;
884 } else if (iocb->ki_pos < i_size) {
885 loff_t end = min_t(loff_t, i_size,
886 iocb->ki_pos + len);
887 if (statret < end)
888 zero_user_segment(page, statret, end);
889 ret = copy_page_to_iter(page,
890 iocb->ki_pos & ~PAGE_MASK,
891 end - iocb->ki_pos, to);
892 iocb->ki_pos += ret;
893 } else {
894 ret = 0;
895 }
896 __free_pages(page, 0);
897 return ret;
898 }
845 899
846 /* hit EOF or hole? */ 900 /* hit EOF or hole? */
847 if (statret == 0 && iocb->ki_pos < inode->i_size && 901 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
848 ret < len) { 902 ret < len) {
849 dout("sync_read hit hole, ppos %lld < size %lld" 903 dout("sync_read hit hole, ppos %lld < size %lld"
850 ", reading more\n", iocb->ki_pos, 904 ", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@ again:
852 906
853 read += ret; 907 read += ret;
854 len -= ret; 908 len -= ret;
855 checkeof = 0; 909 retry_op = 0;
856 goto again; 910 goto again;
857 } 911 }
858 } 912 }
@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
909 if (err) 963 if (err)
910 goto out; 964 goto out;
911 965
966 if (ci->i_inline_version != CEPH_INLINE_NONE) {
967 err = ceph_uninline_data(file, NULL);
968 if (err < 0)
969 goto out;
970 }
971
912retry_snap: 972retry_snap:
913 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) { 973 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
914 err = -ENOSPC; 974 err = -ENOSPC;
@@ -922,7 +982,8 @@ retry_snap:
922 else 982 else
923 want = CEPH_CAP_FILE_BUFFER; 983 want = CEPH_CAP_FILE_BUFFER;
924 got = 0; 984 got = 0;
925 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); 985 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
986 &got, NULL);
926 if (err < 0) 987 if (err < 0)
927 goto out; 988 goto out;
928 989
@@ -969,6 +1030,7 @@ retry_snap:
969 if (written >= 0) { 1030 if (written >= 0) {
970 int dirty; 1031 int dirty;
971 spin_lock(&ci->i_ceph_lock); 1032 spin_lock(&ci->i_ceph_lock);
1033 ci->i_inline_version = CEPH_INLINE_NONE;
972 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1034 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
973 spin_unlock(&ci->i_ceph_lock); 1035 spin_unlock(&ci->i_ceph_lock);
974 if (dirty) 1036 if (dirty)
@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
1111 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1173 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1112 ceph_vino(inode), 1174 ceph_vino(inode),
1113 offset, length, 1175 offset, length,
1114 1, op, 1176 0, 1, op,
1115 CEPH_OSD_FLAG_WRITE | 1177 CEPH_OSD_FLAG_WRITE |
1116 CEPH_OSD_FLAG_ONDISK, 1178 CEPH_OSD_FLAG_ONDISK,
1117 NULL, 0, 0, false); 1179 NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
1214 goto unlock; 1276 goto unlock;
1215 } 1277 }
1216 1278
1279 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1280 ret = ceph_uninline_data(file, NULL);
1281 if (ret < 0)
1282 goto unlock;
1283 }
1284
1217 size = i_size_read(inode); 1285 size = i_size_read(inode);
1218 if (!(mode & FALLOC_FL_KEEP_SIZE)) 1286 if (!(mode & FALLOC_FL_KEEP_SIZE))
1219 endoff = offset + length; 1287 endoff = offset + length;
@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
1223 else 1291 else
1224 want = CEPH_CAP_FILE_BUFFER; 1292 want = CEPH_CAP_FILE_BUFFER;
1225 1293
1226 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); 1294 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
1227 if (ret < 0) 1295 if (ret < 0)
1228 goto unlock; 1296 goto unlock;
1229 1297
@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
1240 1308
1241 if (!ret) { 1309 if (!ret) {
1242 spin_lock(&ci->i_ceph_lock); 1310 spin_lock(&ci->i_ceph_lock);
1311 ci->i_inline_version = CEPH_INLINE_NONE;
1243 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1312 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1244 spin_unlock(&ci->i_ceph_lock); 1313 spin_unlock(&ci->i_ceph_lock);
1245 if (dirty) 1314 if (dirty)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index a5593d51d035..f61a74115beb 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -387,8 +387,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
387 spin_lock_init(&ci->i_ceph_lock); 387 spin_lock_init(&ci->i_ceph_lock);
388 388
389 ci->i_version = 0; 389 ci->i_version = 0;
390 ci->i_inline_version = 0;
390 ci->i_time_warp_seq = 0; 391 ci->i_time_warp_seq = 0;
391 ci->i_ceph_flags = 0; 392 ci->i_ceph_flags = 0;
393 ci->i_ordered_count = 0;
392 atomic_set(&ci->i_release_count, 1); 394 atomic_set(&ci->i_release_count, 1);
393 atomic_set(&ci->i_complete_count, 0); 395 atomic_set(&ci->i_complete_count, 0);
394 ci->i_symlink = NULL; 396 ci->i_symlink = NULL;
@@ -657,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
657 * Populate an inode based on info from mds. May be called on new or 659 * Populate an inode based on info from mds. May be called on new or
658 * existing inodes. 660 * existing inodes.
659 */ 661 */
660static int fill_inode(struct inode *inode, 662static int fill_inode(struct inode *inode, struct page *locked_page,
661 struct ceph_mds_reply_info_in *iinfo, 663 struct ceph_mds_reply_info_in *iinfo,
662 struct ceph_mds_reply_dirfrag *dirinfo, 664 struct ceph_mds_reply_dirfrag *dirinfo,
663 struct ceph_mds_session *session, 665 struct ceph_mds_session *session,
@@ -675,6 +677,7 @@ static int fill_inode(struct inode *inode,
675 bool wake = false; 677 bool wake = false;
676 bool queue_trunc = false; 678 bool queue_trunc = false;
677 bool new_version = false; 679 bool new_version = false;
680 bool fill_inline = false;
678 681
679 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n", 682 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
680 inode, ceph_vinop(inode), le64_to_cpu(info->version), 683 inode, ceph_vinop(inode), le64_to_cpu(info->version),
@@ -845,7 +848,8 @@ static int fill_inode(struct inode *inode,
845 (issued & CEPH_CAP_FILE_EXCL) == 0 && 848 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
846 !__ceph_dir_is_complete(ci)) { 849 !__ceph_dir_is_complete(ci)) {
847 dout(" marking %p complete (empty)\n", inode); 850 dout(" marking %p complete (empty)\n", inode);
848 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); 851 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
852 ci->i_ordered_count);
849 } 853 }
850 854
851 /* were we issued a capability? */ 855 /* were we issued a capability? */
@@ -873,8 +877,23 @@ static int fill_inode(struct inode *inode,
873 ceph_vinop(inode)); 877 ceph_vinop(inode));
874 __ceph_get_fmode(ci, cap_fmode); 878 __ceph_get_fmode(ci, cap_fmode);
875 } 879 }
880
881 if (iinfo->inline_version > 0 &&
882 iinfo->inline_version >= ci->i_inline_version) {
883 int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
884 ci->i_inline_version = iinfo->inline_version;
885 if (ci->i_inline_version != CEPH_INLINE_NONE &&
886 (locked_page ||
887 (le32_to_cpu(info->cap.caps) & cache_caps)))
888 fill_inline = true;
889 }
890
876 spin_unlock(&ci->i_ceph_lock); 891 spin_unlock(&ci->i_ceph_lock);
877 892
893 if (fill_inline)
894 ceph_fill_inline_data(inode, locked_page,
895 iinfo->inline_data, iinfo->inline_len);
896
878 if (wake) 897 if (wake)
879 wake_up_all(&ci->i_cap_wq); 898 wake_up_all(&ci->i_cap_wq);
880 899
@@ -1062,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1062 struct inode *dir = req->r_locked_dir; 1081 struct inode *dir = req->r_locked_dir;
1063 1082
1064 if (dir) { 1083 if (dir) {
1065 err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, 1084 err = fill_inode(dir, NULL,
1085 &rinfo->diri, rinfo->dirfrag,
1066 session, req->r_request_started, -1, 1086 session, req->r_request_started, -1,
1067 &req->r_caps_reservation); 1087 &req->r_caps_reservation);
1068 if (err < 0) 1088 if (err < 0)
@@ -1132,7 +1152,7 @@ retry_lookup:
1132 } 1152 }
1133 req->r_target_inode = in; 1153 req->r_target_inode = in;
1134 1154
1135 err = fill_inode(in, &rinfo->targeti, NULL, 1155 err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
1136 session, req->r_request_started, 1156 session, req->r_request_started,
1137 (!req->r_aborted && rinfo->head->result == 0) ? 1157 (!req->r_aborted && rinfo->head->result == 0) ?
1138 req->r_fmode : -1, 1158 req->r_fmode : -1,
@@ -1204,8 +1224,8 @@ retry_lookup:
1204 ceph_invalidate_dentry_lease(dn); 1224 ceph_invalidate_dentry_lease(dn);
1205 1225
1206 /* d_move screws up sibling dentries' offsets */ 1226 /* d_move screws up sibling dentries' offsets */
1207 ceph_dir_clear_complete(dir); 1227 ceph_dir_clear_ordered(dir);
1208 ceph_dir_clear_complete(olddir); 1228 ceph_dir_clear_ordered(olddir);
1209 1229
1210 dout("dn %p gets new offset %lld\n", req->r_old_dentry, 1230 dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1211 ceph_dentry(req->r_old_dentry)->offset); 1231 ceph_dentry(req->r_old_dentry)->offset);
@@ -1217,6 +1237,7 @@ retry_lookup:
1217 if (!rinfo->head->is_target) { 1237 if (!rinfo->head->is_target) {
1218 dout("fill_trace null dentry\n"); 1238 dout("fill_trace null dentry\n");
1219 if (dn->d_inode) { 1239 if (dn->d_inode) {
1240 ceph_dir_clear_ordered(dir);
1220 dout("d_delete %p\n", dn); 1241 dout("d_delete %p\n", dn);
1221 d_delete(dn); 1242 d_delete(dn);
1222 } else { 1243 } else {
@@ -1233,7 +1254,7 @@ retry_lookup:
1233 1254
1234 /* attach proper inode */ 1255 /* attach proper inode */
1235 if (!dn->d_inode) { 1256 if (!dn->d_inode) {
1236 ceph_dir_clear_complete(dir); 1257 ceph_dir_clear_ordered(dir);
1237 ihold(in); 1258 ihold(in);
1238 dn = splice_dentry(dn, in, &have_lease); 1259 dn = splice_dentry(dn, in, &have_lease);
1239 if (IS_ERR(dn)) { 1260 if (IS_ERR(dn)) {
@@ -1263,7 +1284,7 @@ retry_lookup:
1263 BUG_ON(!dir); 1284 BUG_ON(!dir);
1264 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); 1285 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
1265 dout(" linking snapped dir %p to dn %p\n", in, dn); 1286 dout(" linking snapped dir %p to dn %p\n", in, dn);
1266 ceph_dir_clear_complete(dir); 1287 ceph_dir_clear_ordered(dir);
1267 ihold(in); 1288 ihold(in);
1268 dn = splice_dentry(dn, in, NULL); 1289 dn = splice_dentry(dn, in, NULL);
1269 if (IS_ERR(dn)) { 1290 if (IS_ERR(dn)) {
@@ -1300,7 +1321,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1300 dout("new_inode badness got %d\n", err); 1321 dout("new_inode badness got %d\n", err);
1301 continue; 1322 continue;
1302 } 1323 }
1303 rc = fill_inode(in, &rinfo->dir_in[i], NULL, session, 1324 rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
1304 req->r_request_started, -1, 1325 req->r_request_started, -1,
1305 &req->r_caps_reservation); 1326 &req->r_caps_reservation);
1306 if (rc < 0) { 1327 if (rc < 0) {
@@ -1416,7 +1437,7 @@ retry_lookup:
1416 } 1437 }
1417 } 1438 }
1418 1439
1419 if (fill_inode(in, &rinfo->dir_in[i], NULL, session, 1440 if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
1420 req->r_request_started, -1, 1441 req->r_request_started, -1,
1421 &req->r_caps_reservation) < 0) { 1442 &req->r_caps_reservation) < 0) {
1422 pr_err("fill_inode badness on %p\n", in); 1443 pr_err("fill_inode badness on %p\n", in);
@@ -1899,7 +1920,8 @@ out_put:
1899 * Verify that we have a lease on the given mask. If not, 1920 * Verify that we have a lease on the given mask. If not,
1900 * do a getattr against an mds. 1921 * do a getattr against an mds.
1901 */ 1922 */
1902int ceph_do_getattr(struct inode *inode, int mask, bool force) 1923int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
1924 int mask, bool force)
1903{ 1925{
1904 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); 1926 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
1905 struct ceph_mds_client *mdsc = fsc->mdsc; 1927 struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1911,7 +1933,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
1911 return 0; 1933 return 0;
1912 } 1934 }
1913 1935
1914 dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); 1936 dout("do_getattr inode %p mask %s mode 0%o\n",
1937 inode, ceph_cap_string(mask), inode->i_mode);
1915 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) 1938 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
1916 return 0; 1939 return 0;
1917 1940
@@ -1922,7 +1945,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
1922 ihold(inode); 1945 ihold(inode);
1923 req->r_num_caps = 1; 1946 req->r_num_caps = 1;
1924 req->r_args.getattr.mask = cpu_to_le32(mask); 1947 req->r_args.getattr.mask = cpu_to_le32(mask);
1948 req->r_locked_page = locked_page;
1925 err = ceph_mdsc_do_request(mdsc, NULL, req); 1949 err = ceph_mdsc_do_request(mdsc, NULL, req);
1950 if (locked_page && err == 0) {
1951 u64 inline_version = req->r_reply_info.targeti.inline_version;
1952 if (inline_version == 0) {
1953 /* the reply is supposed to contain inline data */
1954 err = -EINVAL;
1955 } else if (inline_version == CEPH_INLINE_NONE) {
1956 err = -ENODATA;
1957 } else {
1958 err = req->r_reply_info.targeti.inline_len;
1959 }
1960 }
1926 ceph_mdsc_put_request(req); 1961 ceph_mdsc_put_request(req);
1927 dout("do_getattr result=%d\n", err); 1962 dout("do_getattr result=%d\n", err);
1928 return err; 1963 return err;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index fbc39c47bacd..c35c5c614e38 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -9,6 +9,8 @@
9#include <linux/ceph/pagelist.h> 9#include <linux/ceph/pagelist.h>
10 10
11static u64 lock_secret; 11static u64 lock_secret;
12static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
13 struct ceph_mds_request *req);
12 14
13static inline u64 secure_addr(void *addr) 15static inline u64 secure_addr(void *addr)
14{ 16{
@@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
40 u64 length = 0; 42 u64 length = 0;
41 u64 owner; 43 u64 owner;
42 44
45 if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
46 wait = 0;
47
43 req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); 48 req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
44 if (IS_ERR(req)) 49 if (IS_ERR(req))
45 return PTR_ERR(req); 50 return PTR_ERR(req);
@@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
68 req->r_args.filelock_change.length = cpu_to_le64(length); 73 req->r_args.filelock_change.length = cpu_to_le64(length);
69 req->r_args.filelock_change.wait = wait; 74 req->r_args.filelock_change.wait = wait;
70 75
76 if (wait)
77 req->r_wait_for_completion = ceph_lock_wait_for_completion;
78
71 err = ceph_mdsc_do_request(mdsc, inode, req); 79 err = ceph_mdsc_do_request(mdsc, inode, req);
72 80
73 if (operation == CEPH_MDS_OP_GETFILELOCK) { 81 if (operation == CEPH_MDS_OP_GETFILELOCK) {
@@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
96 return err; 104 return err;
97} 105}
98 106
107static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
108 struct ceph_mds_request *req)
109{
110 struct ceph_mds_request *intr_req;
111 struct inode *inode = req->r_inode;
112 int err, lock_type;
113
114 BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK);
115 if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL)
116 lock_type = CEPH_LOCK_FCNTL_INTR;
117 else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK)
118 lock_type = CEPH_LOCK_FLOCK_INTR;
119 else
120 BUG_ON(1);
121 BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK);
122
123 err = wait_for_completion_interruptible(&req->r_completion);
124 if (!err)
125 return 0;
126
127 dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
128 req->r_tid);
129
130 intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
131 USE_AUTH_MDS);
132 if (IS_ERR(intr_req))
133 return PTR_ERR(intr_req);
134
135 intr_req->r_inode = inode;
136 ihold(inode);
137 intr_req->r_num_caps = 1;
138
139 intr_req->r_args.filelock_change = req->r_args.filelock_change;
140 intr_req->r_args.filelock_change.rule = lock_type;
141 intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK;
142
143 err = ceph_mdsc_do_request(mdsc, inode, intr_req);
144 ceph_mdsc_put_request(intr_req);
145
146 if (err && err != -ERESTARTSYS)
147 return err;
148
149 wait_for_completion(&req->r_completion);
150 return 0;
151}
152
99/** 153/**
100 * Attempt to set an fcntl lock. 154 * Attempt to set an fcntl lock.
101 * For now, this just goes away to the server. Later it may be more awesome. 155 * For now, this just goes away to the server. Later it may be more awesome.
@@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
143 err); 197 err);
144 } 198 }
145 } 199 }
146
147 } else if (err == -ERESTARTSYS) {
148 dout("undoing lock\n");
149 ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
150 CEPH_LOCK_UNLOCK, 0, fl);
151 } 200 }
152 return err; 201 return err;
153} 202}
@@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
186 file, CEPH_LOCK_UNLOCK, 0, fl); 235 file, CEPH_LOCK_UNLOCK, 0, fl);
187 dout("got %d on flock_lock_file_wait, undid lock", err); 236 dout("got %d on flock_lock_file_wait, undid lock", err);
188 } 237 }
189 } else if (err == -ERESTARTSYS) {
190 dout("undoing lock\n");
191 ceph_lock_message(CEPH_LOCK_FLOCK,
192 CEPH_MDS_OP_SETFILELOCK,
193 file, CEPH_LOCK_UNLOCK, 0, fl);
194 } 238 }
195 return err; 239 return err;
196} 240}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a92d3f5c6c12..d2171f4a6980 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end,
89 ceph_decode_need(p, end, info->xattr_len, bad); 89 ceph_decode_need(p, end, info->xattr_len, bad);
90 info->xattr_data = *p; 90 info->xattr_data = *p;
91 *p += info->xattr_len; 91 *p += info->xattr_len;
92
93 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
94 ceph_decode_64_safe(p, end, info->inline_version, bad);
95 ceph_decode_32_safe(p, end, info->inline_len, bad);
96 ceph_decode_need(p, end, info->inline_len, bad);
97 info->inline_data = *p;
98 *p += info->inline_len;
99 } else
100 info->inline_version = CEPH_INLINE_NONE;
101
92 return 0; 102 return 0;
93bad: 103bad:
94 return err; 104 return err;
@@ -524,8 +534,7 @@ void ceph_mdsc_release_request(struct kref *kref)
524 } 534 }
525 if (req->r_locked_dir) 535 if (req->r_locked_dir)
526 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 536 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
527 if (req->r_target_inode) 537 iput(req->r_target_inode);
528 iput(req->r_target_inode);
529 if (req->r_dentry) 538 if (req->r_dentry)
530 dput(req->r_dentry); 539 dput(req->r_dentry);
531 if (req->r_old_dentry) 540 if (req->r_old_dentry)
@@ -861,8 +870,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
861 /* 870 /*
862 * Serialize client metadata into waiting buffer space, using 871 * Serialize client metadata into waiting buffer space, using
863 * the format that userspace expects for map<string, string> 872 * the format that userspace expects for map<string, string>
873 *
874 * ClientSession messages with metadata are v2
864 */ 875 */
865 msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */ 876 msg->hdr.version = cpu_to_le16(2);
877 msg->hdr.compat_version = cpu_to_le16(1);
866 878
867 /* The write pointer, following the session_head structure */ 879 /* The write pointer, following the session_head structure */
868 p = msg->front.iov_base + sizeof(*h); 880 p = msg->front.iov_base + sizeof(*h);
@@ -1066,8 +1078,7 @@ out:
1066 session->s_cap_iterator = NULL; 1078 session->s_cap_iterator = NULL;
1067 spin_unlock(&session->s_cap_lock); 1079 spin_unlock(&session->s_cap_lock);
1068 1080
1069 if (last_inode) 1081 iput(last_inode);
1070 iput(last_inode);
1071 if (old_cap) 1082 if (old_cap)
1072 ceph_put_cap(session->s_mdsc, old_cap); 1083 ceph_put_cap(session->s_mdsc, old_cap);
1073 1084
@@ -1874,7 +1885,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1874 goto out_free2; 1885 goto out_free2;
1875 } 1886 }
1876 1887
1877 msg->hdr.version = 2; 1888 msg->hdr.version = cpu_to_le16(2);
1878 msg->hdr.tid = cpu_to_le64(req->r_tid); 1889 msg->hdr.tid = cpu_to_le64(req->r_tid);
1879 1890
1880 head = msg->front.iov_base; 1891 head = msg->front.iov_base;
@@ -2208,6 +2219,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2208 &req->r_completion, req->r_timeout); 2219 &req->r_completion, req->r_timeout);
2209 if (err == 0) 2220 if (err == 0)
2210 err = -EIO; 2221 err = -EIO;
2222 } else if (req->r_wait_for_completion) {
2223 err = req->r_wait_for_completion(mdsc, req);
2211 } else { 2224 } else {
2212 err = wait_for_completion_killable(&req->r_completion); 2225 err = wait_for_completion_killable(&req->r_completion);
2213 } 2226 }
@@ -3744,6 +3757,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3744 return msg; 3757 return msg;
3745} 3758}
3746 3759
3760static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
3761{
3762 struct ceph_mds_session *s = con->private;
3763 struct ceph_auth_handshake *auth = &s->s_auth;
3764 return ceph_auth_sign_message(auth, msg);
3765}
3766
3767static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
3768{
3769 struct ceph_mds_session *s = con->private;
3770 struct ceph_auth_handshake *auth = &s->s_auth;
3771 return ceph_auth_check_message_signature(auth, msg);
3772}
3773
3747static const struct ceph_connection_operations mds_con_ops = { 3774static const struct ceph_connection_operations mds_con_ops = {
3748 .get = con_get, 3775 .get = con_get,
3749 .put = con_put, 3776 .put = con_put,
@@ -3753,6 +3780,8 @@ static const struct ceph_connection_operations mds_con_ops = {
3753 .invalidate_authorizer = invalidate_authorizer, 3780 .invalidate_authorizer = invalidate_authorizer,
3754 .peer_reset = peer_reset, 3781 .peer_reset = peer_reset,
3755 .alloc_msg = mds_alloc_msg, 3782 .alloc_msg = mds_alloc_msg,
3783 .sign_message = sign_message,
3784 .check_message_signature = check_message_signature,
3756}; 3785};
3757 3786
3758/* eof */ 3787/* eof */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3288359353e9..e2817d00f7d9 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in {
41 char *symlink; 41 char *symlink;
42 u32 xattr_len; 42 u32 xattr_len;
43 char *xattr_data; 43 char *xattr_data;
44 u64 inline_version;
45 u32 inline_len;
46 char *inline_data;
44}; 47};
45 48
46/* 49/*
@@ -166,6 +169,11 @@ struct ceph_mds_client;
166 */ 169 */
167typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc, 170typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
168 struct ceph_mds_request *req); 171 struct ceph_mds_request *req);
172/*
173 * wait for request completion callback
174 */
175typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
176 struct ceph_mds_request *req);
169 177
170/* 178/*
171 * an in-flight mds request 179 * an in-flight mds request
@@ -215,6 +223,7 @@ struct ceph_mds_request {
215 int r_request_release_offset; 223 int r_request_release_offset;
216 struct ceph_msg *r_reply; 224 struct ceph_msg *r_reply;
217 struct ceph_mds_reply_info_parsed r_reply_info; 225 struct ceph_mds_reply_info_parsed r_reply_info;
226 struct page *r_locked_page;
218 int r_err; 227 int r_err;
219 bool r_aborted; 228 bool r_aborted;
220 229
@@ -239,6 +248,7 @@ struct ceph_mds_request {
239 struct completion r_completion; 248 struct completion r_completion;
240 struct completion r_safe_completion; 249 struct completion r_safe_completion;
241 ceph_mds_request_callback_t r_callback; 250 ceph_mds_request_callback_t r_callback;
251 ceph_mds_request_wait_callback_t r_wait_for_completion;
242 struct list_head r_unsafe_item; /* per-session unsafe list item */ 252 struct list_head r_unsafe_item; /* per-session unsafe list item */
243 bool r_got_unsafe, r_got_safe, r_got_result; 253 bool r_got_unsafe, r_got_safe, r_got_result;
244 254
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f01645a27752..ce35fbd4ba5d 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b)
288 return 0; 288 return 0;
289} 289}
290 290
291
292static struct ceph_snap_context *empty_snapc;
293
291/* 294/*
292 * build the snap context for a given realm. 295 * build the snap context for a given realm.
293 */ 296 */
@@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm)
328 return 0; 331 return 0;
329 } 332 }
330 333
334 if (num == 0 && realm->seq == empty_snapc->seq) {
335 ceph_get_snap_context(empty_snapc);
336 snapc = empty_snapc;
337 goto done;
338 }
339
331 /* alloc new snap context */ 340 /* alloc new snap context */
332 err = -ENOMEM; 341 err = -ENOMEM;
333 if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64)) 342 if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
@@ -365,8 +374,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
365 realm->ino, realm, snapc, snapc->seq, 374 realm->ino, realm, snapc, snapc->seq,
366 (unsigned int) snapc->num_snaps); 375 (unsigned int) snapc->num_snaps);
367 376
368 if (realm->cached_context) 377done:
369 ceph_put_snap_context(realm->cached_context); 378 ceph_put_snap_context(realm->cached_context);
370 realm->cached_context = snapc; 379 realm->cached_context = snapc;
371 return 0; 380 return 0;
372 381
@@ -466,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
466 cap_snap. lucky us. */ 475 cap_snap. lucky us. */
467 dout("queue_cap_snap %p already pending\n", inode); 476 dout("queue_cap_snap %p already pending\n", inode);
468 kfree(capsnap); 477 kfree(capsnap);
478 } else if (ci->i_snap_realm->cached_context == empty_snapc) {
479 dout("queue_cap_snap %p empty snapc\n", inode);
480 kfree(capsnap);
469 } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| 481 } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
470 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { 482 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
471 struct ceph_snap_context *snapc = ci->i_head_snapc; 483 struct ceph_snap_context *snapc = ci->i_head_snapc;
@@ -504,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
504 capsnap->xattr_version = 0; 516 capsnap->xattr_version = 0;
505 } 517 }
506 518
519 capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
520
507 /* dirty page count moved from _head to this cap_snap; 521 /* dirty page count moved from _head to this cap_snap;
508 all subsequent writes page dirties occur _after_ this 522 all subsequent writes page dirties occur _after_ this
509 snapshot. */ 523 snapshot. */
@@ -590,15 +604,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
590 if (!inode) 604 if (!inode)
591 continue; 605 continue;
592 spin_unlock(&realm->inodes_with_caps_lock); 606 spin_unlock(&realm->inodes_with_caps_lock);
593 if (lastinode) 607 iput(lastinode);
594 iput(lastinode);
595 lastinode = inode; 608 lastinode = inode;
596 ceph_queue_cap_snap(ci); 609 ceph_queue_cap_snap(ci);
597 spin_lock(&realm->inodes_with_caps_lock); 610 spin_lock(&realm->inodes_with_caps_lock);
598 } 611 }
599 spin_unlock(&realm->inodes_with_caps_lock); 612 spin_unlock(&realm->inodes_with_caps_lock);
600 if (lastinode) 613 iput(lastinode);
601 iput(lastinode);
602 614
603 list_for_each_entry(child, &realm->children, child_item) { 615 list_for_each_entry(child, &realm->children, child_item) {
604 dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n", 616 dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
@@ -928,5 +940,16 @@ out:
928 return; 940 return;
929} 941}
930 942
943int __init ceph_snap_init(void)
944{
945 empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
946 if (!empty_snapc)
947 return -ENOMEM;
948 empty_snapc->seq = 1;
949 return 0;
950}
931 951
932 952void ceph_snap_exit(void)
953{
954 ceph_put_snap_context(empty_snapc);
955}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f6e12377335c..50f06cddc94b 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
515 struct ceph_fs_client *fsc; 515 struct ceph_fs_client *fsc;
516 const u64 supported_features = 516 const u64 supported_features =
517 CEPH_FEATURE_FLOCK | 517 CEPH_FEATURE_FLOCK |
518 CEPH_FEATURE_DIRLAYOUTHASH; 518 CEPH_FEATURE_DIRLAYOUTHASH |
519 CEPH_FEATURE_MDS_INLINE_DATA;
519 const u64 required_features = 0; 520 const u64 required_features = 0;
520 int page_count; 521 int page_count;
521 size_t size; 522 size_t size;
@@ -1017,9 +1018,6 @@ static struct file_system_type ceph_fs_type = {
1017}; 1018};
1018MODULE_ALIAS_FS("ceph"); 1019MODULE_ALIAS_FS("ceph");
1019 1020
1020#define _STRINGIFY(x) #x
1021#define STRINGIFY(x) _STRINGIFY(x)
1022
1023static int __init init_ceph(void) 1021static int __init init_ceph(void)
1024{ 1022{
1025 int ret = init_caches(); 1023 int ret = init_caches();
@@ -1028,15 +1026,20 @@ static int __init init_ceph(void)
1028 1026
1029 ceph_flock_init(); 1027 ceph_flock_init();
1030 ceph_xattr_init(); 1028 ceph_xattr_init();
1029 ret = ceph_snap_init();
1030 if (ret)
1031 goto out_xattr;
1031 ret = register_filesystem(&ceph_fs_type); 1032 ret = register_filesystem(&ceph_fs_type);
1032 if (ret) 1033 if (ret)
1033 goto out_icache; 1034 goto out_snap;
1034 1035
1035 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); 1036 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
1036 1037
1037 return 0; 1038 return 0;
1038 1039
1039out_icache: 1040out_snap:
1041 ceph_snap_exit();
1042out_xattr:
1040 ceph_xattr_exit(); 1043 ceph_xattr_exit();
1041 destroy_caches(); 1044 destroy_caches();
1042out: 1045out:
@@ -1047,6 +1050,7 @@ static void __exit exit_ceph(void)
1047{ 1050{
1048 dout("exit_ceph\n"); 1051 dout("exit_ceph\n");
1049 unregister_filesystem(&ceph_fs_type); 1052 unregister_filesystem(&ceph_fs_type);
1053 ceph_snap_exit();
1050 ceph_xattr_exit(); 1054 ceph_xattr_exit();
1051 destroy_caches(); 1055 destroy_caches();
1052} 1056}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507979b8..e1aa32d0759d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@ struct ceph_cap_snap {
161 u64 time_warp_seq; 161 u64 time_warp_seq;
162 int writing; /* a sync write is still in progress */ 162 int writing; /* a sync write is still in progress */
163 int dirty_pages; /* dirty pages awaiting writeback */ 163 int dirty_pages; /* dirty pages awaiting writeback */
164 bool inline_data;
164}; 165};
165 166
166static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) 167static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -253,9 +254,11 @@ struct ceph_inode_info {
253 spinlock_t i_ceph_lock; 254 spinlock_t i_ceph_lock;
254 255
255 u64 i_version; 256 u64 i_version;
257 u64 i_inline_version;
256 u32 i_time_warp_seq; 258 u32 i_time_warp_seq;
257 259
258 unsigned i_ceph_flags; 260 unsigned i_ceph_flags;
261 int i_ordered_count;
259 atomic_t i_release_count; 262 atomic_t i_release_count;
260 atomic_t i_complete_count; 263 atomic_t i_complete_count;
261 264
@@ -434,14 +437,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
434/* 437/*
435 * Ceph inode. 438 * Ceph inode.
436 */ 439 */
437#define CEPH_I_NODELAY 4 /* do not delay cap release */ 440#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
438#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ 441#define CEPH_I_NODELAY 4 /* do not delay cap release */
439#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ 442#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
443#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
440 444
441static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, 445static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
442 int release_count) 446 int release_count, int ordered_count)
443{ 447{
444 atomic_set(&ci->i_complete_count, release_count); 448 atomic_set(&ci->i_complete_count, release_count);
449 if (ci->i_ordered_count == ordered_count)
450 ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
451 else
452 ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
445} 453}
446 454
447static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) 455static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +463,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
455 atomic_read(&ci->i_release_count); 463 atomic_read(&ci->i_release_count);
456} 464}
457 465
466static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
467{
468 return __ceph_dir_is_complete(ci) &&
469 (ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
470}
471
458static inline void ceph_dir_clear_complete(struct inode *inode) 472static inline void ceph_dir_clear_complete(struct inode *inode)
459{ 473{
460 __ceph_dir_clear_complete(ceph_inode(inode)); 474 __ceph_dir_clear_complete(ceph_inode(inode));
461} 475}
462 476
463static inline bool ceph_dir_is_complete(struct inode *inode) 477static inline void ceph_dir_clear_ordered(struct inode *inode)
464{ 478{
465 return __ceph_dir_is_complete(ceph_inode(inode)); 479 struct ceph_inode_info *ci = ceph_inode(inode);
480 spin_lock(&ci->i_ceph_lock);
481 ci->i_ordered_count++;
482 ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
483 spin_unlock(&ci->i_ceph_lock);
466} 484}
467 485
486static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
487{
488 struct ceph_inode_info *ci = ceph_inode(inode);
489 bool ret;
490 spin_lock(&ci->i_ceph_lock);
491 ret = __ceph_dir_is_complete_ordered(ci);
492 spin_unlock(&ci->i_ceph_lock);
493 return ret;
494}
468 495
469/* find a specific frag @f */ 496/* find a specific frag @f */
470extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, 497extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +607,7 @@ struct ceph_file_info {
580 char *last_name; /* last entry in previous chunk */ 607 char *last_name; /* last entry in previous chunk */
581 struct dentry *dentry; /* next dentry (for dcache readdir) */ 608 struct dentry *dentry; /* next dentry (for dcache readdir) */
582 int dir_release_count; 609 int dir_release_count;
610 int dir_ordered_count;
583 611
584 /* used for -o dirstat read() on directory thing */ 612 /* used for -o dirstat read() on directory thing */
585 char *dir_info; 613 char *dir_info;
@@ -673,6 +701,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
673extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, 701extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
674 struct ceph_cap_snap *capsnap); 702 struct ceph_cap_snap *capsnap);
675extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); 703extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
704extern int ceph_snap_init(void);
705extern void ceph_snap_exit(void);
676 706
677/* 707/*
678 * a cap_snap is "pending" if it is still awaiting an in-progress 708 * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -715,7 +745,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
715extern void ceph_queue_invalidate(struct inode *inode); 745extern void ceph_queue_invalidate(struct inode *inode);
716extern void ceph_queue_writeback(struct inode *inode); 746extern void ceph_queue_writeback(struct inode *inode);
717 747
718extern int ceph_do_getattr(struct inode *inode, int mask, bool force); 748extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
749 int mask, bool force);
750static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
751{
752 return __ceph_do_getattr(inode, NULL, mask, force);
753}
719extern int ceph_permission(struct inode *inode, int mask); 754extern int ceph_permission(struct inode *inode, int mask);
720extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); 755extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
721extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, 756extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -830,7 +865,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
830 int mds, int drop, int unless); 865 int mds, int drop, int unless);
831 866
832extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 867extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
833 int *got, loff_t endoff); 868 loff_t endoff, int *got, struct page **pinned_page);
834 869
835/* for counting open files by mode */ 870/* for counting open files by mode */
836static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode) 871static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
@@ -852,7 +887,9 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
852 struct file *file, unsigned flags, umode_t mode, 887 struct file *file, unsigned flags, umode_t mode,
853 int *opened); 888 int *opened);
854extern int ceph_release(struct inode *inode, struct file *filp); 889extern int ceph_release(struct inode *inode, struct file *filp);
855 890extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
891 char *data, size_t len);
892int ceph_uninline_data(struct file *filp, struct page *locked_page);
856/* dir.c */ 893/* dir.c */
857extern const struct file_operations ceph_dir_fops; 894extern const struct file_operations ceph_dir_fops;
858extern const struct inode_operations ceph_dir_iops; 895extern const struct inode_operations ceph_dir_iops;
diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej
new file mode 100644
index 000000000000..88fe3dfadb29
--- /dev/null
+++ b/fs/ceph/super.h.rej
@@ -0,0 +1,10 @@
1--- fs/ceph/super.h
2+++ fs/ceph/super.h
3@@ -254,6 +255,7 @@
4 spinlock_t i_ceph_lock;
5
6 u64 i_version;
7+ u64 i_inline_version;
8 u32 i_time_warp_seq;
9
10 unsigned i_ceph_flags;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 678b0d2bbbc4..5a492caf34cb 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
854 struct ceph_pagelist *pagelist = NULL; 854 struct ceph_pagelist *pagelist = NULL;
855 int err; 855 int err;
856 856
857 if (value) { 857 if (size > 0) {
858 /* copy value into pagelist */ 858 /* copy value into pagelist */
859 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 859 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
860 if (!pagelist) 860 if (!pagelist)
@@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
864 err = ceph_pagelist_append(pagelist, value, size); 864 err = ceph_pagelist_append(pagelist, value, size);
865 if (err) 865 if (err)
866 goto out; 866 goto out;
867 } else { 867 } else if (!value) {
868 flags |= CEPH_XATTR_REMOVE; 868 flags |= CEPH_XATTR_REMOVE;
869 } 869 }
870 870
@@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
1001 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) 1001 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
1002 return generic_setxattr(dentry, name, value, size, flags); 1002 return generic_setxattr(dentry, name, value, size, flags);
1003 1003
1004 if (size == 0)
1005 value = ""; /* empty EA, do not remove */
1006
1004 return __ceph_setxattr(dentry, name, value, size, flags); 1007 return __ceph_setxattr(dentry, name, value, size, flags);
1005} 1008}
1006 1009