aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Linus Torvalds <torvalds@linux-foundation.org>, 2011-07-26 16:38:50 -0400
committer: Linus Torvalds <torvalds@linux-foundation.org>, 2011-07-26 16:38:50 -0400
commit: ba5b56cb3e3d2cab73d4fee9a022bb69462a8cd9 (patch)
tree: eda7ea059a41ae5d68e2ad5a36a87069187ef22a
parent: 243dd2809a5edd2e0e3e62781083aa44049af37d (diff)
parent: d79698da32b317e96216236f265a9b72b78ae568 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  ceph: document unlocked d_parent accesses
  ceph: explicitly reference rename old_dentry parent dir in request
  ceph: document locking for ceph_set_dentry_offset
  ceph: avoid d_parent in ceph_dentry_hash; fix ceph_encode_fh() hashing bug
  ceph: protect d_parent access in ceph_d_revalidate
  ceph: protect access to d_parent
  ceph: handle racing calls to ceph_init_dentry
  ceph: set dir complete frag after adding capability
  rbd: set blk_queue request sizes to object size
  ceph: set up readahead size when rsize is not passed
  rbd: cancel watch request when releasing the device
  ceph: ignore lease mask
  ceph: fix ceph_lookup_open intent usage
  ceph: only link open operations to directory unsafe list if O_CREAT|O_TRUNC
  ceph: fix bad parent_inode calc in ceph_lookup_open
  ceph: avoid carrying Fw cap during write into page cache
  libceph: don't time out osd requests that haven't been received
  ceph: report f_bfree based on kb_avail rather than diffing.
  ceph: only queue capsnap if caps are dirty
  ceph: fix snap writeback when racing with writes
  ...
-rw-r--r-- drivers/block/rbd.c | 46
-rw-r--r-- fs/ceph/debugfs.c | 2
-rw-r--r-- fs/ceph/dir.c | 116
-rw-r--r-- fs/ceph/export.c | 24
-rw-r--r-- fs/ceph/file.c | 61
-rw-r--r-- fs/ceph/inode.c | 48
-rw-r--r-- fs/ceph/ioctl.c | 15
-rw-r--r-- fs/ceph/ioctl.h | 1
-rw-r--r-- fs/ceph/mds_client.c | 56
-rw-r--r-- fs/ceph/mds_client.h | 3
-rw-r--r-- fs/ceph/snap.c | 25
-rw-r--r-- fs/ceph/super.c | 7
-rw-r--r-- fs/ceph/super.h | 20
-rw-r--r-- fs/ceph/xattr.c | 8
-rw-r--r-- include/linux/ceph/messenger.h | 1
-rw-r--r-- net/ceph/messenger.c | 12
-rw-r--r-- net/ceph/osd_client.c | 6
17 files changed, 306 insertions, 145 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 1278098624e6..15f65b5f3fc7 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -630,6 +630,14 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
630} 630}
631 631
632/* 632/*
633 * returns the size of an object in the image
634 */
635static u64 rbd_obj_bytes(struct rbd_image_header *header)
636{
637 return 1 << header->obj_order;
638}
639
640/*
633 * bio helpers 641 * bio helpers
634 */ 642 */
635 643
@@ -1253,6 +1261,35 @@ fail:
1253 return ret; 1261 return ret;
1254} 1262}
1255 1263
1264/*
1265 * Request sync osd unwatch
1266 */
1267static int rbd_req_sync_unwatch(struct rbd_device *dev,
1268 const char *obj)
1269{
1270 struct ceph_osd_req_op *ops;
1271
1272 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1273 if (ret < 0)
1274 return ret;
1275
1276 ops[0].watch.ver = 0;
1277 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1278 ops[0].watch.flag = 0;
1279
1280 ret = rbd_req_sync_op(dev, NULL,
1281 CEPH_NOSNAP,
1282 0,
1283 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1284 ops,
1285 1, obj, 0, 0, NULL, NULL, NULL);
1286
1287 rbd_destroy_ops(ops);
1288 ceph_osdc_cancel_event(dev->watch_event);
1289 dev->watch_event = NULL;
1290 return ret;
1291}
1292
1256struct rbd_notify_info { 1293struct rbd_notify_info {
1257 struct rbd_device *dev; 1294 struct rbd_device *dev;
1258}; 1295};
@@ -1736,6 +1773,13 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1736 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); 1773 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1737 if (!q) 1774 if (!q)
1738 goto out_disk; 1775 goto out_disk;
1776
1777 /* set io sizes to object size */
1778 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1779 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1780 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1781 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1782
1739 blk_queue_merge_bvec(q, rbd_merge_bvec); 1783 blk_queue_merge_bvec(q, rbd_merge_bvec);
1740 disk->queue = q; 1784 disk->queue = q;
1741 1785
@@ -2290,7 +2334,7 @@ static void rbd_dev_release(struct device *dev)
2290 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, 2334 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2291 rbd_dev->watch_request); 2335 rbd_dev->watch_request);
2292 if (rbd_dev->watch_event) 2336 if (rbd_dev->watch_event)
2293 ceph_osdc_cancel_event(rbd_dev->watch_event); 2337 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2294 2338
2295 rbd_put_client(rbd_dev); 2339 rbd_put_client(rbd_dev);
2296 2340
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 0dba6915712b..fb962efdacee 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -102,7 +102,7 @@ static int mdsc_show(struct seq_file *s, void *p)
102 path = NULL; 102 path = NULL;
103 spin_lock(&req->r_old_dentry->d_lock); 103 spin_lock(&req->r_old_dentry->d_lock);
104 seq_printf(s, " #%llx/%.*s (%s)", 104 seq_printf(s, " #%llx/%.*s (%s)",
105 ceph_ino(req->r_old_dentry->d_parent->d_inode), 105 ceph_ino(req->r_old_dentry_dir),
106 req->r_old_dentry->d_name.len, 106 req->r_old_dentry->d_name.len,
107 req->r_old_dentry->d_name.name, 107 req->r_old_dentry->d_name.name,
108 path ? path : ""); 108 path ? path : "");
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 1065ac779840..382abc9a6a54 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -40,14 +40,6 @@ int ceph_init_dentry(struct dentry *dentry)
40 if (dentry->d_fsdata) 40 if (dentry->d_fsdata)
41 return 0; 41 return 0;
42 42
43 if (dentry->d_parent == NULL || /* nfs fh_to_dentry */
44 ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
45 d_set_d_op(dentry, &ceph_dentry_ops);
46 else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
47 d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
48 else
49 d_set_d_op(dentry, &ceph_snap_dentry_ops);
50
51 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO); 43 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO);
52 if (!di) 44 if (!di)
53 return -ENOMEM; /* oh well */ 45 return -ENOMEM; /* oh well */
@@ -58,16 +50,42 @@ int ceph_init_dentry(struct dentry *dentry)
58 kmem_cache_free(ceph_dentry_cachep, di); 50 kmem_cache_free(ceph_dentry_cachep, di);
59 goto out_unlock; 51 goto out_unlock;
60 } 52 }
53
54 if (dentry->d_parent == NULL || /* nfs fh_to_dentry */
55 ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
56 d_set_d_op(dentry, &ceph_dentry_ops);
57 else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
58 d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
59 else
60 d_set_d_op(dentry, &ceph_snap_dentry_ops);
61
61 di->dentry = dentry; 62 di->dentry = dentry;
62 di->lease_session = NULL; 63 di->lease_session = NULL;
63 dentry->d_fsdata = di;
64 dentry->d_time = jiffies; 64 dentry->d_time = jiffies;
65 /* avoid reordering d_fsdata setup so that the check above is safe */
66 smp_mb();
67 dentry->d_fsdata = di;
65 ceph_dentry_lru_add(dentry); 68 ceph_dentry_lru_add(dentry);
66out_unlock: 69out_unlock:
67 spin_unlock(&dentry->d_lock); 70 spin_unlock(&dentry->d_lock);
68 return 0; 71 return 0;
69} 72}
70 73
74struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
75{
76 struct inode *inode = NULL;
77
78 if (!dentry)
79 return NULL;
80
81 spin_lock(&dentry->d_lock);
82 if (dentry->d_parent) {
83 inode = dentry->d_parent->d_inode;
84 ihold(inode);
85 }
86 spin_unlock(&dentry->d_lock);
87 return inode;
88}
71 89
72 90
73/* 91/*
@@ -133,7 +151,7 @@ more:
133 d_unhashed(dentry) ? "!hashed" : "hashed", 151 d_unhashed(dentry) ? "!hashed" : "hashed",
134 parent->d_subdirs.prev, parent->d_subdirs.next); 152 parent->d_subdirs.prev, parent->d_subdirs.next);
135 if (p == &parent->d_subdirs) { 153 if (p == &parent->d_subdirs) {
136 fi->at_end = 1; 154 fi->flags |= CEPH_F_ATEND;
137 goto out_unlock; 155 goto out_unlock;
138 } 156 }
139 spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); 157 spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
@@ -234,7 +252,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
234 const int max_bytes = fsc->mount_options->max_readdir_bytes; 252 const int max_bytes = fsc->mount_options->max_readdir_bytes;
235 253
236 dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off); 254 dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
237 if (fi->at_end) 255 if (fi->flags & CEPH_F_ATEND)
238 return 0; 256 return 0;
239 257
240 /* always start with . and .. */ 258 /* always start with . and .. */
@@ -403,7 +421,7 @@ more:
403 dout("readdir next frag is %x\n", frag); 421 dout("readdir next frag is %x\n", frag);
404 goto more; 422 goto more;
405 } 423 }
406 fi->at_end = 1; 424 fi->flags |= CEPH_F_ATEND;
407 425
408 /* 426 /*
409 * if dir_release_count still matches the dir, no dentries 427 * if dir_release_count still matches the dir, no dentries
@@ -435,7 +453,7 @@ static void reset_readdir(struct ceph_file_info *fi)
435 dput(fi->dentry); 453 dput(fi->dentry);
436 fi->dentry = NULL; 454 fi->dentry = NULL;
437 } 455 }
438 fi->at_end = 0; 456 fi->flags &= ~CEPH_F_ATEND;
439} 457}
440 458
441static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin) 459static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
@@ -463,7 +481,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
463 if (offset != file->f_pos) { 481 if (offset != file->f_pos) {
464 file->f_pos = offset; 482 file->f_pos = offset;
465 file->f_version = 0; 483 file->f_version = 0;
466 fi->at_end = 0; 484 fi->flags &= ~CEPH_F_ATEND;
467 } 485 }
468 retval = offset; 486 retval = offset;
469 487
@@ -488,21 +506,13 @@ out:
488} 506}
489 507
490/* 508/*
491 * Process result of a lookup/open request. 509 * Handle lookups for the hidden .snap directory.
492 *
493 * Mainly, make sure we return the final req->r_dentry (if it already
494 * existed) in place of the original VFS-provided dentry when they
495 * differ.
496 *
497 * Gracefully handle the case where the MDS replies with -ENOENT and
498 * no trace (which it may do, at its discretion, e.g., if it doesn't
499 * care to issue a lease on the negative dentry).
500 */ 510 */
501struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 511int ceph_handle_snapdir(struct ceph_mds_request *req,
502 struct dentry *dentry, int err) 512 struct dentry *dentry, int err)
503{ 513{
504 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 514 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
505 struct inode *parent = dentry->d_parent->d_inode; 515 struct inode *parent = dentry->d_parent->d_inode; /* we hold i_mutex */
506 516
507 /* .snap dir? */ 517 /* .snap dir? */
508 if (err == -ENOENT && 518 if (err == -ENOENT &&
@@ -516,7 +526,23 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
516 d_add(dentry, inode); 526 d_add(dentry, inode);
517 err = 0; 527 err = 0;
518 } 528 }
529 return err;
530}
519 531
532/*
533 * Figure out final result of a lookup/open request.
534 *
535 * Mainly, make sure we return the final req->r_dentry (if it already
536 * existed) in place of the original VFS-provided dentry when they
537 * differ.
538 *
539 * Gracefully handle the case where the MDS replies with -ENOENT and
540 * no trace (which it may do, at its discretion, e.g., if it doesn't
541 * care to issue a lease on the negative dentry).
542 */
543struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
544 struct dentry *dentry, int err)
545{
520 if (err == -ENOENT) { 546 if (err == -ENOENT) {
521 /* no trace? */ 547 /* no trace? */
522 err = 0; 548 err = 0;
@@ -610,6 +636,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
610 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); 636 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
611 req->r_locked_dir = dir; 637 req->r_locked_dir = dir;
612 err = ceph_mdsc_do_request(mdsc, NULL, req); 638 err = ceph_mdsc_do_request(mdsc, NULL, req);
639 err = ceph_handle_snapdir(req, dentry, err);
613 dentry = ceph_finish_lookup(req, dentry, err); 640 dentry = ceph_finish_lookup(req, dentry, err);
614 ceph_mdsc_put_request(req); /* will dput(dentry) */ 641 ceph_mdsc_put_request(req); /* will dput(dentry) */
615 dout("lookup result=%p\n", dentry); 642 dout("lookup result=%p\n", dentry);
@@ -789,6 +816,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
789 req->r_dentry = dget(dentry); 816 req->r_dentry = dget(dentry);
790 req->r_num_caps = 2; 817 req->r_num_caps = 2;
791 req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */ 818 req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
819 req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
792 req->r_locked_dir = dir; 820 req->r_locked_dir = dir;
793 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 821 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
794 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 822 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -887,6 +915,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
887 req->r_dentry = dget(new_dentry); 915 req->r_dentry = dget(new_dentry);
888 req->r_num_caps = 2; 916 req->r_num_caps = 2;
889 req->r_old_dentry = dget(old_dentry); 917 req->r_old_dentry = dget(old_dentry);
918 req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
890 req->r_locked_dir = new_dir; 919 req->r_locked_dir = new_dir;
891 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; 920 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
892 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; 921 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -1002,36 +1031,38 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
1002 */ 1031 */
1003static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd) 1032static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd)
1004{ 1033{
1034 int valid = 0;
1005 struct inode *dir; 1035 struct inode *dir;
1006 1036
1007 if (nd && nd->flags & LOOKUP_RCU) 1037 if (nd && nd->flags & LOOKUP_RCU)
1008 return -ECHILD; 1038 return -ECHILD;
1009 1039
1010 dir = dentry->d_parent->d_inode;
1011
1012 dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry, 1040 dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry,
1013 dentry->d_name.len, dentry->d_name.name, dentry->d_inode, 1041 dentry->d_name.len, dentry->d_name.name, dentry->d_inode,
1014 ceph_dentry(dentry)->offset); 1042 ceph_dentry(dentry)->offset);
1015 1043
1044 dir = ceph_get_dentry_parent_inode(dentry);
1045
1016 /* always trust cached snapped dentries, snapdir dentry */ 1046 /* always trust cached snapped dentries, snapdir dentry */
1017 if (ceph_snap(dir) != CEPH_NOSNAP) { 1047 if (ceph_snap(dir) != CEPH_NOSNAP) {
1018 dout("d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry, 1048 dout("d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry,
1019 dentry->d_name.len, dentry->d_name.name, dentry->d_inode); 1049 dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
1020 goto out_touch; 1050 valid = 1;
1051 } else if (dentry->d_inode &&
1052 ceph_snap(dentry->d_inode) == CEPH_SNAPDIR) {
1053 valid = 1;
1054 } else if (dentry_lease_is_valid(dentry) ||
1055 dir_lease_is_valid(dir, dentry)) {
1056 valid = 1;
1021 } 1057 }
1022 if (dentry->d_inode && ceph_snap(dentry->d_inode) == CEPH_SNAPDIR)
1023 goto out_touch;
1024
1025 if (dentry_lease_is_valid(dentry) ||
1026 dir_lease_is_valid(dir, dentry))
1027 goto out_touch;
1028 1058
1029 dout("d_revalidate %p invalid\n", dentry); 1059 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1030 d_drop(dentry); 1060 if (valid)
1031 return 0; 1061 ceph_dentry_lru_touch(dentry);
1032out_touch: 1062 else
1033 ceph_dentry_lru_touch(dentry); 1063 d_drop(dentry);
1034 return 1; 1064 iput(dir);
1065 return valid;
1035} 1066}
1036 1067
1037/* 1068/*
@@ -1228,9 +1259,8 @@ void ceph_dentry_lru_del(struct dentry *dn)
1228 * Return name hash for a given dentry. This is dependent on 1259 * Return name hash for a given dentry. This is dependent on
1229 * the parent directory's hash function. 1260 * the parent directory's hash function.
1230 */ 1261 */
1231unsigned ceph_dentry_hash(struct dentry *dn) 1262unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
1232{ 1263{
1233 struct inode *dir = dn->d_parent->d_inode;
1234 struct ceph_inode_info *dci = ceph_inode(dir); 1264 struct ceph_inode_info *dci = ceph_inode(dir);
1235 1265
1236 switch (dci->i_dir_layout.dl_dir_hash) { 1266 switch (dci->i_dir_layout.dl_dir_hash) {
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index f67b687550de..9fbcdecaaccd 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -46,7 +46,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
46 int type; 46 int type;
47 struct ceph_nfs_fh *fh = (void *)rawfh; 47 struct ceph_nfs_fh *fh = (void *)rawfh;
48 struct ceph_nfs_confh *cfh = (void *)rawfh; 48 struct ceph_nfs_confh *cfh = (void *)rawfh;
49 struct dentry *parent = dentry->d_parent; 49 struct dentry *parent;
50 struct inode *inode = dentry->d_inode; 50 struct inode *inode = dentry->d_inode;
51 int connected_handle_length = sizeof(*cfh)/4; 51 int connected_handle_length = sizeof(*cfh)/4;
52 int handle_length = sizeof(*fh)/4; 52 int handle_length = sizeof(*fh)/4;
@@ -55,26 +55,33 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
55 if (ceph_snap(inode) != CEPH_NOSNAP) 55 if (ceph_snap(inode) != CEPH_NOSNAP)
56 return -EINVAL; 56 return -EINVAL;
57 57
58 spin_lock(&dentry->d_lock);
59 parent = dget(dentry->d_parent);
60 spin_unlock(&dentry->d_lock);
61
58 if (*max_len >= connected_handle_length) { 62 if (*max_len >= connected_handle_length) {
59 dout("encode_fh %p connectable\n", dentry); 63 dout("encode_fh %p connectable\n", dentry);
60 cfh->ino = ceph_ino(dentry->d_inode); 64 cfh->ino = ceph_ino(dentry->d_inode);
61 cfh->parent_ino = ceph_ino(parent->d_inode); 65 cfh->parent_ino = ceph_ino(parent->d_inode);
62 cfh->parent_name_hash = ceph_dentry_hash(parent); 66 cfh->parent_name_hash = ceph_dentry_hash(parent->d_inode,
67 dentry);
63 *max_len = connected_handle_length; 68 *max_len = connected_handle_length;
64 type = 2; 69 type = 2;
65 } else if (*max_len >= handle_length) { 70 } else if (*max_len >= handle_length) {
66 if (connectable) { 71 if (connectable) {
67 *max_len = connected_handle_length; 72 *max_len = connected_handle_length;
68 return 255; 73 type = 255;
74 } else {
75 dout("encode_fh %p\n", dentry);
76 fh->ino = ceph_ino(dentry->d_inode);
77 *max_len = handle_length;
78 type = 1;
69 } 79 }
70 dout("encode_fh %p\n", dentry);
71 fh->ino = ceph_ino(dentry->d_inode);
72 *max_len = handle_length;
73 type = 1;
74 } else { 80 } else {
75 *max_len = handle_length; 81 *max_len = handle_length;
76 return 255; 82 type = 255;
77 } 83 }
84 dput(parent);
78 return type; 85 return type;
79} 86}
80 87
@@ -123,7 +130,6 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
123 return dentry; 130 return dentry;
124 } 131 }
125 err = ceph_init_dentry(dentry); 132 err = ceph_init_dentry(dentry);
126
127 if (err < 0) { 133 if (err < 0) {
128 iput(inode); 134 iput(inode);
129 return ERR_PTR(err); 135 return ERR_PTR(err);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0d0eae05598f..ce549d31eeb7 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -122,7 +122,7 @@ int ceph_open(struct inode *inode, struct file *file)
122 struct ceph_mds_client *mdsc = fsc->mdsc; 122 struct ceph_mds_client *mdsc = fsc->mdsc;
123 struct ceph_mds_request *req; 123 struct ceph_mds_request *req;
124 struct ceph_file_info *cf = file->private_data; 124 struct ceph_file_info *cf = file->private_data;
125 struct inode *parent_inode = file->f_dentry->d_parent->d_inode; 125 struct inode *parent_inode = NULL;
126 int err; 126 int err;
127 int flags, fmode, wanted; 127 int flags, fmode, wanted;
128 128
@@ -194,7 +194,10 @@ int ceph_open(struct inode *inode, struct file *file)
194 req->r_inode = inode; 194 req->r_inode = inode;
195 ihold(inode); 195 ihold(inode);
196 req->r_num_caps = 1; 196 req->r_num_caps = 1;
197 if (flags & (O_CREAT|O_TRUNC))
198 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
197 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 199 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
200 iput(parent_inode);
198 if (!err) 201 if (!err)
199 err = ceph_init_file(inode, file, req->r_fmode); 202 err = ceph_init_file(inode, file, req->r_fmode);
200 ceph_mdsc_put_request(req); 203 ceph_mdsc_put_request(req);
@@ -222,9 +225,9 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
222{ 225{
223 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 226 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
224 struct ceph_mds_client *mdsc = fsc->mdsc; 227 struct ceph_mds_client *mdsc = fsc->mdsc;
225 struct file *file = nd->intent.open.file; 228 struct file *file;
226 struct inode *parent_inode = get_dentry_parent_inode(file->f_dentry);
227 struct ceph_mds_request *req; 229 struct ceph_mds_request *req;
230 struct dentry *ret;
228 int err; 231 int err;
229 int flags = nd->intent.open.flags; 232 int flags = nd->intent.open.flags;
230 233
@@ -242,16 +245,24 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
242 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 245 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
243 } 246 }
244 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 247 req->r_locked_dir = dir; /* caller holds dir->i_mutex */
245 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 248 err = ceph_mdsc_do_request(mdsc,
246 dentry = ceph_finish_lookup(req, dentry, err); 249 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
247 if (!err && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 250 req);
251 err = ceph_handle_snapdir(req, dentry, err);
252 if (err)
253 goto out;
254 if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
248 err = ceph_handle_notrace_create(dir, dentry); 255 err = ceph_handle_notrace_create(dir, dentry);
249 if (!err) 256 if (err)
250 err = ceph_init_file(req->r_dentry->d_inode, file, 257 goto out;
251 req->r_fmode); 258 file = lookup_instantiate_filp(nd, req->r_dentry, ceph_open);
259 if (IS_ERR(file))
260 err = PTR_ERR(file);
261out:
262 ret = ceph_finish_lookup(req, dentry, err);
252 ceph_mdsc_put_request(req); 263 ceph_mdsc_put_request(req);
253 dout("ceph_lookup_open result=%p\n", dentry); 264 dout("ceph_lookup_open result=%p\n", ret);
254 return dentry; 265 return ret;
255} 266}
256 267
257int ceph_release(struct inode *inode, struct file *file) 268int ceph_release(struct inode *inode, struct file *file)
@@ -643,7 +654,8 @@ again:
643 654
644 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 655 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
645 (iocb->ki_filp->f_flags & O_DIRECT) || 656 (iocb->ki_filp->f_flags & O_DIRECT) ||
646 (inode->i_sb->s_flags & MS_SYNCHRONOUS)) 657 (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
658 (fi->flags & CEPH_F_SYNC))
647 /* hmm, this isn't really async... */ 659 /* hmm, this isn't really async... */
648 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 660 ret = ceph_sync_read(filp, base, len, ppos, &checkeof);
649 else 661 else
@@ -712,7 +724,7 @@ retry_snap:
712 want = CEPH_CAP_FILE_BUFFER; 724 want = CEPH_CAP_FILE_BUFFER;
713 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); 725 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
714 if (ret < 0) 726 if (ret < 0)
715 goto out; 727 goto out_put;
716 728
717 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n", 729 dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
718 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 730 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
@@ -720,12 +732,23 @@ retry_snap:
720 732
721 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 733 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
722 (iocb->ki_filp->f_flags & O_DIRECT) || 734 (iocb->ki_filp->f_flags & O_DIRECT) ||
723 (inode->i_sb->s_flags & MS_SYNCHRONOUS)) { 735 (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
736 (fi->flags & CEPH_F_SYNC)) {
724 ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, 737 ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
725 &iocb->ki_pos); 738 &iocb->ki_pos);
726 } else { 739 } else {
727 ret = generic_file_aio_write(iocb, iov, nr_segs, pos); 740 /*
741 * buffered write; drop Fw early to avoid slow
742 * revocation if we get stuck on balance_dirty_pages
743 */
744 int dirty;
728 745
746 spin_lock(&inode->i_lock);
747 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
748 spin_unlock(&inode->i_lock);
749 ceph_put_cap_refs(ci, got);
750
751 ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
729 if ((ret >= 0 || ret == -EIOCBQUEUED) && 752 if ((ret >= 0 || ret == -EIOCBQUEUED) &&
730 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) 753 ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
731 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { 754 || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
@@ -733,7 +756,12 @@ retry_snap:
733 if (err < 0) 756 if (err < 0)
734 ret = err; 757 ret = err;
735 } 758 }
759
760 if (dirty)
761 __mark_inode_dirty(inode, dirty);
762 goto out;
736 } 763 }
764
737 if (ret >= 0) { 765 if (ret >= 0) {
738 int dirty; 766 int dirty;
739 spin_lock(&inode->i_lock); 767 spin_lock(&inode->i_lock);
@@ -743,12 +771,13 @@ retry_snap:
743 __mark_inode_dirty(inode, dirty); 771 __mark_inode_dirty(inode, dirty);
744 } 772 }
745 773
746out: 774out_put:
747 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", 775 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
748 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, 776 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
749 ceph_cap_string(got)); 777 ceph_cap_string(got));
750 ceph_put_cap_refs(ci, got); 778 ceph_put_cap_refs(ci, got);
751 779
780out:
752 if (ret == -EOLDSNAPC) { 781 if (ret == -EOLDSNAPC) {
753 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", 782 dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
754 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len); 783 inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index dfb2831d8d85..095799ba9dd1 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -560,7 +560,8 @@ static int fill_inode(struct inode *inode,
560 struct ceph_mds_reply_inode *info = iinfo->in; 560 struct ceph_mds_reply_inode *info = iinfo->in;
561 struct ceph_inode_info *ci = ceph_inode(inode); 561 struct ceph_inode_info *ci = ceph_inode(inode);
562 int i; 562 int i;
563 int issued, implemented; 563 int issued = 0, implemented;
564 int updating_inode = 0;
564 struct timespec mtime, atime, ctime; 565 struct timespec mtime, atime, ctime;
565 u32 nsplits; 566 u32 nsplits;
566 struct ceph_buffer *xattr_blob = NULL; 567 struct ceph_buffer *xattr_blob = NULL;
@@ -599,7 +600,8 @@ static int fill_inode(struct inode *inode,
599 if (le64_to_cpu(info->version) > 0 && 600 if (le64_to_cpu(info->version) > 0 &&
600 (ci->i_version & ~1) >= le64_to_cpu(info->version)) 601 (ci->i_version & ~1) >= le64_to_cpu(info->version))
601 goto no_change; 602 goto no_change;
602 603
604 updating_inode = 1;
603 issued = __ceph_caps_issued(ci, &implemented); 605 issued = __ceph_caps_issued(ci, &implemented);
604 issued |= implemented | __ceph_caps_dirty(ci); 606 issued |= implemented | __ceph_caps_dirty(ci);
605 607
@@ -707,17 +709,6 @@ static int fill_inode(struct inode *inode,
707 ci->i_rfiles = le64_to_cpu(info->rfiles); 709 ci->i_rfiles = le64_to_cpu(info->rfiles);
708 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs); 710 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
709 ceph_decode_timespec(&ci->i_rctime, &info->rctime); 711 ceph_decode_timespec(&ci->i_rctime, &info->rctime);
710
711 /* set dir completion flag? */
712 if (ci->i_files == 0 && ci->i_subdirs == 0 &&
713 ceph_snap(inode) == CEPH_NOSNAP &&
714 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
715 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
716 (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
717 dout(" marking %p complete (empty)\n", inode);
718 /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
719 ci->i_max_offset = 2;
720 }
721 break; 712 break;
722 default: 713 default:
723 pr_err("fill_inode %llx.%llx BAD mode 0%o\n", 714 pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
@@ -774,6 +765,19 @@ no_change:
774 __ceph_get_fmode(ci, cap_fmode); 765 __ceph_get_fmode(ci, cap_fmode);
775 } 766 }
776 767
768 /* set dir completion flag? */
769 if (S_ISDIR(inode->i_mode) &&
770 updating_inode && /* didn't jump to no_change */
771 ci->i_files == 0 && ci->i_subdirs == 0 &&
772 ceph_snap(inode) == CEPH_NOSNAP &&
773 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
774 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
775 (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
776 dout(" marking %p complete (empty)\n", inode);
777 /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
778 ci->i_max_offset = 2;
779 }
780
777 /* update delegation info? */ 781 /* update delegation info? */
778 if (dirinfo) 782 if (dirinfo)
779 ceph_fill_dirfrag(inode, dirinfo); 783 ceph_fill_dirfrag(inode, dirinfo);
@@ -805,14 +809,14 @@ static void update_dentry_lease(struct dentry *dentry,
805 return; 809 return;
806 810
807 spin_lock(&dentry->d_lock); 811 spin_lock(&dentry->d_lock);
808 dout("update_dentry_lease %p mask %d duration %lu ms ttl %lu\n", 812 dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
809 dentry, le16_to_cpu(lease->mask), duration, ttl); 813 dentry, duration, ttl);
810 814
811 /* make lease_rdcache_gen match directory */ 815 /* make lease_rdcache_gen match directory */
812 dir = dentry->d_parent->d_inode; 816 dir = dentry->d_parent->d_inode;
813 di->lease_shared_gen = ceph_inode(dir)->i_shared_gen; 817 di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
814 818
815 if (lease->mask == 0) 819 if (duration == 0)
816 goto out_unlock; 820 goto out_unlock;
817 821
818 if (di->lease_gen == session->s_cap_gen && 822 if (di->lease_gen == session->s_cap_gen &&
@@ -839,11 +843,13 @@ out_unlock:
839/* 843/*
840 * Set dentry's directory position based on the current dir's max, and 844 * Set dentry's directory position based on the current dir's max, and
841 * order it in d_subdirs, so that dcache_readdir behaves. 845 * order it in d_subdirs, so that dcache_readdir behaves.
846 *
847 * Always called under directory's i_mutex.
842 */ 848 */
843static void ceph_set_dentry_offset(struct dentry *dn) 849static void ceph_set_dentry_offset(struct dentry *dn)
844{ 850{
845 struct dentry *dir = dn->d_parent; 851 struct dentry *dir = dn->d_parent;
846 struct inode *inode = dn->d_parent->d_inode; 852 struct inode *inode = dir->d_inode;
847 struct ceph_dentry_info *di; 853 struct ceph_dentry_info *di;
848 854
849 BUG_ON(!inode); 855 BUG_ON(!inode);
@@ -1022,9 +1028,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1022 1028
1023 /* do we have a dn lease? */ 1029 /* do we have a dn lease? */
1024 have_lease = have_dir_cap || 1030 have_lease = have_dir_cap ||
1025 (le16_to_cpu(rinfo->dlease->mask) & 1031 le32_to_cpu(rinfo->dlease->duration_ms);
1026 CEPH_LOCK_DN);
1027
1028 if (!have_lease) 1032 if (!have_lease)
1029 dout("fill_trace no dentry lease or dir cap\n"); 1033 dout("fill_trace no dentry lease or dir cap\n");
1030 1034
@@ -1560,7 +1564,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1560{ 1564{
1561 struct inode *inode = dentry->d_inode; 1565 struct inode *inode = dentry->d_inode;
1562 struct ceph_inode_info *ci = ceph_inode(inode); 1566 struct ceph_inode_info *ci = ceph_inode(inode);
1563 struct inode *parent_inode = dentry->d_parent->d_inode; 1567 struct inode *parent_inode;
1564 const unsigned int ia_valid = attr->ia_valid; 1568 const unsigned int ia_valid = attr->ia_valid;
1565 struct ceph_mds_request *req; 1569 struct ceph_mds_request *req;
1566 struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; 1570 struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
@@ -1743,7 +1747,9 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1743 req->r_inode_drop = release; 1747 req->r_inode_drop = release;
1744 req->r_args.setattr.mask = cpu_to_le32(mask); 1748 req->r_args.setattr.mask = cpu_to_le32(mask);
1745 req->r_num_caps = 1; 1749 req->r_num_caps = 1;
1750 parent_inode = ceph_get_dentry_parent_inode(dentry);
1746 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 1751 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
1752 iput(parent_inode);
1747 } 1753 }
1748 dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err, 1754 dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
1749 ceph_cap_string(dirtied), mask); 1755 ceph_cap_string(dirtied), mask);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index ef0b5f48e13a..3b256b50f7d8 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -38,7 +38,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
38static long ceph_ioctl_set_layout(struct file *file, void __user *arg) 38static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
39{ 39{
40 struct inode *inode = file->f_dentry->d_inode; 40 struct inode *inode = file->f_dentry->d_inode;
41 struct inode *parent_inode = file->f_dentry->d_parent->d_inode; 41 struct inode *parent_inode;
42 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 42 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
43 struct ceph_mds_request *req; 43 struct ceph_mds_request *req;
44 struct ceph_ioctl_layout l; 44 struct ceph_ioctl_layout l;
@@ -87,7 +87,9 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
87 req->r_args.setlayout.layout.fl_pg_preferred = 87 req->r_args.setlayout.layout.fl_pg_preferred =
88 cpu_to_le32(l.preferred_osd); 88 cpu_to_le32(l.preferred_osd);
89 89
90 parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
90 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 91 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
92 iput(parent_inode);
91 ceph_mdsc_put_request(req); 93 ceph_mdsc_put_request(req);
92 return err; 94 return err;
93} 95}
@@ -231,6 +233,14 @@ static long ceph_ioctl_lazyio(struct file *file)
231 return 0; 233 return 0;
232} 234}
233 235
236static long ceph_ioctl_syncio(struct file *file)
237{
238 struct ceph_file_info *fi = file->private_data;
239
240 fi->flags |= CEPH_F_SYNC;
241 return 0;
242}
243
234long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg) 244long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
235{ 245{
236 dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg); 246 dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg);
@@ -249,6 +259,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
249 259
250 case CEPH_IOC_LAZYIO: 260 case CEPH_IOC_LAZYIO:
251 return ceph_ioctl_lazyio(file); 261 return ceph_ioctl_lazyio(file);
262
263 case CEPH_IOC_SYNCIO:
264 return ceph_ioctl_syncio(file);
252 } 265 }
253 266
254 return -ENOTTY; 267 return -ENOTTY;
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index 52e8fd74d450..0c5167e43180 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -40,5 +40,6 @@ struct ceph_ioctl_dataloc {
40 struct ceph_ioctl_dataloc) 40 struct ceph_ioctl_dataloc)
41 41
42#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4) 42#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
43#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5)
43 44
44#endif 45#endif
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 0c1d91756528..fee028b5332e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -483,22 +483,26 @@ void ceph_mdsc_release_request(struct kref *kref)
483 destroy_reply_info(&req->r_reply_info); 483 destroy_reply_info(&req->r_reply_info);
484 } 484 }
485 if (req->r_inode) { 485 if (req->r_inode) {
486 ceph_put_cap_refs(ceph_inode(req->r_inode), 486 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
487 CEPH_CAP_PIN);
488 iput(req->r_inode); 487 iput(req->r_inode);
489 } 488 }
490 if (req->r_locked_dir) 489 if (req->r_locked_dir)
491 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), 490 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
492 CEPH_CAP_PIN);
493 if (req->r_target_inode) 491 if (req->r_target_inode)
494 iput(req->r_target_inode); 492 iput(req->r_target_inode);
495 if (req->r_dentry) 493 if (req->r_dentry)
496 dput(req->r_dentry); 494 dput(req->r_dentry);
497 if (req->r_old_dentry) { 495 if (req->r_old_dentry) {
498 ceph_put_cap_refs( 496 /*
499 ceph_inode(req->r_old_dentry->d_parent->d_inode), 497 * track (and drop pins for) r_old_dentry_dir
500 CEPH_CAP_PIN); 498 * separately, since r_old_dentry's d_parent may have
499 * changed between the dir mutex being dropped and
500 * this request being freed.
501 */
502 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
503 CEPH_CAP_PIN);
501 dput(req->r_old_dentry); 504 dput(req->r_old_dentry);
505 iput(req->r_old_dentry_dir);
502 } 506 }
503 kfree(req->r_path1); 507 kfree(req->r_path1);
504 kfree(req->r_path2); 508 kfree(req->r_path2);
@@ -617,6 +621,12 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
617 */ 621 */
618struct dentry *get_nonsnap_parent(struct dentry *dentry) 622struct dentry *get_nonsnap_parent(struct dentry *dentry)
619{ 623{
624 /*
625 * we don't need to worry about protecting the d_parent access
626 * here because we never rename inside the snapped namespace
627 * except to resplice to another snapdir, and either the old or new
628 * result is a valid result.
629 */
620 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) 630 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
621 dentry = dentry->d_parent; 631 dentry = dentry->d_parent;
622 return dentry; 632 return dentry;
@@ -652,7 +662,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
652 if (req->r_inode) { 662 if (req->r_inode) {
653 inode = req->r_inode; 663 inode = req->r_inode;
654 } else if (req->r_dentry) { 664 } else if (req->r_dentry) {
655 struct inode *dir = req->r_dentry->d_parent->d_inode; 665 /* ignore race with rename; old or new d_parent is okay */
666 struct dentry *parent = req->r_dentry->d_parent;
667 struct inode *dir = parent->d_inode;
656 668
657 if (dir->i_sb != mdsc->fsc->sb) { 669 if (dir->i_sb != mdsc->fsc->sb) {
658 /* not this fs! */ 670 /* not this fs! */
@@ -660,8 +672,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
660 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 672 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
661 /* direct snapped/virtual snapdir requests 673 /* direct snapped/virtual snapdir requests
662 * based on parent dir inode */ 674 * based on parent dir inode */
663 struct dentry *dn = 675 struct dentry *dn = get_nonsnap_parent(parent);
664 get_nonsnap_parent(req->r_dentry->d_parent);
665 inode = dn->d_inode; 676 inode = dn->d_inode;
666 dout("__choose_mds using nonsnap parent %p\n", inode); 677 dout("__choose_mds using nonsnap parent %p\n", inode);
667 } else if (req->r_dentry->d_inode) { 678 } else if (req->r_dentry->d_inode) {
@@ -670,7 +681,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
670 } else { 681 } else {
671 /* dir + name */ 682 /* dir + name */
672 inode = dir; 683 inode = dir;
673 hash = ceph_dentry_hash(req->r_dentry); 684 hash = ceph_dentry_hash(dir, req->r_dentry);
674 is_hash = true; 685 is_hash = true;
675 } 686 }
676 } 687 }
@@ -1931,9 +1942,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1931 if (req->r_locked_dir) 1942 if (req->r_locked_dir)
1932 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 1943 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1933 if (req->r_old_dentry) 1944 if (req->r_old_dentry)
1934 ceph_get_cap_refs( 1945 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
1935 ceph_inode(req->r_old_dentry->d_parent->d_inode), 1946 CEPH_CAP_PIN);
1936 CEPH_CAP_PIN);
1937 1947
1938 /* issue */ 1948 /* issue */
1939 mutex_lock(&mdsc->mutex); 1949 mutex_lock(&mdsc->mutex);
@@ -2714,7 +2724,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
2714 struct ceph_mds_lease *h = msg->front.iov_base; 2724 struct ceph_mds_lease *h = msg->front.iov_base;
2715 u32 seq; 2725 u32 seq;
2716 struct ceph_vino vino; 2726 struct ceph_vino vino;
2717 int mask;
2718 struct qstr dname; 2727 struct qstr dname;
2719 int release = 0; 2728 int release = 0;
2720 2729
@@ -2725,7 +2734,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
2725 goto bad; 2734 goto bad;
2726 vino.ino = le64_to_cpu(h->ino); 2735 vino.ino = le64_to_cpu(h->ino);
2727 vino.snap = CEPH_NOSNAP; 2736 vino.snap = CEPH_NOSNAP;
2728 mask = le16_to_cpu(h->mask);
2729 seq = le32_to_cpu(h->seq); 2737 seq = le32_to_cpu(h->seq);
2730 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 2738 dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2731 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 2739 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
@@ -2737,8 +2745,8 @@ static void handle_lease(struct ceph_mds_client *mdsc,
2737 2745
2738 /* lookup inode */ 2746 /* lookup inode */
2739 inode = ceph_find_inode(sb, vino); 2747 inode = ceph_find_inode(sb, vino);
2740 dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", 2748 dout("handle_lease %s, ino %llx %p %.*s\n",
2741 ceph_lease_op_name(h->action), mask, vino.ino, inode, 2749 ceph_lease_op_name(h->action), vino.ino, inode,
2742 dname.len, dname.name); 2750 dname.len, dname.name);
2743 if (inode == NULL) { 2751 if (inode == NULL) {
2744 dout("handle_lease no inode %llx\n", vino.ino); 2752 dout("handle_lease no inode %llx\n", vino.ino);
@@ -2828,7 +2836,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2828 return; 2836 return;
2829 lease = msg->front.iov_base; 2837 lease = msg->front.iov_base;
2830 lease->action = action; 2838 lease->action = action;
2831 lease->mask = cpu_to_le16(1);
2832 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2839 lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2833 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2840 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2834 lease->seq = cpu_to_le32(seq); 2841 lease->seq = cpu_to_le32(seq);
@@ -2850,7 +2857,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2850 * Pass @inode always, @dentry is optional. 2857 * Pass @inode always, @dentry is optional.
2851 */ 2858 */
2852void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2859void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2853 struct dentry *dentry, int mask) 2860 struct dentry *dentry)
2854{ 2861{
2855 struct ceph_dentry_info *di; 2862 struct ceph_dentry_info *di;
2856 struct ceph_mds_session *session; 2863 struct ceph_mds_session *session;
@@ -2858,7 +2865,6 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2858 2865
2859 BUG_ON(inode == NULL); 2866 BUG_ON(inode == NULL);
2860 BUG_ON(dentry == NULL); 2867 BUG_ON(dentry == NULL);
2861 BUG_ON(mask == 0);
2862 2868
2863 /* is dentry lease valid? */ 2869 /* is dentry lease valid? */
2864 spin_lock(&dentry->d_lock); 2870 spin_lock(&dentry->d_lock);
@@ -2868,8 +2874,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2868 di->lease_gen != di->lease_session->s_cap_gen || 2874 di->lease_gen != di->lease_session->s_cap_gen ||
2869 !time_before(jiffies, dentry->d_time)) { 2875 !time_before(jiffies, dentry->d_time)) {
2870 dout("lease_release inode %p dentry %p -- " 2876 dout("lease_release inode %p dentry %p -- "
2871 "no lease on %d\n", 2877 "no lease\n",
2872 inode, dentry, mask); 2878 inode, dentry);
2873 spin_unlock(&dentry->d_lock); 2879 spin_unlock(&dentry->d_lock);
2874 return; 2880 return;
2875 } 2881 }
@@ -2880,8 +2886,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2880 __ceph_mdsc_drop_dentry_lease(dentry); 2886 __ceph_mdsc_drop_dentry_lease(dentry);
2881 spin_unlock(&dentry->d_lock); 2887 spin_unlock(&dentry->d_lock);
2882 2888
2883 dout("lease_release inode %p dentry %p mask %d to mds%d\n", 2889 dout("lease_release inode %p dentry %p to mds%d\n",
2884 inode, dentry, mask, session->s_mds); 2890 inode, dentry, session->s_mds);
2885 ceph_mdsc_lease_send_msg(session, inode, dentry, 2891 ceph_mdsc_lease_send_msg(session, inode, dentry,
2886 CEPH_MDS_LEASE_RELEASE, seq); 2892 CEPH_MDS_LEASE_RELEASE, seq);
2887 ceph_put_mds_session(session); 2893 ceph_put_mds_session(session);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 7d8a0d662d56..4bb239921dbd 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -171,6 +171,7 @@ struct ceph_mds_request {
171 struct inode *r_inode; /* arg1 */ 171 struct inode *r_inode; /* arg1 */
172 struct dentry *r_dentry; /* arg1 */ 172 struct dentry *r_dentry; /* arg1 */
173 struct dentry *r_old_dentry; /* arg2: rename from or link from */ 173 struct dentry *r_old_dentry; /* arg2: rename from or link from */
174 struct inode *r_old_dentry_dir; /* arg2: old dentry's parent dir */
174 char *r_path1, *r_path2; 175 char *r_path1, *r_path2;
175 struct ceph_vino r_ino1, r_ino2; 176 struct ceph_vino r_ino1, r_ino2;
176 177
@@ -333,7 +334,7 @@ extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
333 334
334extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, 335extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
335 struct inode *inode, 336 struct inode *inode,
336 struct dentry *dn, int mask); 337 struct dentry *dn);
337 338
338extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); 339extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
339 340
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 54b14de2e729..e26437191333 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -449,6 +449,15 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
449 spin_lock(&inode->i_lock); 449 spin_lock(&inode->i_lock);
450 used = __ceph_caps_used(ci); 450 used = __ceph_caps_used(ci);
451 dirty = __ceph_caps_dirty(ci); 451 dirty = __ceph_caps_dirty(ci);
452
453 /*
454 * If there is a write in progress, treat that as a dirty Fw,
455 * even though it hasn't completed yet; by the time we finish
456 * up this capsnap it will be.
457 */
458 if (used & CEPH_CAP_FILE_WR)
459 dirty |= CEPH_CAP_FILE_WR;
460
452 if (__ceph_have_pending_cap_snap(ci)) { 461 if (__ceph_have_pending_cap_snap(ci)) {
453 /* there is no point in queuing multiple "pending" cap_snaps, 462 /* there is no point in queuing multiple "pending" cap_snaps,
454 as no new writes are allowed to start when pending, so any 463 as no new writes are allowed to start when pending, so any
@@ -456,13 +465,19 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
456 cap_snap. lucky us. */ 465 cap_snap. lucky us. */
457 dout("queue_cap_snap %p already pending\n", inode); 466 dout("queue_cap_snap %p already pending\n", inode);
458 kfree(capsnap); 467 kfree(capsnap);
459 } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) || 468 } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
460 (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| 469 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
461 CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
462 struct ceph_snap_context *snapc = ci->i_head_snapc; 470 struct ceph_snap_context *snapc = ci->i_head_snapc;
463 471
464 dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode, 472 /*
465 capsnap, snapc); 473 * if we are a sync write, we may need to go to the snaprealm
474 * to get the current snapc.
475 */
476 if (!snapc)
477 snapc = ci->i_snap_realm->cached_context;
478
479 dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
480 inode, capsnap, snapc, ceph_cap_string(dirty));
466 ihold(inode); 481 ihold(inode);
467 482
468 atomic_set(&capsnap->nref, 1); 483 atomic_set(&capsnap->nref, 1);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f2f77fd3c14c..d47c5ec7fb1f 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -73,8 +73,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
73 */ 73 */
74 buf->f_bsize = 1 << CEPH_BLOCK_SHIFT; 74 buf->f_bsize = 1 << CEPH_BLOCK_SHIFT;
75 buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10); 75 buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10);
76 buf->f_bfree = (le64_to_cpu(st.kb) - le64_to_cpu(st.kb_used)) >> 76 buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
77 (CEPH_BLOCK_SHIFT-10);
78 buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); 77 buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
79 78
80 buf->f_files = le64_to_cpu(st.num_objects); 79 buf->f_files = le64_to_cpu(st.num_objects);
@@ -780,6 +779,10 @@ static int ceph_register_bdi(struct super_block *sb,
780 fsc->backing_dev_info.ra_pages = 779 fsc->backing_dev_info.ra_pages =
781 (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) 780 (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
782 >> PAGE_SHIFT; 781 >> PAGE_SHIFT;
782 else
783 fsc->backing_dev_info.ra_pages =
784 default_backing_dev_info.ra_pages;
785
783 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d", 786 err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
784 atomic_long_inc_return(&bdi_seq)); 787 atomic_long_inc_return(&bdi_seq));
785 if (!err) 788 if (!err)
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 30446b144e3d..a23eed526f05 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -543,13 +543,16 @@ extern void ceph_reservation_status(struct ceph_fs_client *client,
543/* 543/*
544 * we keep buffered readdir results attached to file->private_data 544 * we keep buffered readdir results attached to file->private_data
545 */ 545 */
546#define CEPH_F_SYNC 1
547#define CEPH_F_ATEND 2
548
546struct ceph_file_info { 549struct ceph_file_info {
547 int fmode; /* initialized on open */ 550 short fmode; /* initialized on open */
551 short flags; /* CEPH_F_* */
548 552
549 /* readdir: position within the dir */ 553 /* readdir: position within the dir */
550 u32 frag; 554 u32 frag;
551 struct ceph_mds_request *last_readdir; 555 struct ceph_mds_request *last_readdir;
552 int at_end;
553 556
554 /* readdir: position within a frag */ 557 /* readdir: position within a frag */
555 unsigned offset; /* offset of last chunk, adjusted for . and .. */ 558 unsigned offset; /* offset of last chunk, adjusted for . and .. */
@@ -789,6 +792,8 @@ extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
789 ceph_snapdir_dentry_ops; 792 ceph_snapdir_dentry_ops;
790 793
791extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry); 794extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
795extern int ceph_handle_snapdir(struct ceph_mds_request *req,
796 struct dentry *dentry, int err);
792extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 797extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
793 struct dentry *dentry, int err); 798 struct dentry *dentry, int err);
794 799
@@ -796,7 +801,8 @@ extern void ceph_dentry_lru_add(struct dentry *dn);
796extern void ceph_dentry_lru_touch(struct dentry *dn); 801extern void ceph_dentry_lru_touch(struct dentry *dn);
797extern void ceph_dentry_lru_del(struct dentry *dn); 802extern void ceph_dentry_lru_del(struct dentry *dn);
798extern void ceph_invalidate_dentry_lease(struct dentry *dentry); 803extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
799extern unsigned ceph_dentry_hash(struct dentry *dn); 804extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
805extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
800 806
801/* 807/*
802 * our d_ops vary depending on whether the inode is live, 808 * our d_ops vary depending on whether the inode is live,
@@ -819,14 +825,6 @@ extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p,
819 int p_locks, int f_locks); 825 int p_locks, int f_locks);
820extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c); 826extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
821 827
822static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
823{
824 if (dentry && dentry->d_parent)
825 return dentry->d_parent->d_inode;
826
827 return NULL;
828}
829
830/* debugfs.c */ 828/* debugfs.c */
831extern int ceph_fs_debugfs_init(struct ceph_fs_client *client); 829extern int ceph_fs_debugfs_init(struct ceph_fs_client *client);
832extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client); 830extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index f42d730f1b66..96c6739a0280 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -629,7 +629,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
629 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 629 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
630 struct inode *inode = dentry->d_inode; 630 struct inode *inode = dentry->d_inode;
631 struct ceph_inode_info *ci = ceph_inode(inode); 631 struct ceph_inode_info *ci = ceph_inode(inode);
632 struct inode *parent_inode = dentry->d_parent->d_inode; 632 struct inode *parent_inode;
633 struct ceph_mds_request *req; 633 struct ceph_mds_request *req;
634 struct ceph_mds_client *mdsc = fsc->mdsc; 634 struct ceph_mds_client *mdsc = fsc->mdsc;
635 int err; 635 int err;
@@ -677,7 +677,9 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
677 req->r_data_len = size; 677 req->r_data_len = size;
678 678
679 dout("xattr.ver (before): %lld\n", ci->i_xattrs.version); 679 dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
680 parent_inode = ceph_get_dentry_parent_inode(dentry);
680 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 681 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
682 iput(parent_inode);
681 ceph_mdsc_put_request(req); 683 ceph_mdsc_put_request(req);
682 dout("xattr.ver (after): %lld\n", ci->i_xattrs.version); 684 dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
683 685
@@ -788,7 +790,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
788 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 790 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
789 struct ceph_mds_client *mdsc = fsc->mdsc; 791 struct ceph_mds_client *mdsc = fsc->mdsc;
790 struct inode *inode = dentry->d_inode; 792 struct inode *inode = dentry->d_inode;
791 struct inode *parent_inode = dentry->d_parent->d_inode; 793 struct inode *parent_inode;
792 struct ceph_mds_request *req; 794 struct ceph_mds_request *req;
793 int err; 795 int err;
794 796
@@ -802,7 +804,9 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
802 req->r_num_caps = 1; 804 req->r_num_caps = 1;
803 req->r_path2 = kstrdup(name, GFP_NOFS); 805 req->r_path2 = kstrdup(name, GFP_NOFS);
804 806
807 parent_inode = ceph_get_dentry_parent_inode(dentry);
805 err = ceph_mdsc_do_request(mdsc, parent_inode, req); 808 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
809 iput(parent_inode);
806 ceph_mdsc_put_request(req); 810 ceph_mdsc_put_request(req);
807 return err; 811 return err;
808} 812}
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 31d91a64838b..d7adf151d335 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -94,6 +94,7 @@ struct ceph_msg {
94 bool more_to_follow; 94 bool more_to_follow;
95 bool needs_out_seq; 95 bool needs_out_seq;
96 int front_max; 96 int front_max;
97 unsigned long ack_stamp; /* tx: when we were acked */
97 98
98 struct ceph_msgpool *pool; 99 struct ceph_msgpool *pool;
99}; 100};
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 78b55f49de7c..c340e2e0765b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -486,13 +486,10 @@ static void prepare_write_message(struct ceph_connection *con)
486 m = list_first_entry(&con->out_queue, 486 m = list_first_entry(&con->out_queue,
487 struct ceph_msg, list_head); 487 struct ceph_msg, list_head);
488 con->out_msg = m; 488 con->out_msg = m;
489 if (test_bit(LOSSYTX, &con->state)) { 489
490 list_del_init(&m->list_head); 490 /* put message on sent list */
491 } else { 491 ceph_msg_get(m);
492 /* put message on sent list */ 492 list_move_tail(&m->list_head, &con->out_sent);
493 ceph_msg_get(m);
494 list_move_tail(&m->list_head, &con->out_sent);
495 }
496 493
497 /* 494 /*
498 * only assign outgoing seq # if we haven't sent this message 495 * only assign outgoing seq # if we haven't sent this message
@@ -1399,6 +1396,7 @@ static void process_ack(struct ceph_connection *con)
1399 break; 1396 break;
1400 dout("got ack for seq %llu type %d at %p\n", seq, 1397 dout("got ack for seq %llu type %d at %p\n", seq,
1401 le16_to_cpu(m->hdr.type), m); 1398 le16_to_cpu(m->hdr.type), m);
1399 m->ack_stamp = jiffies;
1402 ceph_msg_remove(m); 1400 ceph_msg_remove(m);
1403 } 1401 }
1404 prepare_read_tag(con); 1402 prepare_read_tag(con);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 7330c2757c0c..ce310eee708d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1085,9 +1085,15 @@ static void handle_timeout(struct work_struct *work)
1085 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 1085 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1086 r_req_lru_item); 1086 r_req_lru_item);
1087 1087
1088 /* hasn't been long enough since we sent it? */
1088 if (time_before(jiffies, req->r_stamp + timeout)) 1089 if (time_before(jiffies, req->r_stamp + timeout))
1089 break; 1090 break;
1090 1091
1092 /* hasn't been long enough since it was acked? */
1093 if (req->r_request->ack_stamp == 0 ||
1094 time_before(jiffies, req->r_request->ack_stamp + timeout))
1095 break;
1096
1091 BUG_ON(req == last_req && req->r_stamp == last_stamp); 1097 BUG_ON(req == last_req && req->r_stamp == last_stamp);
1092 last_req = req; 1098 last_req = req;
1093 last_stamp = req->r_stamp; 1099 last_stamp = req->r_stamp;