aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c11
-rw-r--r--fs/ceph/addr.c273
-rw-r--r--fs/ceph/caps.c132
-rw-r--r--fs/ceph/dir.c27
-rw-r--r--fs/ceph/file.c97
-rw-r--r--fs/ceph/inode.c59
-rw-r--r--fs/ceph/locks.c64
-rw-r--r--fs/ceph/mds_client.c41
-rw-r--r--fs/ceph/mds_client.h10
-rw-r--r--fs/ceph/snap.c37
-rw-r--r--fs/ceph/super.c16
-rw-r--r--fs/ceph/super.h55
-rw-r--r--fs/ceph/super.h.rej10
-rw-r--r--fs/ceph/xattr.c7
-rw-r--r--include/linux/ceph/auth.h26
-rw-r--r--include/linux/ceph/buffer.h3
-rw-r--r--include/linux/ceph/ceph_features.h1
-rw-r--r--include/linux/ceph/ceph_fs.h10
-rw-r--r--include/linux/ceph/libceph.h2
-rw-r--r--include/linux/ceph/messenger.h9
-rw-r--r--include/linux/ceph/msgr.h11
-rw-r--r--include/linux/ceph/osd_client.h13
-rw-r--r--include/linux/ceph/pagelist.h4
-rw-r--r--net/ceph/auth_x.c76
-rw-r--r--net/ceph/auth_x.h1
-rw-r--r--net/ceph/buffer.c4
-rw-r--r--net/ceph/ceph_common.c21
-rw-r--r--net/ceph/messenger.c34
-rw-r--r--net/ceph/osd_client.c118
29 files changed, 992 insertions, 180 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 27b71a0b72d0..3ec85dfce124 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2370,8 +2370,12 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2370 opcode = CEPH_OSD_OP_READ; 2370 opcode = CEPH_OSD_OP_READ;
2371 } 2371 }
2372 2372
2373 osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length, 2373 if (opcode == CEPH_OSD_OP_DELETE)
2374 0, 0); 2374 osd_req_op_init(osd_request, num_ops, opcode);
2375 else
2376 osd_req_op_extent_init(osd_request, num_ops, opcode,
2377 offset, length, 0, 0);
2378
2375 if (obj_request->type == OBJ_REQUEST_BIO) 2379 if (obj_request->type == OBJ_REQUEST_BIO)
2376 osd_req_op_extent_osd_data_bio(osd_request, num_ops, 2380 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2377 obj_request->bio_list, length); 2381 obj_request->bio_list, length);
@@ -3405,8 +3409,7 @@ err_rq:
3405 if (result) 3409 if (result)
3406 rbd_warn(rbd_dev, "%s %llx at %llx result %d", 3410 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3407 obj_op_name(op_type), length, offset, result); 3411 obj_op_name(op_type), length, offset, result);
3408 if (snapc) 3412 ceph_put_snap_context(snapc);
3409 ceph_put_snap_context(snapc);
3410 blk_end_request_all(rq, result); 3413 blk_end_request_all(rq, result);
3411} 3414}
3412 3415
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 18c06bbaf136..f5013d92a7e6 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page)
192 struct ceph_osd_client *osdc = 192 struct ceph_osd_client *osdc =
193 &ceph_inode_to_client(inode)->client->osdc; 193 &ceph_inode_to_client(inode)->client->osdc;
194 int err = 0; 194 int err = 0;
195 u64 off = page_offset(page);
195 u64 len = PAGE_CACHE_SIZE; 196 u64 len = PAGE_CACHE_SIZE;
196 197
197 err = ceph_readpage_from_fscache(inode, page); 198 if (off >= i_size_read(inode)) {
199 zero_user_segment(page, err, PAGE_CACHE_SIZE);
200 SetPageUptodate(page);
201 return 0;
202 }
198 203
204 /*
205 * Uptodate inline data should have been added into page cache
206 * while getting Fcr caps.
207 */
208 if (ci->i_inline_version != CEPH_INLINE_NONE)
209 return -EINVAL;
210
211 err = ceph_readpage_from_fscache(inode, page);
199 if (err == 0) 212 if (err == 0)
200 goto out; 213 goto out;
201 214
202 dout("readpage inode %p file %p page %p index %lu\n", 215 dout("readpage inode %p file %p page %p index %lu\n",
203 inode, filp, page, page->index); 216 inode, filp, page, page->index);
204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 217 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
205 (u64) page_offset(page), &len, 218 off, &len,
206 ci->i_truncate_seq, ci->i_truncate_size, 219 ci->i_truncate_seq, ci->i_truncate_size,
207 &page, 1, 0); 220 &page, 1, 0);
208 if (err == -ENOENT) 221 if (err == -ENOENT)
@@ -319,7 +332,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
319 off, len); 332 off, len);
320 vino = ceph_vino(inode); 333 vino = ceph_vino(inode);
321 req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 334 req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
322 1, CEPH_OSD_OP_READ, 335 0, 1, CEPH_OSD_OP_READ,
323 CEPH_OSD_FLAG_READ, NULL, 336 CEPH_OSD_FLAG_READ, NULL,
324 ci->i_truncate_seq, ci->i_truncate_size, 337 ci->i_truncate_seq, ci->i_truncate_size,
325 false); 338 false);
@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
384 int rc = 0; 397 int rc = 0;
385 int max = 0; 398 int max = 0;
386 399
400 if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
401 return -EINVAL;
402
387 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, 403 rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
388 &nr_pages); 404 &nr_pages);
389 405
@@ -673,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
673 int rc = 0; 689 int rc = 0;
674 unsigned wsize = 1 << inode->i_blkbits; 690 unsigned wsize = 1 << inode->i_blkbits;
675 struct ceph_osd_request *req = NULL; 691 struct ceph_osd_request *req = NULL;
676 int do_sync; 692 int do_sync = 0;
677 u64 truncate_size, snap_size; 693 u64 truncate_size, snap_size;
678 u32 truncate_seq; 694 u32 truncate_seq;
679 695
@@ -750,7 +766,6 @@ retry:
750 last_snapc = snapc; 766 last_snapc = snapc;
751 767
752 while (!done && index <= end) { 768 while (!done && index <= end) {
753 int num_ops = do_sync ? 2 : 1;
754 unsigned i; 769 unsigned i;
755 int first; 770 int first;
756 pgoff_t next; 771 pgoff_t next;
@@ -850,7 +865,8 @@ get_more_pages:
850 len = wsize; 865 len = wsize;
851 req = ceph_osdc_new_request(&fsc->client->osdc, 866 req = ceph_osdc_new_request(&fsc->client->osdc,
852 &ci->i_layout, vino, 867 &ci->i_layout, vino,
853 offset, &len, num_ops, 868 offset, &len, 0,
869 do_sync ? 2 : 1,
854 CEPH_OSD_OP_WRITE, 870 CEPH_OSD_OP_WRITE,
855 CEPH_OSD_FLAG_WRITE | 871 CEPH_OSD_FLAG_WRITE |
856 CEPH_OSD_FLAG_ONDISK, 872 CEPH_OSD_FLAG_ONDISK,
@@ -862,6 +878,9 @@ get_more_pages:
862 break; 878 break;
863 } 879 }
864 880
881 if (do_sync)
882 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
883
865 req->r_callback = writepages_finish; 884 req->r_callback = writepages_finish;
866 req->r_inode = inode; 885 req->r_inode = inode;
867 886
@@ -1204,6 +1223,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1204 struct inode *inode = file_inode(vma->vm_file); 1223 struct inode *inode = file_inode(vma->vm_file);
1205 struct ceph_inode_info *ci = ceph_inode(inode); 1224 struct ceph_inode_info *ci = ceph_inode(inode);
1206 struct ceph_file_info *fi = vma->vm_file->private_data; 1225 struct ceph_file_info *fi = vma->vm_file->private_data;
1226 struct page *pinned_page = NULL;
1207 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; 1227 loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
1208 int want, got, ret; 1228 int want, got, ret;
1209 1229
@@ -1215,7 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1215 want = CEPH_CAP_FILE_CACHE; 1235 want = CEPH_CAP_FILE_CACHE;
1216 while (1) { 1236 while (1) {
1217 got = 0; 1237 got = 0;
1218 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 1238 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
1239 -1, &got, &pinned_page);
1219 if (ret == 0) 1240 if (ret == 0)
1220 break; 1241 break;
1221 if (ret != -ERESTARTSYS) { 1242 if (ret != -ERESTARTSYS) {
@@ -1226,12 +1247,54 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
1226 dout("filemap_fault %p %llu~%zd got cap refs on %s\n", 1247 dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
1227 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); 1248 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
1228 1249
1229 ret = filemap_fault(vma, vmf); 1250 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
1251 ci->i_inline_version == CEPH_INLINE_NONE)
1252 ret = filemap_fault(vma, vmf);
1253 else
1254 ret = -EAGAIN;
1230 1255
1231 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", 1256 dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
1232 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); 1257 inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
1258 if (pinned_page)
1259 page_cache_release(pinned_page);
1233 ceph_put_cap_refs(ci, got); 1260 ceph_put_cap_refs(ci, got);
1234 1261
1262 if (ret != -EAGAIN)
1263 return ret;
1264
1265 /* read inline data */
1266 if (off >= PAGE_CACHE_SIZE) {
1267 /* does not support inline data > PAGE_SIZE */
1268 ret = VM_FAULT_SIGBUS;
1269 } else {
1270 int ret1;
1271 struct address_space *mapping = inode->i_mapping;
1272 struct page *page = find_or_create_page(mapping, 0,
1273 mapping_gfp_mask(mapping) &
1274 ~__GFP_FS);
1275 if (!page) {
1276 ret = VM_FAULT_OOM;
1277 goto out;
1278 }
1279 ret1 = __ceph_do_getattr(inode, page,
1280 CEPH_STAT_CAP_INLINE_DATA, true);
1281 if (ret1 < 0 || off >= i_size_read(inode)) {
1282 unlock_page(page);
1283 page_cache_release(page);
1284 ret = VM_FAULT_SIGBUS;
1285 goto out;
1286 }
1287 if (ret1 < PAGE_CACHE_SIZE)
1288 zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
1289 else
1290 flush_dcache_page(page);
1291 SetPageUptodate(page);
1292 vmf->page = page;
1293 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
1294 }
1295out:
1296 dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
1297 inode, off, (size_t)PAGE_CACHE_SIZE, ret);
1235 return ret; 1298 return ret;
1236} 1299}
1237 1300
@@ -1250,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1250 size_t len; 1313 size_t len;
1251 int want, got, ret; 1314 int want, got, ret;
1252 1315
1316 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1317 struct page *locked_page = NULL;
1318 if (off == 0) {
1319 lock_page(page);
1320 locked_page = page;
1321 }
1322 ret = ceph_uninline_data(vma->vm_file, locked_page);
1323 if (locked_page)
1324 unlock_page(locked_page);
1325 if (ret < 0)
1326 return VM_FAULT_SIGBUS;
1327 }
1328
1253 if (off + PAGE_CACHE_SIZE <= size) 1329 if (off + PAGE_CACHE_SIZE <= size)
1254 len = PAGE_CACHE_SIZE; 1330 len = PAGE_CACHE_SIZE;
1255 else 1331 else
@@ -1263,7 +1339,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1263 want = CEPH_CAP_FILE_BUFFER; 1339 want = CEPH_CAP_FILE_BUFFER;
1264 while (1) { 1340 while (1) {
1265 got = 0; 1341 got = 0;
1266 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); 1342 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
1343 &got, NULL);
1267 if (ret == 0) 1344 if (ret == 0)
1268 break; 1345 break;
1269 if (ret != -ERESTARTSYS) { 1346 if (ret != -ERESTARTSYS) {
@@ -1297,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1297 ret = VM_FAULT_SIGBUS; 1374 ret = VM_FAULT_SIGBUS;
1298 } 1375 }
1299out: 1376out:
1300 if (ret != VM_FAULT_LOCKED) { 1377 if (ret != VM_FAULT_LOCKED)
1301 unlock_page(page); 1378 unlock_page(page);
1302 } else { 1379 if (ret == VM_FAULT_LOCKED ||
1380 ci->i_inline_version != CEPH_INLINE_NONE) {
1303 int dirty; 1381 int dirty;
1304 spin_lock(&ci->i_ceph_lock); 1382 spin_lock(&ci->i_ceph_lock);
1383 ci->i_inline_version = CEPH_INLINE_NONE;
1305 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1384 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1306 spin_unlock(&ci->i_ceph_lock); 1385 spin_unlock(&ci->i_ceph_lock);
1307 if (dirty) 1386 if (dirty)
@@ -1315,6 +1394,178 @@ out:
1315 return ret; 1394 return ret;
1316} 1395}
1317 1396
1397void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
1398 char *data, size_t len)
1399{
1400 struct address_space *mapping = inode->i_mapping;
1401 struct page *page;
1402
1403 if (locked_page) {
1404 page = locked_page;
1405 } else {
1406 if (i_size_read(inode) == 0)
1407 return;
1408 page = find_or_create_page(mapping, 0,
1409 mapping_gfp_mask(mapping) & ~__GFP_FS);
1410 if (!page)
1411 return;
1412 if (PageUptodate(page)) {
1413 unlock_page(page);
1414 page_cache_release(page);
1415 return;
1416 }
1417 }
1418
1419 dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n",
1420 inode, ceph_vinop(inode), len, locked_page);
1421
1422 if (len > 0) {
1423 void *kaddr = kmap_atomic(page);
1424 memcpy(kaddr, data, len);
1425 kunmap_atomic(kaddr);
1426 }
1427
1428 if (page != locked_page) {
1429 if (len < PAGE_CACHE_SIZE)
1430 zero_user_segment(page, len, PAGE_CACHE_SIZE);
1431 else
1432 flush_dcache_page(page);
1433
1434 SetPageUptodate(page);
1435 unlock_page(page);
1436 page_cache_release(page);
1437 }
1438}
1439
1440int ceph_uninline_data(struct file *filp, struct page *locked_page)
1441{
1442 struct inode *inode = file_inode(filp);
1443 struct ceph_inode_info *ci = ceph_inode(inode);
1444 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1445 struct ceph_osd_request *req;
1446 struct page *page = NULL;
1447 u64 len, inline_version;
1448 int err = 0;
1449 bool from_pagecache = false;
1450
1451 spin_lock(&ci->i_ceph_lock);
1452 inline_version = ci->i_inline_version;
1453 spin_unlock(&ci->i_ceph_lock);
1454
1455 dout("uninline_data %p %llx.%llx inline_version %llu\n",
1456 inode, ceph_vinop(inode), inline_version);
1457
1458 if (inline_version == 1 || /* initial version, no data */
1459 inline_version == CEPH_INLINE_NONE)
1460 goto out;
1461
1462 if (locked_page) {
1463 page = locked_page;
1464 WARN_ON(!PageUptodate(page));
1465 } else if (ceph_caps_issued(ci) &
1466 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
1467 page = find_get_page(inode->i_mapping, 0);
1468 if (page) {
1469 if (PageUptodate(page)) {
1470 from_pagecache = true;
1471 lock_page(page);
1472 } else {
1473 page_cache_release(page);
1474 page = NULL;
1475 }
1476 }
1477 }
1478
1479 if (page) {
1480 len = i_size_read(inode);
1481 if (len > PAGE_CACHE_SIZE)
1482 len = PAGE_CACHE_SIZE;
1483 } else {
1484 page = __page_cache_alloc(GFP_NOFS);
1485 if (!page) {
1486 err = -ENOMEM;
1487 goto out;
1488 }
1489 err = __ceph_do_getattr(inode, page,
1490 CEPH_STAT_CAP_INLINE_DATA, true);
1491 if (err < 0) {
1492 /* no inline data */
1493 if (err == -ENODATA)
1494 err = 0;
1495 goto out;
1496 }
1497 len = err;
1498 }
1499
1500 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1501 ceph_vino(inode), 0, &len, 0, 1,
1502 CEPH_OSD_OP_CREATE,
1503 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1504 ci->i_snap_realm->cached_context,
1505 0, 0, false);
1506 if (IS_ERR(req)) {
1507 err = PTR_ERR(req);
1508 goto out;
1509 }
1510
1511 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
1512 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1513 if (!err)
1514 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1515 ceph_osdc_put_request(req);
1516 if (err < 0)
1517 goto out;
1518
1519 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1520 ceph_vino(inode), 0, &len, 1, 3,
1521 CEPH_OSD_OP_WRITE,
1522 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1523 ci->i_snap_realm->cached_context,
1524 ci->i_truncate_seq, ci->i_truncate_size,
1525 false);
1526 if (IS_ERR(req)) {
1527 err = PTR_ERR(req);
1528 goto out;
1529 }
1530
1531 osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
1532
1533 err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
1534 "inline_version", &inline_version,
1535 sizeof(inline_version),
1536 CEPH_OSD_CMPXATTR_OP_GT,
1537 CEPH_OSD_CMPXATTR_MODE_U64);
1538 if (err)
1539 goto out_put;
1540
1541 err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
1542 "inline_version", &inline_version,
1543 sizeof(inline_version), 0, 0);
1544 if (err)
1545 goto out_put;
1546
1547 ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
1548 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1549 if (!err)
1550 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1551out_put:
1552 ceph_osdc_put_request(req);
1553 if (err == -ECANCELED)
1554 err = 0;
1555out:
1556 if (page && page != locked_page) {
1557 if (from_pagecache) {
1558 unlock_page(page);
1559 page_cache_release(page);
1560 } else
1561 __free_pages(page, 0);
1562 }
1563
1564 dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
1565 inode, ceph_vinop(inode), inline_version, err);
1566 return err;
1567}
1568
1318static struct vm_operations_struct ceph_vmops = { 1569static struct vm_operations_struct ceph_vmops = {
1319 .fault = ceph_filemap_fault, 1570 .fault = ceph_filemap_fault,
1320 .page_mkwrite = ceph_page_mkwrite, 1571 .page_mkwrite = ceph_page_mkwrite,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index cefca661464b..b93c631c6c87 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
975 kuid_t uid, kgid_t gid, umode_t mode, 975 kuid_t uid, kgid_t gid, umode_t mode,
976 u64 xattr_version, 976 u64 xattr_version,
977 struct ceph_buffer *xattrs_buf, 977 struct ceph_buffer *xattrs_buf,
978 u64 follows) 978 u64 follows, bool inline_data)
979{ 979{
980 struct ceph_mds_caps *fc; 980 struct ceph_mds_caps *fc;
981 struct ceph_msg *msg; 981 struct ceph_msg *msg;
982 void *p;
983 size_t extra_len;
982 984
983 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" 985 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
984 " seq %u/%u mseq %u follows %lld size %llu/%llu" 986 " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
988 seq, issue_seq, mseq, follows, size, max_size, 990 seq, issue_seq, mseq, follows, size, max_size,
989 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); 991 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
990 992
991 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false); 993 /* flock buffer size + inline version + inline data size */
994 extra_len = 4 + 8 + 4;
995 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
996 GFP_NOFS, false);
992 if (!msg) 997 if (!msg)
993 return -ENOMEM; 998 return -ENOMEM;
994 999
@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
1020 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); 1025 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
1021 fc->mode = cpu_to_le32(mode); 1026 fc->mode = cpu_to_le32(mode);
1022 1027
1028 p = fc + 1;
1029 /* flock buffer size */
1030 ceph_encode_32(&p, 0);
1031 /* inline version */
1032 ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
1033 /* inline data size */
1034 ceph_encode_32(&p, 0);
1035
1023 fc->xattr_version = cpu_to_le64(xattr_version); 1036 fc->xattr_version = cpu_to_le64(xattr_version);
1024 if (xattrs_buf) { 1037 if (xattrs_buf) {
1025 msg->middle = ceph_buffer_get(xattrs_buf); 1038 msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1126 u64 flush_tid = 0; 1139 u64 flush_tid = 0;
1127 int i; 1140 int i;
1128 int ret; 1141 int ret;
1142 bool inline_data;
1129 1143
1130 held = cap->issued | cap->implemented; 1144 held = cap->issued | cap->implemented;
1131 revoking = cap->implemented & ~cap->issued; 1145 revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1209 xattr_version = ci->i_xattrs.version; 1223 xattr_version = ci->i_xattrs.version;
1210 } 1224 }
1211 1225
1226 inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
1227
1212 spin_unlock(&ci->i_ceph_lock); 1228 spin_unlock(&ci->i_ceph_lock);
1213 1229
1214 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, 1230 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1215 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, 1231 op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
1216 size, max_size, &mtime, &atime, time_warp_seq, 1232 size, max_size, &mtime, &atime, time_warp_seq,
1217 uid, gid, mode, xattr_version, xattr_blob, 1233 uid, gid, mode, xattr_version, xattr_blob,
1218 follows); 1234 follows, inline_data);
1219 if (ret < 0) { 1235 if (ret < 0) {
1220 dout("error sending cap msg, must requeue %p\n", inode); 1236 dout("error sending cap msg, must requeue %p\n", inode);
1221 delayed = 1; 1237 delayed = 1;
@@ -1336,7 +1352,7 @@ retry:
1336 capsnap->time_warp_seq, 1352 capsnap->time_warp_seq,
1337 capsnap->uid, capsnap->gid, capsnap->mode, 1353 capsnap->uid, capsnap->gid, capsnap->mode,
1338 capsnap->xattr_version, capsnap->xattr_blob, 1354 capsnap->xattr_version, capsnap->xattr_blob,
1339 capsnap->follows); 1355 capsnap->follows, capsnap->inline_data);
1340 1356
1341 next_follows = capsnap->follows + 1; 1357 next_follows = capsnap->follows + 1;
1342 ceph_put_cap_snap(capsnap); 1358 ceph_put_cap_snap(capsnap);
@@ -2057,15 +2073,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
2057 * requested from the MDS. 2073 * requested from the MDS.
2058 */ 2074 */
2059static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2075static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2060 int *got, loff_t endoff, int *check_max, int *err) 2076 loff_t endoff, int *got, struct page **pinned_page,
2077 int *check_max, int *err)
2061{ 2078{
2062 struct inode *inode = &ci->vfs_inode; 2079 struct inode *inode = &ci->vfs_inode;
2063 int ret = 0; 2080 int ret = 0;
2064 int have, implemented; 2081 int have, implemented, _got = 0;
2065 int file_wanted; 2082 int file_wanted;
2066 2083
2067 dout("get_cap_refs %p need %s want %s\n", inode, 2084 dout("get_cap_refs %p need %s want %s\n", inode,
2068 ceph_cap_string(need), ceph_cap_string(want)); 2085 ceph_cap_string(need), ceph_cap_string(want));
2086again:
2069 spin_lock(&ci->i_ceph_lock); 2087 spin_lock(&ci->i_ceph_lock);
2070 2088
2071 /* make sure file is actually open */ 2089 /* make sure file is actually open */
@@ -2075,7 +2093,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2075 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2093 ceph_cap_string(need), ceph_cap_string(file_wanted));
2076 *err = -EBADF; 2094 *err = -EBADF;
2077 ret = 1; 2095 ret = 1;
2078 goto out; 2096 goto out_unlock;
2079 } 2097 }
2080 2098
2081 /* finish pending truncate */ 2099 /* finish pending truncate */
@@ -2095,7 +2113,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2095 *check_max = 1; 2113 *check_max = 1;
2096 ret = 1; 2114 ret = 1;
2097 } 2115 }
2098 goto out; 2116 goto out_unlock;
2099 } 2117 }
2100 /* 2118 /*
2101 * If a sync write is in progress, we must wait, so that we 2119 * If a sync write is in progress, we must wait, so that we
@@ -2103,7 +2121,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2103 */ 2121 */
2104 if (__ceph_have_pending_cap_snap(ci)) { 2122 if (__ceph_have_pending_cap_snap(ci)) {
2105 dout("get_cap_refs %p cap_snap_pending\n", inode); 2123 dout("get_cap_refs %p cap_snap_pending\n", inode);
2106 goto out; 2124 goto out_unlock;
2107 } 2125 }
2108 } 2126 }
2109 2127
@@ -2120,18 +2138,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2120 inode, ceph_cap_string(have), ceph_cap_string(not), 2138 inode, ceph_cap_string(have), ceph_cap_string(not),
2121 ceph_cap_string(revoking)); 2139 ceph_cap_string(revoking));
2122 if ((revoking & not) == 0) { 2140 if ((revoking & not) == 0) {
2123 *got = need | (have & want); 2141 _got = need | (have & want);
2124 __take_cap_refs(ci, *got); 2142 __take_cap_refs(ci, _got);
2125 ret = 1; 2143 ret = 1;
2126 } 2144 }
2127 } else { 2145 } else {
2128 dout("get_cap_refs %p have %s needed %s\n", inode, 2146 dout("get_cap_refs %p have %s needed %s\n", inode,
2129 ceph_cap_string(have), ceph_cap_string(need)); 2147 ceph_cap_string(have), ceph_cap_string(need));
2130 } 2148 }
2131out: 2149out_unlock:
2132 spin_unlock(&ci->i_ceph_lock); 2150 spin_unlock(&ci->i_ceph_lock);
2151
2152 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2153 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
2154 i_size_read(inode) > 0) {
2155 int ret1;
2156 struct page *page = find_get_page(inode->i_mapping, 0);
2157 if (page) {
2158 if (PageUptodate(page)) {
2159 *pinned_page = page;
2160 goto out;
2161 }
2162 page_cache_release(page);
2163 }
2164 /*
2165 * drop cap refs first because getattr while holding
2166 * caps refs can cause deadlock.
2167 */
2168 ceph_put_cap_refs(ci, _got);
2169 _got = 0;
2170
2171 /* getattr request will bring inline data into page cache */
2172 ret1 = __ceph_do_getattr(inode, NULL,
2173 CEPH_STAT_CAP_INLINE_DATA, true);
2174 if (ret1 >= 0) {
2175 ret = 0;
2176 goto again;
2177 }
2178 *err = ret1;
2179 ret = 1;
2180 }
2181out:
2133 dout("get_cap_refs %p ret %d got %s\n", inode, 2182 dout("get_cap_refs %p ret %d got %s\n", inode,
2134 ret, ceph_cap_string(*got)); 2183 ret, ceph_cap_string(_got));
2184 *got = _got;
2135 return ret; 2185 return ret;
2136} 2186}
2137 2187
@@ -2168,8 +2218,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
2168 * due to a small max_size, make sure we check_max_size (and possibly 2218 * due to a small max_size, make sure we check_max_size (and possibly
2169 * ask the mds) so we don't get hung up indefinitely. 2219 * ask the mds) so we don't get hung up indefinitely.
2170 */ 2220 */
2171int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, 2221int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2172 loff_t endoff) 2222 loff_t endoff, int *got, struct page **pinned_page)
2173{ 2223{
2174 int check_max, ret, err; 2224 int check_max, ret, err;
2175 2225
@@ -2179,8 +2229,8 @@ retry:
2179 check_max = 0; 2229 check_max = 0;
2180 err = 0; 2230 err = 0;
2181 ret = wait_event_interruptible(ci->i_cap_wq, 2231 ret = wait_event_interruptible(ci->i_cap_wq,
2182 try_get_cap_refs(ci, need, want, 2232 try_get_cap_refs(ci, need, want, endoff,
2183 got, endoff, 2233 got, pinned_page,
2184 &check_max, &err)); 2234 &check_max, &err));
2185 if (err) 2235 if (err)
2186 ret = err; 2236 ret = err;
@@ -2383,6 +2433,8 @@ static void invalidate_aliases(struct inode *inode)
2383static void handle_cap_grant(struct ceph_mds_client *mdsc, 2433static void handle_cap_grant(struct ceph_mds_client *mdsc,
2384 struct inode *inode, struct ceph_mds_caps *grant, 2434 struct inode *inode, struct ceph_mds_caps *grant,
2385 void *snaptrace, int snaptrace_len, 2435 void *snaptrace, int snaptrace_len,
2436 u64 inline_version,
2437 void *inline_data, int inline_len,
2386 struct ceph_buffer *xattr_buf, 2438 struct ceph_buffer *xattr_buf,
2387 struct ceph_mds_session *session, 2439 struct ceph_mds_session *session,
2388 struct ceph_cap *cap, int issued) 2440 struct ceph_cap *cap, int issued)
@@ -2403,6 +2455,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2403 bool queue_invalidate = false; 2455 bool queue_invalidate = false;
2404 bool queue_revalidate = false; 2456 bool queue_revalidate = false;
2405 bool deleted_inode = false; 2457 bool deleted_inode = false;
2458 bool fill_inline = false;
2406 2459
2407 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 2460 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
2408 inode, cap, mds, seq, ceph_cap_string(newcaps)); 2461 inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2576,6 +2629,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2576 } 2629 }
2577 BUG_ON(cap->issued & ~cap->implemented); 2630 BUG_ON(cap->issued & ~cap->implemented);
2578 2631
2632 if (inline_version > 0 && inline_version >= ci->i_inline_version) {
2633 ci->i_inline_version = inline_version;
2634 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2635 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
2636 fill_inline = true;
2637 }
2638
2579 spin_unlock(&ci->i_ceph_lock); 2639 spin_unlock(&ci->i_ceph_lock);
2580 2640
2581 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 2641 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
@@ -2589,6 +2649,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2589 wake = true; 2649 wake = true;
2590 } 2650 }
2591 2651
2652 if (fill_inline)
2653 ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
2654
2592 if (queue_trunc) { 2655 if (queue_trunc) {
2593 ceph_queue_vmtruncate(inode); 2656 ceph_queue_vmtruncate(inode);
2594 ceph_queue_revalidate(inode); 2657 ceph_queue_revalidate(inode);
@@ -2996,11 +3059,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2996 u64 cap_id; 3059 u64 cap_id;
2997 u64 size, max_size; 3060 u64 size, max_size;
2998 u64 tid; 3061 u64 tid;
3062 u64 inline_version = 0;
3063 void *inline_data = NULL;
3064 u32 inline_len = 0;
2999 void *snaptrace; 3065 void *snaptrace;
3000 size_t snaptrace_len; 3066 size_t snaptrace_len;
3001 void *flock; 3067 void *p, *end;
3002 void *end;
3003 u32 flock_len;
3004 3068
3005 dout("handle_caps from mds%d\n", mds); 3069 dout("handle_caps from mds%d\n", mds);
3006 3070
@@ -3021,30 +3085,37 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3021 3085
3022 snaptrace = h + 1; 3086 snaptrace = h + 1;
3023 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3087 snaptrace_len = le32_to_cpu(h->snap_trace_len);
3088 p = snaptrace + snaptrace_len;
3024 3089
3025 if (le16_to_cpu(msg->hdr.version) >= 2) { 3090 if (le16_to_cpu(msg->hdr.version) >= 2) {
3026 void *p = snaptrace + snaptrace_len; 3091 u32 flock_len;
3027 ceph_decode_32_safe(&p, end, flock_len, bad); 3092 ceph_decode_32_safe(&p, end, flock_len, bad);
3028 if (p + flock_len > end) 3093 if (p + flock_len > end)
3029 goto bad; 3094 goto bad;
3030 flock = p; 3095 p += flock_len;
3031 } else {
3032 flock = NULL;
3033 flock_len = 0;
3034 } 3096 }
3035 3097
3036 if (le16_to_cpu(msg->hdr.version) >= 3) { 3098 if (le16_to_cpu(msg->hdr.version) >= 3) {
3037 if (op == CEPH_CAP_OP_IMPORT) { 3099 if (op == CEPH_CAP_OP_IMPORT) {
3038 void *p = flock + flock_len;
3039 if (p + sizeof(*peer) > end) 3100 if (p + sizeof(*peer) > end)
3040 goto bad; 3101 goto bad;
3041 peer = p; 3102 peer = p;
3103 p += sizeof(*peer);
3042 } else if (op == CEPH_CAP_OP_EXPORT) { 3104 } else if (op == CEPH_CAP_OP_EXPORT) {
3043 /* recorded in unused fields */ 3105 /* recorded in unused fields */
3044 peer = (void *)&h->size; 3106 peer = (void *)&h->size;
3045 } 3107 }
3046 } 3108 }
3047 3109
3110 if (le16_to_cpu(msg->hdr.version) >= 4) {
3111 ceph_decode_64_safe(&p, end, inline_version, bad);
3112 ceph_decode_32_safe(&p, end, inline_len, bad);
3113 if (p + inline_len > end)
3114 goto bad;
3115 inline_data = p;
3116 p += inline_len;
3117 }
3118
3048 /* lookup ino */ 3119 /* lookup ino */
3049 inode = ceph_find_inode(sb, vino); 3120 inode = ceph_find_inode(sb, vino);
3050 ci = ceph_inode(inode); 3121 ci = ceph_inode(inode);
@@ -3085,6 +3156,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3085 handle_cap_import(mdsc, inode, h, peer, session, 3156 handle_cap_import(mdsc, inode, h, peer, session,
3086 &cap, &issued); 3157 &cap, &issued);
3087 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, 3158 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
3159 inline_version, inline_data, inline_len,
3088 msg->middle, session, cap, issued); 3160 msg->middle, session, cap, issued);
3089 goto done_unlocked; 3161 goto done_unlocked;
3090 } 3162 }
@@ -3105,8 +3177,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3105 case CEPH_CAP_OP_GRANT: 3177 case CEPH_CAP_OP_GRANT:
3106 __ceph_caps_issued(ci, &issued); 3178 __ceph_caps_issued(ci, &issued);
3107 issued |= __ceph_caps_dirty(ci); 3179 issued |= __ceph_caps_dirty(ci);
3108 handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, 3180 handle_cap_grant(mdsc, inode, h, NULL, 0,
3109 session, cap, issued); 3181 inline_version, inline_data, inline_len,
3182 msg->middle, session, cap, issued);
3110 goto done_unlocked; 3183 goto done_unlocked;
3111 3184
3112 case CEPH_CAP_OP_FLUSH_ACK: 3185 case CEPH_CAP_OP_FLUSH_ACK:
@@ -3137,8 +3210,7 @@ flush_cap_releases:
3137done: 3210done:
3138 mutex_unlock(&session->s_mutex); 3211 mutex_unlock(&session->s_mutex);
3139done_unlocked: 3212done_unlocked:
3140 if (inode) 3213 iput(inode);
3141 iput(inode);
3142 return; 3214 return;
3143 3215
3144bad: 3216bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 681a8537b64f..c241603764fd 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@ more:
183 spin_unlock(&parent->d_lock); 183 spin_unlock(&parent->d_lock);
184 184
185 /* make sure a dentry wasn't dropped while we didn't have parent lock */ 185 /* make sure a dentry wasn't dropped while we didn't have parent lock */
186 if (!ceph_dir_is_complete(dir)) { 186 if (!ceph_dir_is_complete_ordered(dir)) {
187 dout(" lost dir complete on %p; falling back to mds\n", dir); 187 dout(" lost dir complete on %p; falling back to mds\n", dir);
188 dput(dentry); 188 dput(dentry);
189 err = -EAGAIN; 189 err = -EAGAIN;
@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
261 261
262 /* always start with . and .. */ 262 /* always start with . and .. */
263 if (ctx->pos == 0) { 263 if (ctx->pos == 0) {
264 /* note dir version at start of readdir so we can tell
265 * if any dentries get dropped */
266 fi->dir_release_count = atomic_read(&ci->i_release_count);
267
268 dout("readdir off 0 -> '.'\n"); 264 dout("readdir off 0 -> '.'\n");
269 if (!dir_emit(ctx, ".", 1, 265 if (!dir_emit(ctx, ".", 1,
270 ceph_translate_ino(inode->i_sb, inode->i_ino), 266 ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
289 if ((ctx->pos == 2 || fi->dentry) && 285 if ((ctx->pos == 2 || fi->dentry) &&
290 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 286 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
291 ceph_snap(inode) != CEPH_SNAPDIR && 287 ceph_snap(inode) != CEPH_SNAPDIR &&
292 __ceph_dir_is_complete(ci) && 288 __ceph_dir_is_complete_ordered(ci) &&
293 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 289 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
294 u32 shared_gen = ci->i_shared_gen; 290 u32 shared_gen = ci->i_shared_gen;
295 spin_unlock(&ci->i_ceph_lock); 291 spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
312 308
313 /* proceed with a normal readdir */ 309 /* proceed with a normal readdir */
314 310
311 if (ctx->pos == 2) {
312 /* note dir version at start of readdir so we can tell
313 * if any dentries get dropped */
314 fi->dir_release_count = atomic_read(&ci->i_release_count);
315 fi->dir_ordered_count = ci->i_ordered_count;
316 }
317
315more: 318more:
316 /* do we have the correct frag content buffered? */ 319 /* do we have the correct frag content buffered? */
317 if (fi->frag != frag || fi->last_readdir == NULL) { 320 if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@ more:
446 */ 449 */
447 spin_lock(&ci->i_ceph_lock); 450 spin_lock(&ci->i_ceph_lock);
448 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { 451 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
449 dout(" marking %p complete\n", inode); 452 if (ci->i_ordered_count == fi->dir_ordered_count)
450 __ceph_dir_set_complete(ci, fi->dir_release_count); 453 dout(" marking %p complete and ordered\n", inode);
454 else
455 dout(" marking %p complete\n", inode);
456 __ceph_dir_set_complete(ci, fi->dir_release_count,
457 fi->dir_ordered_count);
451 } 458 }
452 spin_unlock(&ci->i_ceph_lock); 459 spin_unlock(&ci->i_ceph_lock);
453 460
@@ -805,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
805 acls.pagelist = NULL; 812 acls.pagelist = NULL;
806 } 813 }
807 err = ceph_mdsc_do_request(mdsc, dir, req); 814 err = ceph_mdsc_do_request(mdsc, dir, req);
808 if (!err && !req->r_reply_info.head->is_dentry) 815 if (!err &&
816 !req->r_reply_info.head->is_target &&
817 !req->r_reply_info.head->is_dentry)
809 err = ceph_handle_notrace_create(dir, dentry); 818 err = ceph_handle_notrace_create(dir, dentry);
810 ceph_mdsc_put_request(req); 819 ceph_mdsc_put_request(req);
811out: 820out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9f8e3572040e..ce74b394b49d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
333 return 0; 333 return 0;
334} 334}
335 335
336enum {
337 CHECK_EOF = 1,
338 READ_INLINE = 2,
339};
340
336/* 341/*
337 * Read a range of bytes striped over one or more objects. Iterate over 342 * Read a range of bytes striped over one or more objects. Iterate over
338 * objects we stripe over. (That's not atomic, but good enough for now.) 343 * objects we stripe over. (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@ more:
412 ret = read; 417 ret = read;
413 /* did we bounce off eof? */ 418 /* did we bounce off eof? */
414 if (pos + left > inode->i_size) 419 if (pos + left > inode->i_size)
415 *checkeof = 1; 420 *checkeof = CHECK_EOF;
416 } 421 }
417 422
418 dout("striped_read returns %d\n", ret); 423 dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
598 snapc = ci->i_snap_realm->cached_context; 603 snapc = ci->i_snap_realm->cached_context;
599 vino = ceph_vino(inode); 604 vino = ceph_vino(inode);
600 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 605 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
601 vino, pos, &len, 606 vino, pos, &len, 0,
602 2,/*include a 'startsync' command*/ 607 2,/*include a 'startsync' command*/
603 CEPH_OSD_OP_WRITE, flags, snapc, 608 CEPH_OSD_OP_WRITE, flags, snapc,
604 ci->i_truncate_seq, 609 ci->i_truncate_seq,
@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
609 break; 614 break;
610 } 615 }
611 616
617 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
618
612 n = iov_iter_get_pages_alloc(from, &pages, len, &start); 619 n = iov_iter_get_pages_alloc(from, &pages, len, &start);
613 if (unlikely(n < 0)) { 620 if (unlikely(n < 0)) {
614 ret = n; 621 ret = n;
@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
713 snapc = ci->i_snap_realm->cached_context; 720 snapc = ci->i_snap_realm->cached_context;
714 vino = ceph_vino(inode); 721 vino = ceph_vino(inode);
715 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 722 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
716 vino, pos, &len, 1, 723 vino, pos, &len, 0, 1,
717 CEPH_OSD_OP_WRITE, flags, snapc, 724 CEPH_OSD_OP_WRITE, flags, snapc,
718 ci->i_truncate_seq, 725 ci->i_truncate_seq,
719 ci->i_truncate_size, 726 ci->i_truncate_size,
@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
803 size_t len = iocb->ki_nbytes; 810 size_t len = iocb->ki_nbytes;
804 struct inode *inode = file_inode(filp); 811 struct inode *inode = file_inode(filp);
805 struct ceph_inode_info *ci = ceph_inode(inode); 812 struct ceph_inode_info *ci = ceph_inode(inode);
813 struct page *pinned_page = NULL;
806 ssize_t ret; 814 ssize_t ret;
807 int want, got = 0; 815 int want, got = 0;
808 int checkeof = 0, read = 0; 816 int retry_op = 0, read = 0;
809 817
810again: 818again:
811 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", 819 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@ again:
815 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 823 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
816 else 824 else
817 want = CEPH_CAP_FILE_CACHE; 825 want = CEPH_CAP_FILE_CACHE;
818 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 826 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
819 if (ret < 0) 827 if (ret < 0)
820 return ret; 828 return ret;
821 829
@@ -827,8 +835,12 @@ again:
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 835 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got)); 836 ceph_cap_string(got));
829 837
830 /* hmm, this isn't really async... */ 838 if (ci->i_inline_version == CEPH_INLINE_NONE) {
831 ret = ceph_sync_read(iocb, to, &checkeof); 839 /* hmm, this isn't really async... */
840 ret = ceph_sync_read(iocb, to, &retry_op);
841 } else {
842 retry_op = READ_INLINE;
843 }
832 } else { 844 } else {
833 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", 845 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
834 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 846 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,55 @@ again:
838 } 850 }
839 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 851 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
840 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 852 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
853 if (pinned_page) {
854 page_cache_release(pinned_page);
855 pinned_page = NULL;
856 }
841 ceph_put_cap_refs(ci, got); 857 ceph_put_cap_refs(ci, got);
858 if (retry_op && ret >= 0) {
859 int statret;
860 struct page *page = NULL;
861 loff_t i_size;
862 if (retry_op == READ_INLINE) {
863 page = __page_cache_alloc(GFP_NOFS);
864 if (!page)
865 return -ENOMEM;
866 }
842 867
843 if (checkeof && ret >= 0) { 868 statret = __ceph_do_getattr(inode, page,
844 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 869 CEPH_STAT_CAP_INLINE_DATA, !!page);
870 if (statret < 0) {
871 __free_page(page);
872 if (statret == -ENODATA) {
873 BUG_ON(retry_op != READ_INLINE);
874 goto again;
875 }
876 return statret;
877 }
878
879 i_size = i_size_read(inode);
880 if (retry_op == READ_INLINE) {
881 /* does not support inline data > PAGE_SIZE */
882 if (i_size > PAGE_CACHE_SIZE) {
883 ret = -EIO;
884 } else if (iocb->ki_pos < i_size) {
885 loff_t end = min_t(loff_t, i_size,
886 iocb->ki_pos + len);
887 if (statret < end)
888 zero_user_segment(page, statret, end);
889 ret = copy_page_to_iter(page,
890 iocb->ki_pos & ~PAGE_MASK,
891 end - iocb->ki_pos, to);
892 iocb->ki_pos += ret;
893 } else {
894 ret = 0;
895 }
896 __free_pages(page, 0);
897 return ret;
898 }
845 899
846 /* hit EOF or hole? */ 900 /* hit EOF or hole? */
847 if (statret == 0 && iocb->ki_pos < inode->i_size && 901 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
848 ret < len) { 902 ret < len) {
849 dout("sync_read hit hole, ppos %lld < size %lld" 903 dout("sync_read hit hole, ppos %lld < size %lld"
850 ", reading more\n", iocb->ki_pos, 904 ", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@ again:
852 906
853 read += ret; 907 read += ret;
854 len -= ret; 908 len -= ret;
855 checkeof = 0; 909 retry_op = 0;
856 goto again; 910 goto again;
857 } 911 }
858 } 912 }
@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
909 if (err) 963 if (err)
910 goto out; 964 goto out;
911 965
966 if (ci->i_inline_version != CEPH_INLINE_NONE) {
967 err = ceph_uninline_data(file, NULL);
968 if (err < 0)
969 goto out;
970 }
971
912retry_snap: 972retry_snap:
913 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) { 973 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
914 err = -ENOSPC; 974 err = -ENOSPC;
@@ -922,7 +982,8 @@ retry_snap:
922 else 982 else
923 want = CEPH_CAP_FILE_BUFFER; 983 want = CEPH_CAP_FILE_BUFFER;
924 got = 0; 984 got = 0;
925 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); 985 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
986 &got, NULL);
926 if (err < 0) 987 if (err < 0)
927 goto out; 988 goto out;
928 989
@@ -969,6 +1030,7 @@ retry_snap:
969 if (written >= 0) { 1030 if (written >= 0) {
970 int dirty; 1031 int dirty;
971 spin_lock(&ci->i_ceph_lock); 1032 spin_lock(&ci->i_ceph_lock);
1033 ci->i_inline_version = CEPH_INLINE_NONE;
972 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1034 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
973 spin_unlock(&ci->i_ceph_lock); 1035 spin_unlock(&ci->i_ceph_lock);
974 if (dirty) 1036 if (dirty)
@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
1111 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1173 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1112 ceph_vino(inode), 1174 ceph_vino(inode),
1113 offset, length, 1175 offset, length,
1114 1, op, 1176 0, 1, op,
1115 CEPH_OSD_FLAG_WRITE | 1177 CEPH_OSD_FLAG_WRITE |
1116 CEPH_OSD_FLAG_ONDISK, 1178 CEPH_OSD_FLAG_ONDISK,
1117 NULL, 0, 0, false); 1179 NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
1214 goto unlock; 1276 goto unlock;
1215 } 1277 }
1216 1278
1279 if (ci->i_inline_version != CEPH_INLINE_NONE) {
1280 ret = ceph_uninline_data(file, NULL);
1281 if (ret < 0)
1282 goto unlock;
1283 }
1284
1217 size = i_size_read(inode); 1285 size = i_size_read(inode);
1218 if (!(mode & FALLOC_FL_KEEP_SIZE)) 1286 if (!(mode & FALLOC_FL_KEEP_SIZE))
1219 endoff = offset + length; 1287 endoff = offset + length;
@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
1223 else 1291 else
1224 want = CEPH_CAP_FILE_BUFFER; 1292 want = CEPH_CAP_FILE_BUFFER;
1225 1293
1226 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); 1294 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
1227 if (ret < 0) 1295 if (ret < 0)
1228 goto unlock; 1296 goto unlock;
1229 1297
@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
1240 1308
1241 if (!ret) { 1309 if (!ret) {
1242 spin_lock(&ci->i_ceph_lock); 1310 spin_lock(&ci->i_ceph_lock);
1311 ci->i_inline_version = CEPH_INLINE_NONE;
1243 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); 1312 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
1244 spin_unlock(&ci->i_ceph_lock); 1313 spin_unlock(&ci->i_ceph_lock);
1245 if (dirty) 1314 if (dirty)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index a5593d51d035..f61a74115beb 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -387,8 +387,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
387 spin_lock_init(&ci->i_ceph_lock); 387 spin_lock_init(&ci->i_ceph_lock);
388 388
389 ci->i_version = 0; 389 ci->i_version = 0;
390 ci->i_inline_version = 0;
390 ci->i_time_warp_seq = 0; 391 ci->i_time_warp_seq = 0;
391 ci->i_ceph_flags = 0; 392 ci->i_ceph_flags = 0;
393 ci->i_ordered_count = 0;
392 atomic_set(&ci->i_release_count, 1); 394 atomic_set(&ci->i_release_count, 1);
393 atomic_set(&ci->i_complete_count, 0); 395 atomic_set(&ci->i_complete_count, 0);
394 ci->i_symlink = NULL; 396 ci->i_symlink = NULL;
@@ -657,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
657 * Populate an inode based on info from mds. May be called on new or 659 * Populate an inode based on info from mds. May be called on new or
658 * existing inodes. 660 * existing inodes.
659 */ 661 */
660static int fill_inode(struct inode *inode, 662static int fill_inode(struct inode *inode, struct page *locked_page,
661 struct ceph_mds_reply_info_in *iinfo, 663 struct ceph_mds_reply_info_in *iinfo,
662 struct ceph_mds_reply_dirfrag *dirinfo, 664 struct ceph_mds_reply_dirfrag *dirinfo,
663 struct ceph_mds_session *session, 665 struct ceph_mds_session *session,
@@ -675,6 +677,7 @@ static int fill_inode(struct inode *inode,
675 bool wake = false; 677 bool wake = false;
676 bool queue_trunc = false; 678 bool queue_trunc = false;
677 bool new_version = false; 679 bool new_version = false;
680 bool fill_inline = false;
678 681
679 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n", 682 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
680 inode, ceph_vinop(inode), le64_to_cpu(info->version), 683 inode, ceph_vinop(inode), le64_to_cpu(info->version),
@@ -845,7 +848,8 @@ static int fill_inode(struct inode *inode,
845 (issued & CEPH_CAP_FILE_EXCL) == 0 && 848 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
846 !__ceph_dir_is_complete(ci)) { 849 !__ceph_dir_is_complete(ci)) {
847 dout(" marking %p complete (empty)\n", inode); 850 dout(" marking %p complete (empty)\n", inode);
848 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); 851 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
852 ci->i_ordered_count);
849 } 853 }
850 854
851 /* were we issued a capability? */ 855 /* were we issued a capability? */
@@ -873,8 +877,23 @@ static int fill_inode(struct inode *inode,
873 ceph_vinop(inode)); 877 ceph_vinop(inode));
874 __ceph_get_fmode(ci, cap_fmode); 878 __ceph_get_fmode(ci, cap_fmode);
875 } 879 }
880
881 if (iinfo->inline_version > 0 &&
882 iinfo->inline_version >= ci->i_inline_version) {
883 int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
884 ci->i_inline_version = iinfo->inline_version;
885 if (ci->i_inline_version != CEPH_INLINE_NONE &&
886 (locked_page ||
887 (le32_to_cpu(info->cap.caps) & cache_caps)))
888 fill_inline = true;
889 }
890
876 spin_unlock(&ci->i_ceph_lock); 891 spin_unlock(&ci->i_ceph_lock);
877 892
893 if (fill_inline)
894 ceph_fill_inline_data(inode, locked_page,
895 iinfo->inline_data, iinfo->inline_len);
896
878 if (wake) 897 if (wake)
879 wake_up_all(&ci->i_cap_wq); 898 wake_up_all(&ci->i_cap_wq);
880 899
@@ -1062,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1062 struct inode *dir = req->r_locked_dir; 1081 struct inode *dir = req->r_locked_dir;
1063 1082
1064 if (dir) { 1083 if (dir) {
1065 err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, 1084 err = fill_inode(dir, NULL,
1085 &rinfo->diri, rinfo->dirfrag,
1066 session, req->r_request_started, -1, 1086 session, req->r_request_started, -1,
1067 &req->r_caps_reservation); 1087 &req->r_caps_reservation);
1068 if (err < 0) 1088 if (err < 0)
@@ -1132,7 +1152,7 @@ retry_lookup:
1132 } 1152 }
1133 req->r_target_inode = in; 1153 req->r_target_inode = in;
1134 1154
1135 err = fill_inode(in, &rinfo->targeti, NULL, 1155 err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
1136 session, req->r_request_started, 1156 session, req->r_request_started,
1137 (!req->r_aborted && rinfo->head->result == 0) ? 1157 (!req->r_aborted && rinfo->head->result == 0) ?
1138 req->r_fmode : -1, 1158 req->r_fmode : -1,
@@ -1204,8 +1224,8 @@ retry_lookup:
1204 ceph_invalidate_dentry_lease(dn); 1224 ceph_invalidate_dentry_lease(dn);
1205 1225
1206 /* d_move screws up sibling dentries' offsets */ 1226 /* d_move screws up sibling dentries' offsets */
1207 ceph_dir_clear_complete(dir); 1227 ceph_dir_clear_ordered(dir);
1208 ceph_dir_clear_complete(olddir); 1228 ceph_dir_clear_ordered(olddir);
1209 1229
1210 dout("dn %p gets new offset %lld\n", req->r_old_dentry, 1230 dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1211 ceph_dentry(req->r_old_dentry)->offset); 1231 ceph_dentry(req->r_old_dentry)->offset);
@@ -1217,6 +1237,7 @@ retry_lookup:
1217 if (!rinfo->head->is_target) { 1237 if (!rinfo->head->is_target) {
1218 dout("fill_trace null dentry\n"); 1238 dout("fill_trace null dentry\n");
1219 if (dn->d_inode) { 1239 if (dn->d_inode) {
1240 ceph_dir_clear_ordered(dir);
1220 dout("d_delete %p\n", dn); 1241 dout("d_delete %p\n", dn);
1221 d_delete(dn); 1242 d_delete(dn);
1222 } else { 1243 } else {
@@ -1233,7 +1254,7 @@ retry_lookup:
1233 1254
1234 /* attach proper inode */ 1255 /* attach proper inode */
1235 if (!dn->d_inode) { 1256 if (!dn->d_inode) {
1236 ceph_dir_clear_complete(dir); 1257 ceph_dir_clear_ordered(dir);
1237 ihold(in); 1258 ihold(in);
1238 dn = splice_dentry(dn, in, &have_lease); 1259 dn = splice_dentry(dn, in, &have_lease);
1239 if (IS_ERR(dn)) { 1260 if (IS_ERR(dn)) {
@@ -1263,7 +1284,7 @@ retry_lookup:
1263 BUG_ON(!dir); 1284 BUG_ON(!dir);
1264 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); 1285 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
1265 dout(" linking snapped dir %p to dn %p\n", in, dn); 1286 dout(" linking snapped dir %p to dn %p\n", in, dn);
1266 ceph_dir_clear_complete(dir); 1287 ceph_dir_clear_ordered(dir);
1267 ihold(in); 1288 ihold(in);
1268 dn = splice_dentry(dn, in, NULL); 1289 dn = splice_dentry(dn, in, NULL);
1269 if (IS_ERR(dn)) { 1290 if (IS_ERR(dn)) {
@@ -1300,7 +1321,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
1300 dout("new_inode badness got %d\n", err); 1321 dout("new_inode badness got %d\n", err);
1301 continue; 1322 continue;
1302 } 1323 }
1303 rc = fill_inode(in, &rinfo->dir_in[i], NULL, session, 1324 rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
1304 req->r_request_started, -1, 1325 req->r_request_started, -1,
1305 &req->r_caps_reservation); 1326 &req->r_caps_reservation);
1306 if (rc < 0) { 1327 if (rc < 0) {
@@ -1416,7 +1437,7 @@ retry_lookup:
1416 } 1437 }
1417 } 1438 }
1418 1439
1419 if (fill_inode(in, &rinfo->dir_in[i], NULL, session, 1440 if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
1420 req->r_request_started, -1, 1441 req->r_request_started, -1,
1421 &req->r_caps_reservation) < 0) { 1442 &req->r_caps_reservation) < 0) {
1422 pr_err("fill_inode badness on %p\n", in); 1443 pr_err("fill_inode badness on %p\n", in);
@@ -1899,7 +1920,8 @@ out_put:
1899 * Verify that we have a lease on the given mask. If not, 1920 * Verify that we have a lease on the given mask. If not,
1900 * do a getattr against an mds. 1921 * do a getattr against an mds.
1901 */ 1922 */
1902int ceph_do_getattr(struct inode *inode, int mask, bool force) 1923int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
1924 int mask, bool force)
1903{ 1925{
1904 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); 1926 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
1905 struct ceph_mds_client *mdsc = fsc->mdsc; 1927 struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1911,7 +1933,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
1911 return 0; 1933 return 0;
1912 } 1934 }
1913 1935
1914 dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); 1936 dout("do_getattr inode %p mask %s mode 0%o\n",
1937 inode, ceph_cap_string(mask), inode->i_mode);
1915 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) 1938 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
1916 return 0; 1939 return 0;
1917 1940
@@ -1922,7 +1945,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
1922 ihold(inode); 1945 ihold(inode);
1923 req->r_num_caps = 1; 1946 req->r_num_caps = 1;
1924 req->r_args.getattr.mask = cpu_to_le32(mask); 1947 req->r_args.getattr.mask = cpu_to_le32(mask);
1948 req->r_locked_page = locked_page;
1925 err = ceph_mdsc_do_request(mdsc, NULL, req); 1949 err = ceph_mdsc_do_request(mdsc, NULL, req);
1950 if (locked_page && err == 0) {
1951 u64 inline_version = req->r_reply_info.targeti.inline_version;
1952 if (inline_version == 0) {
1953 /* the reply is supposed to contain inline data */
1954 err = -EINVAL;
1955 } else if (inline_version == CEPH_INLINE_NONE) {
1956 err = -ENODATA;
1957 } else {
1958 err = req->r_reply_info.targeti.inline_len;
1959 }
1960 }
1926 ceph_mdsc_put_request(req); 1961 ceph_mdsc_put_request(req);
1927 dout("do_getattr result=%d\n", err); 1962 dout("do_getattr result=%d\n", err);
1928 return err; 1963 return err;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index fbc39c47bacd..c35c5c614e38 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -9,6 +9,8 @@
9#include <linux/ceph/pagelist.h> 9#include <linux/ceph/pagelist.h>
10 10
11static u64 lock_secret; 11static u64 lock_secret;
12static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
13 struct ceph_mds_request *req);
12 14
13static inline u64 secure_addr(void *addr) 15static inline u64 secure_addr(void *addr)
14{ 16{
@@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
40 u64 length = 0; 42 u64 length = 0;
41 u64 owner; 43 u64 owner;
42 44
45 if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
46 wait = 0;
47
43 req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); 48 req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
44 if (IS_ERR(req)) 49 if (IS_ERR(req))
45 return PTR_ERR(req); 50 return PTR_ERR(req);
@@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
68 req->r_args.filelock_change.length = cpu_to_le64(length); 73 req->r_args.filelock_change.length = cpu_to_le64(length);
69 req->r_args.filelock_change.wait = wait; 74 req->r_args.filelock_change.wait = wait;
70 75
76 if (wait)
77 req->r_wait_for_completion = ceph_lock_wait_for_completion;
78
71 err = ceph_mdsc_do_request(mdsc, inode, req); 79 err = ceph_mdsc_do_request(mdsc, inode, req);
72 80
73 if (operation == CEPH_MDS_OP_GETFILELOCK) { 81 if (operation == CEPH_MDS_OP_GETFILELOCK) {
@@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
96 return err; 104 return err;
97} 105}
98 106
107static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
108 struct ceph_mds_request *req)
109{
110 struct ceph_mds_request *intr_req;
111 struct inode *inode = req->r_inode;
112 int err, lock_type;
113
114 BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK);
115 if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL)
116 lock_type = CEPH_LOCK_FCNTL_INTR;
117 else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK)
118 lock_type = CEPH_LOCK_FLOCK_INTR;
119 else
120 BUG_ON(1);
121 BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK);
122
123 err = wait_for_completion_interruptible(&req->r_completion);
124 if (!err)
125 return 0;
126
127 dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
128 req->r_tid);
129
130 intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
131 USE_AUTH_MDS);
132 if (IS_ERR(intr_req))
133 return PTR_ERR(intr_req);
134
135 intr_req->r_inode = inode;
136 ihold(inode);
137 intr_req->r_num_caps = 1;
138
139 intr_req->r_args.filelock_change = req->r_args.filelock_change;
140 intr_req->r_args.filelock_change.rule = lock_type;
141 intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK;
142
143 err = ceph_mdsc_do_request(mdsc, inode, intr_req);
144 ceph_mdsc_put_request(intr_req);
145
146 if (err && err != -ERESTARTSYS)
147 return err;
148
149 wait_for_completion(&req->r_completion);
150 return 0;
151}
152
99/** 153/**
100 * Attempt to set an fcntl lock. 154 * Attempt to set an fcntl lock.
101 * For now, this just goes away to the server. Later it may be more awesome. 155 * For now, this just goes away to the server. Later it may be more awesome.
@@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
143 err); 197 err);
144 } 198 }
145 } 199 }
146
147 } else if (err == -ERESTARTSYS) {
148 dout("undoing lock\n");
149 ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
150 CEPH_LOCK_UNLOCK, 0, fl);
151 } 200 }
152 return err; 201 return err;
153} 202}
@@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
186 file, CEPH_LOCK_UNLOCK, 0, fl); 235 file, CEPH_LOCK_UNLOCK, 0, fl);
187 dout("got %d on flock_lock_file_wait, undid lock", err); 236 dout("got %d on flock_lock_file_wait, undid lock", err);
188 } 237 }
189 } else if (err == -ERESTARTSYS) {
190 dout("undoing lock\n");
191 ceph_lock_message(CEPH_LOCK_FLOCK,
192 CEPH_MDS_OP_SETFILELOCK,
193 file, CEPH_LOCK_UNLOCK, 0, fl);
194 } 238 }
195 return err; 239 return err;
196} 240}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a92d3f5c6c12..d2171f4a6980 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end,
89 ceph_decode_need(p, end, info->xattr_len, bad); 89 ceph_decode_need(p, end, info->xattr_len, bad);
90 info->xattr_data = *p; 90 info->xattr_data = *p;
91 *p += info->xattr_len; 91 *p += info->xattr_len;
92
93 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
94 ceph_decode_64_safe(p, end, info->inline_version, bad);
95 ceph_decode_32_safe(p, end, info->inline_len, bad);
96 ceph_decode_need(p, end, info->inline_len, bad);
97 info->inline_data = *p;
98 *p += info->inline_len;
99 } else
100 info->inline_version = CEPH_INLINE_NONE;
101
92 return 0; 102 return 0;
93bad: 103bad:
94 return err; 104 return err;
@@ -524,8 +534,7 @@ void ceph_mdsc_release_request(struct kref *kref)
524 } 534 }
525 if (req->r_locked_dir) 535 if (req->r_locked_dir)
526 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 536 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
527 if (req->r_target_inode) 537 iput(req->r_target_inode);
528 iput(req->r_target_inode);
529 if (req->r_dentry) 538 if (req->r_dentry)
530 dput(req->r_dentry); 539 dput(req->r_dentry);
531 if (req->r_old_dentry) 540 if (req->r_old_dentry)
@@ -861,8 +870,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
861 /* 870 /*
862 * Serialize client metadata into waiting buffer space, using 871 * Serialize client metadata into waiting buffer space, using
863 * the format that userspace expects for map<string, string> 872 * the format that userspace expects for map<string, string>
873 *
874 * ClientSession messages with metadata are v2
864 */ 875 */
865 msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */ 876 msg->hdr.version = cpu_to_le16(2);
877 msg->hdr.compat_version = cpu_to_le16(1);
866 878
867 /* The write pointer, following the session_head structure */ 879 /* The write pointer, following the session_head structure */
868 p = msg->front.iov_base + sizeof(*h); 880 p = msg->front.iov_base + sizeof(*h);
@@ -1066,8 +1078,7 @@ out:
1066 session->s_cap_iterator = NULL; 1078 session->s_cap_iterator = NULL;
1067 spin_unlock(&session->s_cap_lock); 1079 spin_unlock(&session->s_cap_lock);
1068 1080
1069 if (last_inode) 1081 iput(last_inode);
1070 iput(last_inode);
1071 if (old_cap) 1082 if (old_cap)
1072 ceph_put_cap(session->s_mdsc, old_cap); 1083 ceph_put_cap(session->s_mdsc, old_cap);
1073 1084
@@ -1874,7 +1885,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1874 goto out_free2; 1885 goto out_free2;
1875 } 1886 }
1876 1887
1877 msg->hdr.version = 2; 1888 msg->hdr.version = cpu_to_le16(2);
1878 msg->hdr.tid = cpu_to_le64(req->r_tid); 1889 msg->hdr.tid = cpu_to_le64(req->r_tid);
1879 1890
1880 head = msg->front.iov_base; 1891 head = msg->front.iov_base;
@@ -2208,6 +2219,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2208 &req->r_completion, req->r_timeout); 2219 &req->r_completion, req->r_timeout);
2209 if (err == 0) 2220 if (err == 0)
2210 err = -EIO; 2221 err = -EIO;
2222 } else if (req->r_wait_for_completion) {
2223 err = req->r_wait_for_completion(mdsc, req);
2211 } else { 2224 } else {
2212 err = wait_for_completion_killable(&req->r_completion); 2225 err = wait_for_completion_killable(&req->r_completion);
2213 } 2226 }
@@ -3744,6 +3757,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3744 return msg; 3757 return msg;
3745} 3758}
3746 3759
3760static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
3761{
3762 struct ceph_mds_session *s = con->private;
3763 struct ceph_auth_handshake *auth = &s->s_auth;
3764 return ceph_auth_sign_message(auth, msg);
3765}
3766
3767static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
3768{
3769 struct ceph_mds_session *s = con->private;
3770 struct ceph_auth_handshake *auth = &s->s_auth;
3771 return ceph_auth_check_message_signature(auth, msg);
3772}
3773
3747static const struct ceph_connection_operations mds_con_ops = { 3774static const struct ceph_connection_operations mds_con_ops = {
3748 .get = con_get, 3775 .get = con_get,
3749 .put = con_put, 3776 .put = con_put,
@@ -3753,6 +3780,8 @@ static const struct ceph_connection_operations mds_con_ops = {
3753 .invalidate_authorizer = invalidate_authorizer, 3780 .invalidate_authorizer = invalidate_authorizer,
3754 .peer_reset = peer_reset, 3781 .peer_reset = peer_reset,
3755 .alloc_msg = mds_alloc_msg, 3782 .alloc_msg = mds_alloc_msg,
3783 .sign_message = sign_message,
3784 .check_message_signature = check_message_signature,
3756}; 3785};
3757 3786
3758/* eof */ 3787/* eof */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3288359353e9..e2817d00f7d9 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in {
41 char *symlink; 41 char *symlink;
42 u32 xattr_len; 42 u32 xattr_len;
43 char *xattr_data; 43 char *xattr_data;
44 u64 inline_version;
45 u32 inline_len;
46 char *inline_data;
44}; 47};
45 48
46/* 49/*
@@ -166,6 +169,11 @@ struct ceph_mds_client;
166 */ 169 */
167typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc, 170typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
168 struct ceph_mds_request *req); 171 struct ceph_mds_request *req);
172/*
173 * wait for request completion callback
174 */
175typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
176 struct ceph_mds_request *req);
169 177
170/* 178/*
171 * an in-flight mds request 179 * an in-flight mds request
@@ -215,6 +223,7 @@ struct ceph_mds_request {
215 int r_request_release_offset; 223 int r_request_release_offset;
216 struct ceph_msg *r_reply; 224 struct ceph_msg *r_reply;
217 struct ceph_mds_reply_info_parsed r_reply_info; 225 struct ceph_mds_reply_info_parsed r_reply_info;
226 struct page *r_locked_page;
218 int r_err; 227 int r_err;
219 bool r_aborted; 228 bool r_aborted;
220 229
@@ -239,6 +248,7 @@ struct ceph_mds_request {
239 struct completion r_completion; 248 struct completion r_completion;
240 struct completion r_safe_completion; 249 struct completion r_safe_completion;
241 ceph_mds_request_callback_t r_callback; 250 ceph_mds_request_callback_t r_callback;
251 ceph_mds_request_wait_callback_t r_wait_for_completion;
242 struct list_head r_unsafe_item; /* per-session unsafe list item */ 252 struct list_head r_unsafe_item; /* per-session unsafe list item */
243 bool r_got_unsafe, r_got_safe, r_got_result; 253 bool r_got_unsafe, r_got_safe, r_got_result;
244 254
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f01645a27752..ce35fbd4ba5d 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b)
288 return 0; 288 return 0;
289} 289}
290 290
291
292static struct ceph_snap_context *empty_snapc;
293
291/* 294/*
292 * build the snap context for a given realm. 295 * build the snap context for a given realm.
293 */ 296 */
@@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm)
328 return 0; 331 return 0;
329 } 332 }
330 333
334 if (num == 0 && realm->seq == empty_snapc->seq) {
335 ceph_get_snap_context(empty_snapc);
336 snapc = empty_snapc;
337 goto done;
338 }
339
331 /* alloc new snap context */ 340 /* alloc new snap context */
332 err = -ENOMEM; 341 err = -ENOMEM;
333 if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64)) 342 if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
@@ -365,8 +374,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
365 realm->ino, realm, snapc, snapc->seq, 374 realm->ino, realm, snapc, snapc->seq,
366 (unsigned int) snapc->num_snaps); 375 (unsigned int) snapc->num_snaps);
367 376
368 if (realm->cached_context) 377done:
369 ceph_put_snap_context(realm->cached_context); 378 ceph_put_snap_context(realm->cached_context);
370 realm->cached_context = snapc; 379 realm->cached_context = snapc;
371 return 0; 380 return 0;
372 381
@@ -466,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
466 cap_snap. lucky us. */ 475 cap_snap. lucky us. */
467 dout("queue_cap_snap %p already pending\n", inode); 476 dout("queue_cap_snap %p already pending\n", inode);
468 kfree(capsnap); 477 kfree(capsnap);
478 } else if (ci->i_snap_realm->cached_context == empty_snapc) {
479 dout("queue_cap_snap %p empty snapc\n", inode);
480 kfree(capsnap);
469 } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| 481 } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
470 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { 482 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
471 struct ceph_snap_context *snapc = ci->i_head_snapc; 483 struct ceph_snap_context *snapc = ci->i_head_snapc;
@@ -504,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
504 capsnap->xattr_version = 0; 516 capsnap->xattr_version = 0;
505 } 517 }
506 518
519 capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
520
507 /* dirty page count moved from _head to this cap_snap; 521 /* dirty page count moved from _head to this cap_snap;
508 all subsequent writes page dirties occur _after_ this 522 all subsequent writes page dirties occur _after_ this
509 snapshot. */ 523 snapshot. */
@@ -590,15 +604,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
590 if (!inode) 604 if (!inode)
591 continue; 605 continue;
592 spin_unlock(&realm->inodes_with_caps_lock); 606 spin_unlock(&realm->inodes_with_caps_lock);
593 if (lastinode) 607 iput(lastinode);
594 iput(lastinode);
595 lastinode = inode; 608 lastinode = inode;
596 ceph_queue_cap_snap(ci); 609 ceph_queue_cap_snap(ci);
597 spin_lock(&realm->inodes_with_caps_lock); 610 spin_lock(&realm->inodes_with_caps_lock);
598 } 611 }
599 spin_unlock(&realm->inodes_with_caps_lock); 612 spin_unlock(&realm->inodes_with_caps_lock);
600 if (lastinode) 613 iput(lastinode);
601 iput(lastinode);
602 614
603 list_for_each_entry(child, &realm->children, child_item) { 615 list_for_each_entry(child, &realm->children, child_item) {
604 dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n", 616 dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
@@ -928,5 +940,16 @@ out:
928 return; 940 return;
929} 941}
930 942
943int __init ceph_snap_init(void)
944{
945 empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
946 if (!empty_snapc)
947 return -ENOMEM;
948 empty_snapc->seq = 1;
949 return 0;
950}
931 951
932 952void ceph_snap_exit(void)
953{
954 ceph_put_snap_context(empty_snapc);
955}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f6e12377335c..50f06cddc94b 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
515 struct ceph_fs_client *fsc; 515 struct ceph_fs_client *fsc;
516 const u64 supported_features = 516 const u64 supported_features =
517 CEPH_FEATURE_FLOCK | 517 CEPH_FEATURE_FLOCK |
518 CEPH_FEATURE_DIRLAYOUTHASH; 518 CEPH_FEATURE_DIRLAYOUTHASH |
519 CEPH_FEATURE_MDS_INLINE_DATA;
519 const u64 required_features = 0; 520 const u64 required_features = 0;
520 int page_count; 521 int page_count;
521 size_t size; 522 size_t size;
@@ -1017,9 +1018,6 @@ static struct file_system_type ceph_fs_type = {
1017}; 1018};
1018MODULE_ALIAS_FS("ceph"); 1019MODULE_ALIAS_FS("ceph");
1019 1020
1020#define _STRINGIFY(x) #x
1021#define STRINGIFY(x) _STRINGIFY(x)
1022
1023static int __init init_ceph(void) 1021static int __init init_ceph(void)
1024{ 1022{
1025 int ret = init_caches(); 1023 int ret = init_caches();
@@ -1028,15 +1026,20 @@ static int __init init_ceph(void)
1028 1026
1029 ceph_flock_init(); 1027 ceph_flock_init();
1030 ceph_xattr_init(); 1028 ceph_xattr_init();
1029 ret = ceph_snap_init();
1030 if (ret)
1031 goto out_xattr;
1031 ret = register_filesystem(&ceph_fs_type); 1032 ret = register_filesystem(&ceph_fs_type);
1032 if (ret) 1033 if (ret)
1033 goto out_icache; 1034 goto out_snap;
1034 1035
1035 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); 1036 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
1036 1037
1037 return 0; 1038 return 0;
1038 1039
1039out_icache: 1040out_snap:
1041 ceph_snap_exit();
1042out_xattr:
1040 ceph_xattr_exit(); 1043 ceph_xattr_exit();
1041 destroy_caches(); 1044 destroy_caches();
1042out: 1045out:
@@ -1047,6 +1050,7 @@ static void __exit exit_ceph(void)
1047{ 1050{
1048 dout("exit_ceph\n"); 1051 dout("exit_ceph\n");
1049 unregister_filesystem(&ceph_fs_type); 1052 unregister_filesystem(&ceph_fs_type);
1053 ceph_snap_exit();
1050 ceph_xattr_exit(); 1054 ceph_xattr_exit();
1051 destroy_caches(); 1055 destroy_caches();
1052} 1056}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507979b8..e1aa32d0759d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@ struct ceph_cap_snap {
161 u64 time_warp_seq; 161 u64 time_warp_seq;
162 int writing; /* a sync write is still in progress */ 162 int writing; /* a sync write is still in progress */
163 int dirty_pages; /* dirty pages awaiting writeback */ 163 int dirty_pages; /* dirty pages awaiting writeback */
164 bool inline_data;
164}; 165};
165 166
166static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) 167static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -253,9 +254,11 @@ struct ceph_inode_info {
253 spinlock_t i_ceph_lock; 254 spinlock_t i_ceph_lock;
254 255
255 u64 i_version; 256 u64 i_version;
257 u64 i_inline_version;
256 u32 i_time_warp_seq; 258 u32 i_time_warp_seq;
257 259
258 unsigned i_ceph_flags; 260 unsigned i_ceph_flags;
261 int i_ordered_count;
259 atomic_t i_release_count; 262 atomic_t i_release_count;
260 atomic_t i_complete_count; 263 atomic_t i_complete_count;
261 264
@@ -434,14 +437,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
434/* 437/*
435 * Ceph inode. 438 * Ceph inode.
436 */ 439 */
437#define CEPH_I_NODELAY 4 /* do not delay cap release */ 440#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
438#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ 441#define CEPH_I_NODELAY 4 /* do not delay cap release */
439#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ 442#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
443#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
440 444
441static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, 445static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
442 int release_count) 446 int release_count, int ordered_count)
443{ 447{
444 atomic_set(&ci->i_complete_count, release_count); 448 atomic_set(&ci->i_complete_count, release_count);
449 if (ci->i_ordered_count == ordered_count)
450 ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
451 else
452 ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
445} 453}
446 454
447static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) 455static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +463,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
455 atomic_read(&ci->i_release_count); 463 atomic_read(&ci->i_release_count);
456} 464}
457 465
466static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
467{
468 return __ceph_dir_is_complete(ci) &&
469 (ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
470}
471
458static inline void ceph_dir_clear_complete(struct inode *inode) 472static inline void ceph_dir_clear_complete(struct inode *inode)
459{ 473{
460 __ceph_dir_clear_complete(ceph_inode(inode)); 474 __ceph_dir_clear_complete(ceph_inode(inode));
461} 475}
462 476
463static inline bool ceph_dir_is_complete(struct inode *inode) 477static inline void ceph_dir_clear_ordered(struct inode *inode)
464{ 478{
465 return __ceph_dir_is_complete(ceph_inode(inode)); 479 struct ceph_inode_info *ci = ceph_inode(inode);
480 spin_lock(&ci->i_ceph_lock);
481 ci->i_ordered_count++;
482 ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
483 spin_unlock(&ci->i_ceph_lock);
466} 484}
467 485
486static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
487{
488 struct ceph_inode_info *ci = ceph_inode(inode);
489 bool ret;
490 spin_lock(&ci->i_ceph_lock);
491 ret = __ceph_dir_is_complete_ordered(ci);
492 spin_unlock(&ci->i_ceph_lock);
493 return ret;
494}
468 495
469/* find a specific frag @f */ 496/* find a specific frag @f */
470extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, 497extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +607,7 @@ struct ceph_file_info {
580 char *last_name; /* last entry in previous chunk */ 607 char *last_name; /* last entry in previous chunk */
581 struct dentry *dentry; /* next dentry (for dcache readdir) */ 608 struct dentry *dentry; /* next dentry (for dcache readdir) */
582 int dir_release_count; 609 int dir_release_count;
610 int dir_ordered_count;
583 611
584 /* used for -o dirstat read() on directory thing */ 612 /* used for -o dirstat read() on directory thing */
585 char *dir_info; 613 char *dir_info;
@@ -673,6 +701,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
673extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, 701extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
674 struct ceph_cap_snap *capsnap); 702 struct ceph_cap_snap *capsnap);
675extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); 703extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
704extern int ceph_snap_init(void);
705extern void ceph_snap_exit(void);
676 706
677/* 707/*
678 * a cap_snap is "pending" if it is still awaiting an in-progress 708 * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -715,7 +745,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
715extern void ceph_queue_invalidate(struct inode *inode); 745extern void ceph_queue_invalidate(struct inode *inode);
716extern void ceph_queue_writeback(struct inode *inode); 746extern void ceph_queue_writeback(struct inode *inode);
717 747
718extern int ceph_do_getattr(struct inode *inode, int mask, bool force); 748extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
749 int mask, bool force);
750static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
751{
752 return __ceph_do_getattr(inode, NULL, mask, force);
753}
719extern int ceph_permission(struct inode *inode, int mask); 754extern int ceph_permission(struct inode *inode, int mask);
720extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); 755extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
721extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, 756extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -830,7 +865,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
830 int mds, int drop, int unless); 865 int mds, int drop, int unless);
831 866
832extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 867extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
833 int *got, loff_t endoff); 868 loff_t endoff, int *got, struct page **pinned_page);
834 869
835/* for counting open files by mode */ 870/* for counting open files by mode */
836static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode) 871static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
@@ -852,7 +887,9 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
852 struct file *file, unsigned flags, umode_t mode, 887 struct file *file, unsigned flags, umode_t mode,
853 int *opened); 888 int *opened);
854extern int ceph_release(struct inode *inode, struct file *filp); 889extern int ceph_release(struct inode *inode, struct file *filp);
855 890extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
891 char *data, size_t len);
892int ceph_uninline_data(struct file *filp, struct page *locked_page);
856/* dir.c */ 893/* dir.c */
857extern const struct file_operations ceph_dir_fops; 894extern const struct file_operations ceph_dir_fops;
858extern const struct inode_operations ceph_dir_iops; 895extern const struct inode_operations ceph_dir_iops;
diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej
new file mode 100644
index 000000000000..88fe3dfadb29
--- /dev/null
+++ b/fs/ceph/super.h.rej
@@ -0,0 +1,10 @@
1--- fs/ceph/super.h
2+++ fs/ceph/super.h
3@@ -254,6 +255,7 @@
4 spinlock_t i_ceph_lock;
5
6 u64 i_version;
7+ u64 i_inline_version;
8 u32 i_time_warp_seq;
9
10 unsigned i_ceph_flags;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 678b0d2bbbc4..5a492caf34cb 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
854 struct ceph_pagelist *pagelist = NULL; 854 struct ceph_pagelist *pagelist = NULL;
855 int err; 855 int err;
856 856
857 if (value) { 857 if (size > 0) {
858 /* copy value into pagelist */ 858 /* copy value into pagelist */
859 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 859 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
860 if (!pagelist) 860 if (!pagelist)
@@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
864 err = ceph_pagelist_append(pagelist, value, size); 864 err = ceph_pagelist_append(pagelist, value, size);
865 if (err) 865 if (err)
866 goto out; 866 goto out;
867 } else { 867 } else if (!value) {
868 flags |= CEPH_XATTR_REMOVE; 868 flags |= CEPH_XATTR_REMOVE;
869 } 869 }
870 870
@@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
1001 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) 1001 if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
1002 return generic_setxattr(dentry, name, value, size, flags); 1002 return generic_setxattr(dentry, name, value, size, flags);
1003 1003
1004 if (size == 0)
1005 value = ""; /* empty EA, do not remove */
1006
1004 return __ceph_setxattr(dentry, name, value, size, flags); 1007 return __ceph_setxattr(dentry, name, value, size, flags);
1005} 1008}
1006 1009
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 5f3386844134..260d78b587c4 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -13,6 +13,7 @@
13 13
14struct ceph_auth_client; 14struct ceph_auth_client;
15struct ceph_authorizer; 15struct ceph_authorizer;
16struct ceph_msg;
16 17
17struct ceph_auth_handshake { 18struct ceph_auth_handshake {
18 struct ceph_authorizer *authorizer; 19 struct ceph_authorizer *authorizer;
@@ -20,6 +21,10 @@ struct ceph_auth_handshake {
20 size_t authorizer_buf_len; 21 size_t authorizer_buf_len;
21 void *authorizer_reply_buf; 22 void *authorizer_reply_buf;
22 size_t authorizer_reply_buf_len; 23 size_t authorizer_reply_buf_len;
24 int (*sign_message)(struct ceph_auth_handshake *auth,
25 struct ceph_msg *msg);
26 int (*check_message_signature)(struct ceph_auth_handshake *auth,
27 struct ceph_msg *msg);
23}; 28};
24 29
25struct ceph_auth_client_ops { 30struct ceph_auth_client_ops {
@@ -66,6 +71,11 @@ struct ceph_auth_client_ops {
66 void (*reset)(struct ceph_auth_client *ac); 71 void (*reset)(struct ceph_auth_client *ac);
67 72
68 void (*destroy)(struct ceph_auth_client *ac); 73 void (*destroy)(struct ceph_auth_client *ac);
74
75 int (*sign_message)(struct ceph_auth_handshake *auth,
76 struct ceph_msg *msg);
77 int (*check_message_signature)(struct ceph_auth_handshake *auth,
78 struct ceph_msg *msg);
69}; 79};
70 80
71struct ceph_auth_client { 81struct ceph_auth_client {
@@ -113,4 +123,20 @@ extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
113extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac, 123extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
114 int peer_type); 124 int peer_type);
115 125
126static inline int ceph_auth_sign_message(struct ceph_auth_handshake *auth,
127 struct ceph_msg *msg)
128{
129 if (auth->sign_message)
130 return auth->sign_message(auth, msg);
131 return 0;
132}
133
134static inline
135int ceph_auth_check_message_signature(struct ceph_auth_handshake *auth,
136 struct ceph_msg *msg)
137{
138 if (auth->check_message_signature)
139 return auth->check_message_signature(auth, msg);
140 return 0;
141}
116#endif 142#endif
diff --git a/include/linux/ceph/buffer.h b/include/linux/ceph/buffer.h
index 07ad423cc37f..07ca15e76100 100644
--- a/include/linux/ceph/buffer.h
+++ b/include/linux/ceph/buffer.h
@@ -10,8 +10,7 @@
10/* 10/*
11 * a simple reference counted buffer. 11 * a simple reference counted buffer.
12 * 12 *
13 * use kmalloc for small sizes (<= one page), vmalloc for larger 13 * use kmalloc for smaller sizes, vmalloc for larger sizes.
14 * sizes.
15 */ 14 */
16struct ceph_buffer { 15struct ceph_buffer {
17 struct kref kref; 16 struct kref kref;
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index d12659ce550d..71e05bbf8ceb 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -84,6 +84,7 @@ static inline u64 ceph_sanitize_features(u64 features)
84 CEPH_FEATURE_PGPOOL3 | \ 84 CEPH_FEATURE_PGPOOL3 | \
85 CEPH_FEATURE_OSDENC | \ 85 CEPH_FEATURE_OSDENC | \
86 CEPH_FEATURE_CRUSH_TUNABLES | \ 86 CEPH_FEATURE_CRUSH_TUNABLES | \
87 CEPH_FEATURE_MSG_AUTH | \
87 CEPH_FEATURE_CRUSH_TUNABLES2 | \ 88 CEPH_FEATURE_CRUSH_TUNABLES2 | \
88 CEPH_FEATURE_REPLY_CREATE_INODE | \ 89 CEPH_FEATURE_REPLY_CREATE_INODE | \
89 CEPH_FEATURE_OSDHASHPSPOOL | \ 90 CEPH_FEATURE_OSDHASHPSPOOL | \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 3c97d5e9b951..c0dadaac26e3 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -522,8 +522,11 @@ struct ceph_mds_reply_dirfrag {
522 __le32 dist[]; 522 __le32 dist[];
523} __attribute__ ((packed)); 523} __attribute__ ((packed));
524 524
525#define CEPH_LOCK_FCNTL 1 525#define CEPH_LOCK_FCNTL 1
526#define CEPH_LOCK_FLOCK 2 526#define CEPH_LOCK_FLOCK 2
527#define CEPH_LOCK_FCNTL_INTR 3
528#define CEPH_LOCK_FLOCK_INTR 4
529
527 530
528#define CEPH_LOCK_SHARED 1 531#define CEPH_LOCK_SHARED 1
529#define CEPH_LOCK_EXCL 2 532#define CEPH_LOCK_EXCL 2
@@ -549,6 +552,7 @@ struct ceph_filelock {
549 552
550int ceph_flags_to_mode(int flags); 553int ceph_flags_to_mode(int flags);
551 554
555#define CEPH_INLINE_NONE ((__u64)-1)
552 556
553/* capability bits */ 557/* capability bits */
554#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */ 558#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */
@@ -613,6 +617,8 @@ int ceph_flags_to_mode(int flags);
613 CEPH_CAP_LINK_SHARED | \ 617 CEPH_CAP_LINK_SHARED | \
614 CEPH_CAP_FILE_SHARED | \ 618 CEPH_CAP_FILE_SHARED | \
615 CEPH_CAP_XATTR_SHARED) 619 CEPH_CAP_XATTR_SHARED)
620#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \
621 CEPH_CAP_FILE_RD)
616 622
617#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \ 623#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \
618 CEPH_CAP_LINK_SHARED | \ 624 CEPH_CAP_LINK_SHARED | \
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 07bc359b88ac..8b11a79ca1cb 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -29,6 +29,7 @@
29#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */ 29#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */
30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */ 30#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ 31#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
32#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */
32 33
33#define CEPH_OPT_DEFAULT (0) 34#define CEPH_OPT_DEFAULT (0)
34 35
@@ -184,7 +185,6 @@ extern bool libceph_compatible(void *data);
184extern const char *ceph_msg_type_name(int type); 185extern const char *ceph_msg_type_name(int type);
185extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); 186extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
186extern void *ceph_kvmalloc(size_t size, gfp_t flags); 187extern void *ceph_kvmalloc(size_t size, gfp_t flags);
187extern void ceph_kvfree(const void *ptr);
188 188
189extern struct ceph_options *ceph_parse_options(char *options, 189extern struct ceph_options *ceph_parse_options(char *options,
190 const char *dev_name, const char *dev_name_end, 190 const char *dev_name, const char *dev_name_end,
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 40ae58e3e9db..d9d396c16503 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -42,6 +42,10 @@ struct ceph_connection_operations {
42 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, 42 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
43 struct ceph_msg_header *hdr, 43 struct ceph_msg_header *hdr,
44 int *skip); 44 int *skip);
45 int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg);
46
47 int (*check_message_signature) (struct ceph_connection *con,
48 struct ceph_msg *msg);
45}; 49};
46 50
47/* use format string %s%d */ 51/* use format string %s%d */
@@ -142,7 +146,10 @@ struct ceph_msg_data_cursor {
142 */ 146 */
143struct ceph_msg { 147struct ceph_msg {
144 struct ceph_msg_header hdr; /* header */ 148 struct ceph_msg_header hdr; /* header */
145 struct ceph_msg_footer footer; /* footer */ 149 union {
150 struct ceph_msg_footer footer; /* footer */
151 struct ceph_msg_footer_old old_footer; /* old format footer */
152 };
146 struct kvec front; /* unaligned blobs of message */ 153 struct kvec front; /* unaligned blobs of message */
147 struct ceph_buffer *middle; 154 struct ceph_buffer *middle;
148 155
diff --git a/include/linux/ceph/msgr.h b/include/linux/ceph/msgr.h
index 3d94a73b5f30..1c1887206ffa 100644
--- a/include/linux/ceph/msgr.h
+++ b/include/linux/ceph/msgr.h
@@ -152,7 +152,8 @@ struct ceph_msg_header {
152 receiver: mask against ~PAGE_MASK */ 152 receiver: mask against ~PAGE_MASK */
153 153
154 struct ceph_entity_name src; 154 struct ceph_entity_name src;
155 __le32 reserved; 155 __le16 compat_version;
156 __le16 reserved;
156 __le32 crc; /* header crc32c */ 157 __le32 crc; /* header crc32c */
157} __attribute__ ((packed)); 158} __attribute__ ((packed));
158 159
@@ -164,13 +165,21 @@ struct ceph_msg_header {
164/* 165/*
165 * follows data payload 166 * follows data payload
166 */ 167 */
168struct ceph_msg_footer_old {
169 __le32 front_crc, middle_crc, data_crc;
170 __u8 flags;
171} __attribute__ ((packed));
172
167struct ceph_msg_footer { 173struct ceph_msg_footer {
168 __le32 front_crc, middle_crc, data_crc; 174 __le32 front_crc, middle_crc, data_crc;
175 // sig holds the 64 bits of the digital signature for the message PLR
176 __le64 sig;
169 __u8 flags; 177 __u8 flags;
170} __attribute__ ((packed)); 178} __attribute__ ((packed));
171 179
172#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */ 180#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */
173#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */ 181#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */
182#define CEPH_MSG_FOOTER_SIGNED (1<<2) /* msg was signed */
174 183
175 184
176#endif 185#endif
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 03aeb27fcc69..5d86416d35f2 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -87,6 +87,13 @@ struct ceph_osd_req_op {
87 struct ceph_osd_data osd_data; 87 struct ceph_osd_data osd_data;
88 } extent; 88 } extent;
89 struct { 89 struct {
90 __le32 name_len;
91 __le32 value_len;
92 __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
93 __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
94 struct ceph_osd_data osd_data;
95 } xattr;
96 struct {
90 const char *class_name; 97 const char *class_name;
91 const char *method_name; 98 const char *method_name;
92 struct ceph_osd_data request_info; 99 struct ceph_osd_data request_info;
@@ -295,6 +302,9 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
295extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, 302extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
296 unsigned int which, u16 opcode, 303 unsigned int which, u16 opcode,
297 const char *class, const char *method); 304 const char *class, const char *method);
305extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
306 u16 opcode, const char *name, const void *value,
307 size_t size, u8 cmp_op, u8 cmp_mode);
298extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 308extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
299 unsigned int which, u16 opcode, 309 unsigned int which, u16 opcode,
300 u64 cookie, u64 version, int flag); 310 u64 cookie, u64 version, int flag);
@@ -318,7 +328,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
318 struct ceph_file_layout *layout, 328 struct ceph_file_layout *layout,
319 struct ceph_vino vino, 329 struct ceph_vino vino,
320 u64 offset, u64 *len, 330 u64 offset, u64 *len,
321 int num_ops, int opcode, int flags, 331 unsigned int which, int num_ops,
332 int opcode, int flags,
322 struct ceph_snap_context *snapc, 333 struct ceph_snap_context *snapc,
323 u32 truncate_seq, u64 truncate_size, 334 u32 truncate_seq, u64 truncate_size,
324 bool use_mempool); 335 bool use_mempool);
diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h
index 5f871d84ddce..13d71fe18b0c 100644
--- a/include/linux/ceph/pagelist.h
+++ b/include/linux/ceph/pagelist.h
@@ -1,8 +1,10 @@
1#ifndef __FS_CEPH_PAGELIST_H 1#ifndef __FS_CEPH_PAGELIST_H
2#define __FS_CEPH_PAGELIST_H 2#define __FS_CEPH_PAGELIST_H
3 3
4#include <linux/list.h> 4#include <asm/byteorder.h>
5#include <linux/atomic.h> 5#include <linux/atomic.h>
6#include <linux/list.h>
7#include <linux/types.h>
6 8
7struct ceph_pagelist { 9struct ceph_pagelist {
8 struct list_head head; 10 struct list_head head;
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 7e38b729696a..15845814a0f2 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -8,6 +8,7 @@
8 8
9#include <linux/ceph/decode.h> 9#include <linux/ceph/decode.h>
10#include <linux/ceph/auth.h> 10#include <linux/ceph/auth.h>
11#include <linux/ceph/messenger.h>
11 12
12#include "crypto.h" 13#include "crypto.h"
13#include "auth_x.h" 14#include "auth_x.h"
@@ -293,6 +294,11 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
293 dout("build_authorizer for %s %p\n", 294 dout("build_authorizer for %s %p\n",
294 ceph_entity_type_name(th->service), au); 295 ceph_entity_type_name(th->service), au);
295 296
297 ceph_crypto_key_destroy(&au->session_key);
298 ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
299 if (ret)
300 return ret;
301
296 maxlen = sizeof(*msg_a) + sizeof(msg_b) + 302 maxlen = sizeof(*msg_a) + sizeof(msg_b) +
297 ceph_x_encrypt_buflen(ticket_blob_len); 303 ceph_x_encrypt_buflen(ticket_blob_len);
298 dout(" need len %d\n", maxlen); 304 dout(" need len %d\n", maxlen);
@@ -302,8 +308,10 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
302 } 308 }
303 if (!au->buf) { 309 if (!au->buf) {
304 au->buf = ceph_buffer_new(maxlen, GFP_NOFS); 310 au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
305 if (!au->buf) 311 if (!au->buf) {
312 ceph_crypto_key_destroy(&au->session_key);
306 return -ENOMEM; 313 return -ENOMEM;
314 }
307 } 315 }
308 au->service = th->service; 316 au->service = th->service;
309 au->secret_id = th->secret_id; 317 au->secret_id = th->secret_id;
@@ -329,7 +337,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
329 get_random_bytes(&au->nonce, sizeof(au->nonce)); 337 get_random_bytes(&au->nonce, sizeof(au->nonce));
330 msg_b.struct_v = 1; 338 msg_b.struct_v = 1;
331 msg_b.nonce = cpu_to_le64(au->nonce); 339 msg_b.nonce = cpu_to_le64(au->nonce);
332 ret = ceph_x_encrypt(&th->session_key, &msg_b, sizeof(msg_b), 340 ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
333 p, end - p); 341 p, end - p);
334 if (ret < 0) 342 if (ret < 0)
335 goto out_buf; 343 goto out_buf;
@@ -560,6 +568,8 @@ static int ceph_x_create_authorizer(
560 auth->authorizer_buf_len = au->buf->vec.iov_len; 568 auth->authorizer_buf_len = au->buf->vec.iov_len;
561 auth->authorizer_reply_buf = au->reply_buf; 569 auth->authorizer_reply_buf = au->reply_buf;
562 auth->authorizer_reply_buf_len = sizeof (au->reply_buf); 570 auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
571 auth->sign_message = ac->ops->sign_message;
572 auth->check_message_signature = ac->ops->check_message_signature;
563 573
564 return 0; 574 return 0;
565} 575}
@@ -588,17 +598,13 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
588 struct ceph_authorizer *a, size_t len) 598 struct ceph_authorizer *a, size_t len)
589{ 599{
590 struct ceph_x_authorizer *au = (void *)a; 600 struct ceph_x_authorizer *au = (void *)a;
591 struct ceph_x_ticket_handler *th;
592 int ret = 0; 601 int ret = 0;
593 struct ceph_x_authorize_reply reply; 602 struct ceph_x_authorize_reply reply;
594 void *preply = &reply; 603 void *preply = &reply;
595 void *p = au->reply_buf; 604 void *p = au->reply_buf;
596 void *end = p + sizeof(au->reply_buf); 605 void *end = p + sizeof(au->reply_buf);
597 606
598 th = get_ticket_handler(ac, au->service); 607 ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply));
599 if (IS_ERR(th))
600 return PTR_ERR(th);
601 ret = ceph_x_decrypt(&th->session_key, &p, end, &preply, sizeof(reply));
602 if (ret < 0) 608 if (ret < 0)
603 return ret; 609 return ret;
604 if (ret != sizeof(reply)) 610 if (ret != sizeof(reply))
@@ -618,6 +624,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
618{ 624{
619 struct ceph_x_authorizer *au = (void *)a; 625 struct ceph_x_authorizer *au = (void *)a;
620 626
627 ceph_crypto_key_destroy(&au->session_key);
621 ceph_buffer_put(au->buf); 628 ceph_buffer_put(au->buf);
622 kfree(au); 629 kfree(au);
623} 630}
@@ -663,6 +670,59 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
663 memset(&th->validity, 0, sizeof(th->validity)); 670 memset(&th->validity, 0, sizeof(th->validity));
664} 671}
665 672
673static int calcu_signature(struct ceph_x_authorizer *au,
674 struct ceph_msg *msg, __le64 *sig)
675{
676 int ret;
677 char tmp_enc[40];
678 __le32 tmp[5] = {
679 16u, msg->hdr.crc, msg->footer.front_crc,
680 msg->footer.middle_crc, msg->footer.data_crc,
681 };
682 ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp),
683 tmp_enc, sizeof(tmp_enc));
684 if (ret < 0)
685 return ret;
686 *sig = *(__le64*)(tmp_enc + 4);
687 return 0;
688}
689
690static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
691 struct ceph_msg *msg)
692{
693 int ret;
694 if (!auth->authorizer)
695 return 0;
696 ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
697 msg, &msg->footer.sig);
698 if (ret < 0)
699 return ret;
700 msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED;
701 return 0;
702}
703
704static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
705 struct ceph_msg *msg)
706{
707 __le64 sig_check;
708 int ret;
709
710 if (!auth->authorizer)
711 return 0;
712 ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
713 msg, &sig_check);
714 if (ret < 0)
715 return ret;
716 if (sig_check == msg->footer.sig)
717 return 0;
718 if (msg->footer.flags & CEPH_MSG_FOOTER_SIGNED)
719 dout("ceph_x_check_message_signature %p has signature %llx "
720 "expect %llx\n", msg, msg->footer.sig, sig_check);
721 else
722 dout("ceph_x_check_message_signature %p sender did not set "
723 "CEPH_MSG_FOOTER_SIGNED\n", msg);
724 return -EBADMSG;
725}
666 726
667static const struct ceph_auth_client_ops ceph_x_ops = { 727static const struct ceph_auth_client_ops ceph_x_ops = {
668 .name = "x", 728 .name = "x",
@@ -677,6 +737,8 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
677 .invalidate_authorizer = ceph_x_invalidate_authorizer, 737 .invalidate_authorizer = ceph_x_invalidate_authorizer,
678 .reset = ceph_x_reset, 738 .reset = ceph_x_reset,
679 .destroy = ceph_x_destroy, 739 .destroy = ceph_x_destroy,
740 .sign_message = ceph_x_sign_message,
741 .check_message_signature = ceph_x_check_message_signature,
680}; 742};
681 743
682 744
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index 65ee72082d99..e8b7c6917d47 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -26,6 +26,7 @@ struct ceph_x_ticket_handler {
26 26
27 27
28struct ceph_x_authorizer { 28struct ceph_x_authorizer {
29 struct ceph_crypto_key session_key;
29 struct ceph_buffer *buf; 30 struct ceph_buffer *buf;
30 unsigned int service; 31 unsigned int service;
31 u64 nonce; 32 u64 nonce;
diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c
index 621b5f65407f..add5f921a0ff 100644
--- a/net/ceph/buffer.c
+++ b/net/ceph/buffer.c
@@ -6,7 +6,7 @@
6 6
7#include <linux/ceph/buffer.h> 7#include <linux/ceph/buffer.h>
8#include <linux/ceph/decode.h> 8#include <linux/ceph/decode.h>
9#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */ 9#include <linux/ceph/libceph.h> /* for ceph_kvmalloc */
10 10
11struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp) 11struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
12{ 12{
@@ -35,7 +35,7 @@ void ceph_buffer_release(struct kref *kref)
35 struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref); 35 struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
36 36
37 dout("buffer_release %p\n", b); 37 dout("buffer_release %p\n", b);
38 ceph_kvfree(b->vec.iov_base); 38 kvfree(b->vec.iov_base);
39 kfree(b); 39 kfree(b);
40} 40}
41EXPORT_SYMBOL(ceph_buffer_release); 41EXPORT_SYMBOL(ceph_buffer_release);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 58fbfe134f93..5d5ab67f516d 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -184,14 +184,6 @@ void *ceph_kvmalloc(size_t size, gfp_t flags)
184 return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL); 184 return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
185} 185}
186 186
187void ceph_kvfree(const void *ptr)
188{
189 if (is_vmalloc_addr(ptr))
190 vfree(ptr);
191 else
192 kfree(ptr);
193}
194
195 187
196static int parse_fsid(const char *str, struct ceph_fsid *fsid) 188static int parse_fsid(const char *str, struct ceph_fsid *fsid)
197{ 189{
@@ -245,6 +237,8 @@ enum {
245 Opt_noshare, 237 Opt_noshare,
246 Opt_crc, 238 Opt_crc,
247 Opt_nocrc, 239 Opt_nocrc,
240 Opt_cephx_require_signatures,
241 Opt_nocephx_require_signatures,
248}; 242};
249 243
250static match_table_t opt_tokens = { 244static match_table_t opt_tokens = {
@@ -263,6 +257,8 @@ static match_table_t opt_tokens = {
263 {Opt_noshare, "noshare"}, 257 {Opt_noshare, "noshare"},
264 {Opt_crc, "crc"}, 258 {Opt_crc, "crc"},
265 {Opt_nocrc, "nocrc"}, 259 {Opt_nocrc, "nocrc"},
260 {Opt_cephx_require_signatures, "cephx_require_signatures"},
261 {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
266 {-1, NULL} 262 {-1, NULL}
267}; 263};
268 264
@@ -461,6 +457,12 @@ ceph_parse_options(char *options, const char *dev_name,
461 case Opt_nocrc: 457 case Opt_nocrc:
462 opt->flags |= CEPH_OPT_NOCRC; 458 opt->flags |= CEPH_OPT_NOCRC;
463 break; 459 break;
460 case Opt_cephx_require_signatures:
461 opt->flags &= ~CEPH_OPT_NOMSGAUTH;
462 break;
463 case Opt_nocephx_require_signatures:
464 opt->flags |= CEPH_OPT_NOMSGAUTH;
465 break;
464 466
465 default: 467 default:
466 BUG_ON(token); 468 BUG_ON(token);
@@ -504,6 +506,9 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
504 init_waitqueue_head(&client->auth_wq); 506 init_waitqueue_head(&client->auth_wq);
505 client->auth_err = 0; 507 client->auth_err = 0;
506 508
509 if (!ceph_test_opt(client, NOMSGAUTH))
510 required_features |= CEPH_FEATURE_MSG_AUTH;
511
507 client->extra_mon_dispatch = NULL; 512 client->extra_mon_dispatch = NULL;
508 client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT | 513 client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
509 supported_features; 514 supported_features;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8d1653caffdb..33a2f201e460 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1196,8 +1196,18 @@ static void prepare_write_message_footer(struct ceph_connection *con)
1196 dout("prepare_write_message_footer %p\n", con); 1196 dout("prepare_write_message_footer %p\n", con);
1197 con->out_kvec_is_msg = true; 1197 con->out_kvec_is_msg = true;
1198 con->out_kvec[v].iov_base = &m->footer; 1198 con->out_kvec[v].iov_base = &m->footer;
1199 con->out_kvec[v].iov_len = sizeof(m->footer); 1199 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1200 con->out_kvec_bytes += sizeof(m->footer); 1200 if (con->ops->sign_message)
1201 con->ops->sign_message(con, m);
1202 else
1203 m->footer.sig = 0;
1204 con->out_kvec[v].iov_len = sizeof(m->footer);
1205 con->out_kvec_bytes += sizeof(m->footer);
1206 } else {
1207 m->old_footer.flags = m->footer.flags;
1208 con->out_kvec[v].iov_len = sizeof(m->old_footer);
1209 con->out_kvec_bytes += sizeof(m->old_footer);
1210 }
1201 con->out_kvec_left++; 1211 con->out_kvec_left++;
1202 con->out_more = m->more_to_follow; 1212 con->out_more = m->more_to_follow;
1203 con->out_msg_done = true; 1213 con->out_msg_done = true;
@@ -2249,6 +2259,7 @@ static int read_partial_message(struct ceph_connection *con)
2249 int ret; 2259 int ret;
2250 unsigned int front_len, middle_len, data_len; 2260 unsigned int front_len, middle_len, data_len;
2251 bool do_datacrc = !con->msgr->nocrc; 2261 bool do_datacrc = !con->msgr->nocrc;
2262 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
2252 u64 seq; 2263 u64 seq;
2253 u32 crc; 2264 u32 crc;
2254 2265
@@ -2361,12 +2372,21 @@ static int read_partial_message(struct ceph_connection *con)
2361 } 2372 }
2362 2373
2363 /* footer */ 2374 /* footer */
2364 size = sizeof (m->footer); 2375 if (need_sign)
2376 size = sizeof(m->footer);
2377 else
2378 size = sizeof(m->old_footer);
2379
2365 end += size; 2380 end += size;
2366 ret = read_partial(con, end, size, &m->footer); 2381 ret = read_partial(con, end, size, &m->footer);
2367 if (ret <= 0) 2382 if (ret <= 0)
2368 return ret; 2383 return ret;
2369 2384
2385 if (!need_sign) {
2386 m->footer.flags = m->old_footer.flags;
2387 m->footer.sig = 0;
2388 }
2389
2370 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 2390 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
2371 m, front_len, m->footer.front_crc, middle_len, 2391 m, front_len, m->footer.front_crc, middle_len,
2372 m->footer.middle_crc, data_len, m->footer.data_crc); 2392 m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -2390,6 +2410,12 @@ static int read_partial_message(struct ceph_connection *con)
2390 return -EBADMSG; 2410 return -EBADMSG;
2391 } 2411 }
2392 2412
2413 if (need_sign && con->ops->check_message_signature &&
2414 con->ops->check_message_signature(con, m)) {
2415 pr_err("read_partial_message %p signature check failed\n", m);
2416 return -EBADMSG;
2417 }
2418
2393 return 1; /* done! */ 2419 return 1; /* done! */
2394} 2420}
2395 2421
@@ -3288,7 +3314,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
3288static void ceph_msg_free(struct ceph_msg *m) 3314static void ceph_msg_free(struct ceph_msg *m)
3289{ 3315{
3290 dout("%s %p\n", __func__, m); 3316 dout("%s %p\n", __func__, m);
3291 ceph_kvfree(m->front.iov_base); 3317 kvfree(m->front.iov_base);
3292 kmem_cache_free(ceph_msg_cache, m); 3318 kmem_cache_free(ceph_msg_cache, m);
3293} 3319}
3294 3320
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 6f164289bde8..53299c7b0ca4 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -292,6 +292,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
292 ceph_osd_data_release(&op->cls.request_data); 292 ceph_osd_data_release(&op->cls.request_data);
293 ceph_osd_data_release(&op->cls.response_data); 293 ceph_osd_data_release(&op->cls.response_data);
294 break; 294 break;
295 case CEPH_OSD_OP_SETXATTR:
296 case CEPH_OSD_OP_CMPXATTR:
297 ceph_osd_data_release(&op->xattr.osd_data);
298 break;
295 default: 299 default:
296 break; 300 break;
297 } 301 }
@@ -476,8 +480,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
476 size_t payload_len = 0; 480 size_t payload_len = 0;
477 481
478 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 482 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
479 opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && 483 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
480 opcode != CEPH_OSD_OP_TRUNCATE);
481 484
482 op->extent.offset = offset; 485 op->extent.offset = offset;
483 op->extent.length = length; 486 op->extent.length = length;
@@ -545,6 +548,39 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
545} 548}
546EXPORT_SYMBOL(osd_req_op_cls_init); 549EXPORT_SYMBOL(osd_req_op_cls_init);
547 550
551int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
552 u16 opcode, const char *name, const void *value,
553 size_t size, u8 cmp_op, u8 cmp_mode)
554{
555 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
556 struct ceph_pagelist *pagelist;
557 size_t payload_len;
558
559 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
560
561 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
562 if (!pagelist)
563 return -ENOMEM;
564
565 ceph_pagelist_init(pagelist);
566
567 payload_len = strlen(name);
568 op->xattr.name_len = payload_len;
569 ceph_pagelist_append(pagelist, name, payload_len);
570
571 op->xattr.value_len = size;
572 ceph_pagelist_append(pagelist, value, size);
573 payload_len += size;
574
575 op->xattr.cmp_op = cmp_op;
576 op->xattr.cmp_mode = cmp_mode;
577
578 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
579 op->payload_len = payload_len;
580 return 0;
581}
582EXPORT_SYMBOL(osd_req_op_xattr_init);
583
548void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 584void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
549 unsigned int which, u16 opcode, 585 unsigned int which, u16 opcode,
550 u64 cookie, u64 version, int flag) 586 u64 cookie, u64 version, int flag)
@@ -626,7 +662,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
626 case CEPH_OSD_OP_READ: 662 case CEPH_OSD_OP_READ:
627 case CEPH_OSD_OP_WRITE: 663 case CEPH_OSD_OP_WRITE:
628 case CEPH_OSD_OP_ZERO: 664 case CEPH_OSD_OP_ZERO:
629 case CEPH_OSD_OP_DELETE:
630 case CEPH_OSD_OP_TRUNCATE: 665 case CEPH_OSD_OP_TRUNCATE:
631 if (src->op == CEPH_OSD_OP_WRITE) 666 if (src->op == CEPH_OSD_OP_WRITE)
632 request_data_len = src->extent.length; 667 request_data_len = src->extent.length;
@@ -676,6 +711,19 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
676 dst->alloc_hint.expected_write_size = 711 dst->alloc_hint.expected_write_size =
677 cpu_to_le64(src->alloc_hint.expected_write_size); 712 cpu_to_le64(src->alloc_hint.expected_write_size);
678 break; 713 break;
714 case CEPH_OSD_OP_SETXATTR:
715 case CEPH_OSD_OP_CMPXATTR:
716 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
717 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
718 dst->xattr.cmp_op = src->xattr.cmp_op;
719 dst->xattr.cmp_mode = src->xattr.cmp_mode;
720 osd_data = &src->xattr.osd_data;
721 ceph_osdc_msg_data_add(req->r_request, osd_data);
722 request_data_len = osd_data->pagelist->length;
723 break;
724 case CEPH_OSD_OP_CREATE:
725 case CEPH_OSD_OP_DELETE:
726 break;
679 default: 727 default:
680 pr_err("unsupported osd opcode %s\n", 728 pr_err("unsupported osd opcode %s\n",
681 ceph_osd_op_name(src->op)); 729 ceph_osd_op_name(src->op));
@@ -705,7 +753,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
705struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 753struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
706 struct ceph_file_layout *layout, 754 struct ceph_file_layout *layout,
707 struct ceph_vino vino, 755 struct ceph_vino vino,
708 u64 off, u64 *plen, int num_ops, 756 u64 off, u64 *plen,
757 unsigned int which, int num_ops,
709 int opcode, int flags, 758 int opcode, int flags,
710 struct ceph_snap_context *snapc, 759 struct ceph_snap_context *snapc,
711 u32 truncate_seq, 760 u32 truncate_seq,
@@ -716,13 +765,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
716 u64 objnum = 0; 765 u64 objnum = 0;
717 u64 objoff = 0; 766 u64 objoff = 0;
718 u64 objlen = 0; 767 u64 objlen = 0;
719 u32 object_size;
720 u64 object_base;
721 int r; 768 int r;
722 769
723 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 770 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
724 opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && 771 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
725 opcode != CEPH_OSD_OP_TRUNCATE); 772 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
726 773
727 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 774 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
728 GFP_NOFS); 775 GFP_NOFS);
@@ -738,29 +785,24 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
738 return ERR_PTR(r); 785 return ERR_PTR(r);
739 } 786 }
740 787
741 object_size = le32_to_cpu(layout->fl_object_size); 788 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
742 object_base = off - objoff; 789 osd_req_op_init(req, which, opcode);
743 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 790 } else {
744 if (truncate_size <= object_base) { 791 u32 object_size = le32_to_cpu(layout->fl_object_size);
745 truncate_size = 0; 792 u32 object_base = off - objoff;
746 } else { 793 if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
747 truncate_size -= object_base; 794 if (truncate_size <= object_base) {
748 if (truncate_size > object_size) 795 truncate_size = 0;
749 truncate_size = object_size; 796 } else {
797 truncate_size -= object_base;
798 if (truncate_size > object_size)
799 truncate_size = object_size;
800 }
750 } 801 }
802 osd_req_op_extent_init(req, which, opcode, objoff, objlen,
803 truncate_size, truncate_seq);
751 } 804 }
752 805
753 osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
754 truncate_size, truncate_seq);
755
756 /*
757 * A second op in the ops array means the caller wants to
758 * also issue a include a 'startsync' command so that the
759 * osd will flush data quickly.
760 */
761 if (num_ops > 1)
762 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
763
764 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); 806 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
765 807
766 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), 808 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
@@ -2626,7 +2668,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
2626 2668
2627 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2669 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
2628 vino.snap, off, *plen); 2670 vino.snap, off, *plen);
2629 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, 2671 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
2630 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2672 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
2631 NULL, truncate_seq, truncate_size, 2673 NULL, truncate_seq, truncate_size,
2632 false); 2674 false);
@@ -2669,7 +2711,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2669 int page_align = off & ~PAGE_MASK; 2711 int page_align = off & ~PAGE_MASK;
2670 2712
2671 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ 2713 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
2672 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, 2714 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
2673 CEPH_OSD_OP_WRITE, 2715 CEPH_OSD_OP_WRITE,
2674 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2716 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
2675 snapc, truncate_seq, truncate_size, 2717 snapc, truncate_seq, truncate_size,
@@ -2920,6 +2962,20 @@ static int invalidate_authorizer(struct ceph_connection *con)
2920 return ceph_monc_validate_auth(&osdc->client->monc); 2962 return ceph_monc_validate_auth(&osdc->client->monc);
2921} 2963}
2922 2964
2965static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
2966{
2967 struct ceph_osd *o = con->private;
2968 struct ceph_auth_handshake *auth = &o->o_auth;
2969 return ceph_auth_sign_message(auth, msg);
2970}
2971
2972static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
2973{
2974 struct ceph_osd *o = con->private;
2975 struct ceph_auth_handshake *auth = &o->o_auth;
2976 return ceph_auth_check_message_signature(auth, msg);
2977}
2978
2923static const struct ceph_connection_operations osd_con_ops = { 2979static const struct ceph_connection_operations osd_con_ops = {
2924 .get = get_osd_con, 2980 .get = get_osd_con,
2925 .put = put_osd_con, 2981 .put = put_osd_con,
@@ -2928,5 +2984,7 @@ static const struct ceph_connection_operations osd_con_ops = {
2928 .verify_authorizer_reply = verify_authorizer_reply, 2984 .verify_authorizer_reply = verify_authorizer_reply,
2929 .invalidate_authorizer = invalidate_authorizer, 2985 .invalidate_authorizer = invalidate_authorizer,
2930 .alloc_msg = alloc_msg, 2986 .alloc_msg = alloc_msg,
2987 .sign_message = sign_message,
2988 .check_message_signature = check_message_signature,
2931 .fault = osd_reset, 2989 .fault = osd_reset,
2932}; 2990};