aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authorYan, Zheng <zyan@redhat.com>2015-06-09 03:48:57 -0400
committerIlya Dryomov <idryomov@gmail.com>2015-06-25 04:49:30 -0400
commit553adfd941f8ca622965ef809553d918ea039929 (patch)
tree26ebb1577287a48cb9bcea96e8197ef1ac3b72bc /fs/ceph
parent6c13a6bb55df6666275b992ba76620324429d7cf (diff)
ceph: track pending caps flushing accurately
Previously we did not track an accurate TID for flushing caps. When the MDS fails over, we have no choice but to re-send all flushing caps with a new TID. This can cause problems because the MDS may have already flushed some caps and issued the same caps to another client. The re-sent cap flush has a new TID, which makes the MDS unable to detect whether it has already processed the cap flush. This patch adds code to track pending cap flushes accurately. When re-sending a cap flush is needed, we use its original flush TID. Signed-off-by: Yan, Zheng <zyan@redhat.com>
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/caps.c245
-rw-r--r--fs/ceph/inode.c3
-rw-r--r--fs/ceph/mds_client.c20
-rw-r--r--fs/ceph/mds_client.h1
-rw-r--r--fs/ceph/super.h11
5 files changed, 192 insertions, 88 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index dc988337f841..9a25f8d66fbc 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1097,7 +1097,8 @@ void ceph_queue_caps_release(struct inode *inode)
1097 * caller should hold snap_rwsem (read), s_mutex. 1097 * caller should hold snap_rwsem (read), s_mutex.
1098 */ 1098 */
1099static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, 1099static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1100 int op, int used, int want, int retain, int flushing) 1100 int op, int used, int want, int retain, int flushing,
1101 u64 flush_tid)
1101 __releases(cap->ci->i_ceph_lock) 1102 __releases(cap->ci->i_ceph_lock)
1102{ 1103{
1103 struct ceph_inode_info *ci = cap->ci; 1104 struct ceph_inode_info *ci = cap->ci;
@@ -1115,8 +1116,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1115 u64 xattr_version = 0; 1116 u64 xattr_version = 0;
1116 struct ceph_buffer *xattr_blob = NULL; 1117 struct ceph_buffer *xattr_blob = NULL;
1117 int delayed = 0; 1118 int delayed = 0;
1118 u64 flush_tid = 0;
1119 int i;
1120 int ret; 1119 int ret;
1121 bool inline_data; 1120 bool inline_data;
1122 1121
@@ -1160,24 +1159,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1160 cap->implemented &= cap->issued | used; 1159 cap->implemented &= cap->issued | used;
1161 cap->mds_wanted = want; 1160 cap->mds_wanted = want;
1162 1161
1163 if (flushing) { 1162 follows = flushing ? ci->i_head_snapc->seq : 0;
1164 /*
1165 * assign a tid for flush operations so we can avoid
1166 * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
1167 * clean type races. track latest tid for every bit
1168 * so we can handle flush AxFw, flush Fw, and have the
1169 * first ack clean Ax.
1170 */
1171 flush_tid = ++ci->i_cap_flush_last_tid;
1172 dout(" cap_flush_tid %d\n", (int)flush_tid);
1173 for (i = 0; i < CEPH_CAP_BITS; i++)
1174 if (flushing & (1 << i))
1175 ci->i_cap_flush_tid[i] = flush_tid;
1176
1177 follows = ci->i_head_snapc->seq;
1178 } else {
1179 follows = 0;
1180 }
1181 1163
1182 keep = cap->implemented; 1164 keep = cap->implemented;
1183 seq = cap->seq; 1165 seq = cap->seq;
@@ -1311,7 +1293,10 @@ retry:
1311 goto retry; 1293 goto retry;
1312 } 1294 }
1313 1295
1314 capsnap->flush_tid = ++ci->i_cap_flush_last_tid; 1296 spin_lock(&mdsc->cap_dirty_lock);
1297 capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
1298 spin_unlock(&mdsc->cap_dirty_lock);
1299
1315 atomic_inc(&capsnap->nref); 1300 atomic_inc(&capsnap->nref);
1316 if (list_empty(&capsnap->flushing_item)) 1301 if (list_empty(&capsnap->flushing_item))
1317 list_add_tail(&capsnap->flushing_item, 1302 list_add_tail(&capsnap->flushing_item,
@@ -1407,6 +1392,29 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1407 return dirty; 1392 return dirty;
1408} 1393}
1409 1394
1395static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
1396 struct ceph_cap_flush *cf)
1397{
1398 struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
1399 struct rb_node *parent = NULL;
1400 struct ceph_cap_flush *other = NULL;
1401
1402 while (*p) {
1403 parent = *p;
1404 other = rb_entry(parent, struct ceph_cap_flush, i_node);
1405
1406 if (cf->tid < other->tid)
1407 p = &(*p)->rb_left;
1408 else if (cf->tid > other->tid)
1409 p = &(*p)->rb_right;
1410 else
1411 BUG();
1412 }
1413
1414 rb_link_node(&cf->i_node, parent, p);
1415 rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
1416}
1417
1410/* 1418/*
1411 * Add dirty inode to the flushing list. Assigned a seq number so we 1419 * Add dirty inode to the flushing list. Assigned a seq number so we
1412 * can wait for caps to flush without starving. 1420 * can wait for caps to flush without starving.
@@ -1414,10 +1422,12 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1414 * Called under i_ceph_lock. 1422 * Called under i_ceph_lock.
1415 */ 1423 */
1416static int __mark_caps_flushing(struct inode *inode, 1424static int __mark_caps_flushing(struct inode *inode,
1417 struct ceph_mds_session *session) 1425 struct ceph_mds_session *session,
1426 u64 *flush_tid)
1418{ 1427{
1419 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1428 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1420 struct ceph_inode_info *ci = ceph_inode(inode); 1429 struct ceph_inode_info *ci = ceph_inode(inode);
1430 struct ceph_cap_flush *cf;
1421 int flushing; 1431 int flushing;
1422 1432
1423 BUG_ON(ci->i_dirty_caps == 0); 1433 BUG_ON(ci->i_dirty_caps == 0);
@@ -1432,9 +1442,14 @@ static int __mark_caps_flushing(struct inode *inode,
1432 ci->i_dirty_caps = 0; 1442 ci->i_dirty_caps = 0;
1433 dout(" inode %p now !dirty\n", inode); 1443 dout(" inode %p now !dirty\n", inode);
1434 1444
1445 cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
1446 cf->caps = flushing;
1447
1435 spin_lock(&mdsc->cap_dirty_lock); 1448 spin_lock(&mdsc->cap_dirty_lock);
1436 list_del_init(&ci->i_dirty_item); 1449 list_del_init(&ci->i_dirty_item);
1437 1450
1451 cf->tid = ++mdsc->last_cap_flush_tid;
1452
1438 if (list_empty(&ci->i_flushing_item)) { 1453 if (list_empty(&ci->i_flushing_item)) {
1439 ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; 1454 ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
1440 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); 1455 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
@@ -1448,6 +1463,9 @@ static int __mark_caps_flushing(struct inode *inode,
1448 } 1463 }
1449 spin_unlock(&mdsc->cap_dirty_lock); 1464 spin_unlock(&mdsc->cap_dirty_lock);
1450 1465
1466 __add_cap_flushing_to_inode(ci, cf);
1467
1468 *flush_tid = cf->tid;
1451 return flushing; 1469 return flushing;
1452} 1470}
1453 1471
@@ -1493,6 +1511,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1493 struct ceph_mds_client *mdsc = fsc->mdsc; 1511 struct ceph_mds_client *mdsc = fsc->mdsc;
1494 struct inode *inode = &ci->vfs_inode; 1512 struct inode *inode = &ci->vfs_inode;
1495 struct ceph_cap *cap; 1513 struct ceph_cap *cap;
1514 u64 flush_tid;
1496 int file_wanted, used, cap_used; 1515 int file_wanted, used, cap_used;
1497 int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ 1516 int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
1498 int issued, implemented, want, retain, revoking, flushing = 0; 1517 int issued, implemented, want, retain, revoking, flushing = 0;
@@ -1711,17 +1730,20 @@ ack:
1711 took_snap_rwsem = 1; 1730 took_snap_rwsem = 1;
1712 } 1731 }
1713 1732
1714 if (cap == ci->i_auth_cap && ci->i_dirty_caps) 1733 if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
1715 flushing = __mark_caps_flushing(inode, session); 1734 flushing = __mark_caps_flushing(inode, session,
1716 else 1735 &flush_tid);
1736 } else {
1717 flushing = 0; 1737 flushing = 0;
1738 flush_tid = 0;
1739 }
1718 1740
1719 mds = cap->mds; /* remember mds, so we don't repeat */ 1741 mds = cap->mds; /* remember mds, so we don't repeat */
1720 sent++; 1742 sent++;
1721 1743
1722 /* __send_cap drops i_ceph_lock */ 1744 /* __send_cap drops i_ceph_lock */
1723 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, 1745 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
1724 want, retain, flushing); 1746 want, retain, flushing, flush_tid);
1725 goto retry; /* retake i_ceph_lock and restart our cap scan. */ 1747 goto retry; /* retake i_ceph_lock and restart our cap scan. */
1726 } 1748 }
1727 1749
@@ -1750,12 +1772,13 @@ ack:
1750/* 1772/*
1751 * Try to flush dirty caps back to the auth mds. 1773 * Try to flush dirty caps back to the auth mds.
1752 */ 1774 */
1753static int try_flush_caps(struct inode *inode, u16 flush_tid[]) 1775static int try_flush_caps(struct inode *inode, u64 *ptid)
1754{ 1776{
1755 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1777 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1756 struct ceph_inode_info *ci = ceph_inode(inode); 1778 struct ceph_inode_info *ci = ceph_inode(inode);
1757 struct ceph_mds_session *session = NULL; 1779 struct ceph_mds_session *session = NULL;
1758 int flushing = 0; 1780 int flushing = 0;
1781 u64 flush_tid = 0;
1759 1782
1760retry: 1783retry:
1761 spin_lock(&ci->i_ceph_lock); 1784 spin_lock(&ci->i_ceph_lock);
@@ -1780,46 +1803,52 @@ retry:
1780 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) 1803 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
1781 goto out; 1804 goto out;
1782 1805
1783 flushing = __mark_caps_flushing(inode, session); 1806 flushing = __mark_caps_flushing(inode, session, &flush_tid);
1784 1807
1785 /* __send_cap drops i_ceph_lock */ 1808 /* __send_cap drops i_ceph_lock */
1786 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, 1809 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
1787 cap->issued | cap->implemented, flushing); 1810 (cap->issued | cap->implemented),
1811 flushing, flush_tid);
1788 1812
1789 spin_lock(&ci->i_ceph_lock); 1813 if (delayed) {
1790 if (delayed) 1814 spin_lock(&ci->i_ceph_lock);
1791 __cap_delay_requeue(mdsc, ci); 1815 __cap_delay_requeue(mdsc, ci);
1816 spin_unlock(&ci->i_ceph_lock);
1817 }
1818 } else {
1819 struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
1820 if (n) {
1821 struct ceph_cap_flush *cf =
1822 rb_entry(n, struct ceph_cap_flush, i_node);
1823 flush_tid = cf->tid;
1824 }
1825 flushing = ci->i_flushing_caps;
1826 spin_unlock(&ci->i_ceph_lock);
1792 } 1827 }
1793
1794 flushing = ci->i_flushing_caps;
1795 if (flushing)
1796 memcpy(flush_tid, ci->i_cap_flush_tid,
1797 sizeof(ci->i_cap_flush_tid));
1798out: 1828out:
1799 spin_unlock(&ci->i_ceph_lock);
1800 if (session) 1829 if (session)
1801 mutex_unlock(&session->s_mutex); 1830 mutex_unlock(&session->s_mutex);
1831
1832 *ptid = flush_tid;
1802 return flushing; 1833 return flushing;
1803} 1834}
1804 1835
1805/* 1836/*
1806 * Return true if we've flushed caps through the given flush_tid. 1837 * Return true if we've flushed caps through the given flush_tid.
1807 */ 1838 */
1808static int caps_are_flushed(struct inode *inode, u16 flush_tid[]) 1839static int caps_are_flushed(struct inode *inode, u64 flush_tid)
1809{ 1840{
1810 struct ceph_inode_info *ci = ceph_inode(inode); 1841 struct ceph_inode_info *ci = ceph_inode(inode);
1811 int i, ret = 1; 1842 struct ceph_cap_flush *cf;
1843 struct rb_node *n;
1844 int ret = 1;
1812 1845
1813 spin_lock(&ci->i_ceph_lock); 1846 spin_lock(&ci->i_ceph_lock);
1814 for (i = 0; i < CEPH_CAP_BITS; i++) { 1847 n = rb_first(&ci->i_cap_flush_tree);
1815 if (!(ci->i_flushing_caps & (1 << i))) 1848 if (n) {
1816 continue; 1849 cf = rb_entry(n, struct ceph_cap_flush, i_node);
1817 // tid only has 16 bits. we need to handle wrapping 1850 if (cf->tid <= flush_tid)
1818 if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) {
1819 /* still flushing this bit */
1820 ret = 0; 1851 ret = 0;
1821 break;
1822 }
1823 } 1852 }
1824 spin_unlock(&ci->i_ceph_lock); 1853 spin_unlock(&ci->i_ceph_lock);
1825 return ret; 1854 return ret;
@@ -1922,7 +1951,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
1922{ 1951{
1923 struct inode *inode = file->f_mapping->host; 1952 struct inode *inode = file->f_mapping->host;
1924 struct ceph_inode_info *ci = ceph_inode(inode); 1953 struct ceph_inode_info *ci = ceph_inode(inode);
1925 u16 flush_tid[CEPH_CAP_BITS]; 1954 u64 flush_tid;
1926 int ret; 1955 int ret;
1927 int dirty; 1956 int dirty;
1928 1957
@@ -1938,7 +1967,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
1938 1967
1939 mutex_lock(&inode->i_mutex); 1968 mutex_lock(&inode->i_mutex);
1940 1969
1941 dirty = try_flush_caps(inode, flush_tid); 1970 dirty = try_flush_caps(inode, &flush_tid);
1942 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 1971 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
1943 1972
1944 ret = unsafe_dirop_wait(inode); 1973 ret = unsafe_dirop_wait(inode);
@@ -1967,14 +1996,14 @@ out:
1967int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) 1996int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
1968{ 1997{
1969 struct ceph_inode_info *ci = ceph_inode(inode); 1998 struct ceph_inode_info *ci = ceph_inode(inode);
1970 u16 flush_tid[CEPH_CAP_BITS]; 1999 u64 flush_tid;
1971 int err = 0; 2000 int err = 0;
1972 int dirty; 2001 int dirty;
1973 int wait = wbc->sync_mode == WB_SYNC_ALL; 2002 int wait = wbc->sync_mode == WB_SYNC_ALL;
1974 2003
1975 dout("write_inode %p wait=%d\n", inode, wait); 2004 dout("write_inode %p wait=%d\n", inode, wait);
1976 if (wait) { 2005 if (wait) {
1977 dirty = try_flush_caps(inode, flush_tid); 2006 dirty = try_flush_caps(inode, &flush_tid);
1978 if (dirty) 2007 if (dirty)
1979 err = wait_event_interruptible(ci->i_cap_wq, 2008 err = wait_event_interruptible(ci->i_cap_wq,
1980 caps_are_flushed(inode, flush_tid)); 2009 caps_are_flushed(inode, flush_tid));
@@ -2022,6 +2051,51 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
2022 } 2051 }
2023} 2052}
2024 2053
2054static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
2055 struct ceph_mds_session *session,
2056 struct ceph_inode_info *ci)
2057{
2058 struct inode *inode = &ci->vfs_inode;
2059 struct ceph_cap *cap;
2060 struct ceph_cap_flush *cf;
2061 struct rb_node *n;
2062 int delayed = 0;
2063 u64 first_tid = 0;
2064
2065 while (true) {
2066 spin_lock(&ci->i_ceph_lock);
2067 cap = ci->i_auth_cap;
2068 if (!(cap && cap->session == session)) {
2069 pr_err("%p auth cap %p not mds%d ???\n", inode,
2070 cap, session->s_mds);
2071 spin_unlock(&ci->i_ceph_lock);
2072 break;
2073 }
2074
2075 for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
2076 cf = rb_entry(n, struct ceph_cap_flush, i_node);
2077 if (cf->tid >= first_tid)
2078 break;
2079 }
2080 if (!n) {
2081 spin_unlock(&ci->i_ceph_lock);
2082 break;
2083 }
2084
2085 cf = rb_entry(n, struct ceph_cap_flush, i_node);
2086 first_tid = cf->tid + 1;
2087
2088 dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
2089 cap, cf->tid, ceph_cap_string(cf->caps));
2090 delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
2091 __ceph_caps_used(ci),
2092 __ceph_caps_wanted(ci),
2093 cap->issued | cap->implemented,
2094 cf->caps, cf->tid);
2095 }
2096 return delayed;
2097}
2098
2025void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, 2099void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
2026 struct ceph_mds_session *session) 2100 struct ceph_mds_session *session)
2027{ 2101{
@@ -2031,28 +2105,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
2031 2105
2032 dout("kick_flushing_caps mds%d\n", session->s_mds); 2106 dout("kick_flushing_caps mds%d\n", session->s_mds);
2033 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2107 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
2034 struct inode *inode = &ci->vfs_inode; 2108 int delayed = __kick_flushing_caps(mdsc, session, ci);
2035 struct ceph_cap *cap; 2109 if (delayed) {
2036 int delayed = 0; 2110 spin_lock(&ci->i_ceph_lock);
2037 2111 __cap_delay_requeue(mdsc, ci);
2038 spin_lock(&ci->i_ceph_lock);
2039 cap = ci->i_auth_cap;
2040 if (cap && cap->session == session) {
2041 dout("kick_flushing_caps %p cap %p %s\n", inode,
2042 cap, ceph_cap_string(ci->i_flushing_caps));
2043 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
2044 __ceph_caps_used(ci),
2045 __ceph_caps_wanted(ci),
2046 cap->issued | cap->implemented,
2047 ci->i_flushing_caps);
2048 if (delayed) {
2049 spin_lock(&ci->i_ceph_lock);
2050 __cap_delay_requeue(mdsc, ci);
2051 spin_unlock(&ci->i_ceph_lock);
2052 }
2053 } else {
2054 pr_err("%p auth cap %p not mds%d ???\n", inode,
2055 cap, session->s_mds);
2056 spin_unlock(&ci->i_ceph_lock); 2112 spin_unlock(&ci->i_ceph_lock);
2057 } 2113 }
2058 } 2114 }
@@ -2064,7 +2120,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
2064{ 2120{
2065 struct ceph_inode_info *ci = ceph_inode(inode); 2121 struct ceph_inode_info *ci = ceph_inode(inode);
2066 struct ceph_cap *cap; 2122 struct ceph_cap *cap;
2067 int delayed = 0;
2068 2123
2069 spin_lock(&ci->i_ceph_lock); 2124 spin_lock(&ci->i_ceph_lock);
2070 cap = ci->i_auth_cap; 2125 cap = ci->i_auth_cap;
@@ -2074,16 +2129,16 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
2074 __ceph_flush_snaps(ci, &session, 1); 2129 __ceph_flush_snaps(ci, &session, 1);
2075 2130
2076 if (ci->i_flushing_caps) { 2131 if (ci->i_flushing_caps) {
2132 int delayed;
2133
2077 spin_lock(&mdsc->cap_dirty_lock); 2134 spin_lock(&mdsc->cap_dirty_lock);
2078 list_move_tail(&ci->i_flushing_item, 2135 list_move_tail(&ci->i_flushing_item,
2079 &cap->session->s_cap_flushing); 2136 &cap->session->s_cap_flushing);
2080 spin_unlock(&mdsc->cap_dirty_lock); 2137 spin_unlock(&mdsc->cap_dirty_lock);
2081 2138
2082 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, 2139 spin_unlock(&ci->i_ceph_lock);
2083 __ceph_caps_used(ci), 2140
2084 __ceph_caps_wanted(ci), 2141 delayed = __kick_flushing_caps(mdsc, session, ci);
2085 cap->issued | cap->implemented,
2086 ci->i_flushing_caps);
2087 if (delayed) { 2142 if (delayed) {
2088 spin_lock(&ci->i_ceph_lock); 2143 spin_lock(&ci->i_ceph_lock);
2089 __cap_delay_requeue(mdsc, ci); 2144 __cap_delay_requeue(mdsc, ci);
@@ -2836,16 +2891,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2836{ 2891{
2837 struct ceph_inode_info *ci = ceph_inode(inode); 2892 struct ceph_inode_info *ci = ceph_inode(inode);
2838 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 2893 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
2894 struct ceph_cap_flush *cf;
2895 struct rb_node *n;
2896 LIST_HEAD(to_remove);
2839 unsigned seq = le32_to_cpu(m->seq); 2897 unsigned seq = le32_to_cpu(m->seq);
2840 int dirty = le32_to_cpu(m->dirty); 2898 int dirty = le32_to_cpu(m->dirty);
2841 int cleaned = 0; 2899 int cleaned = 0;
2842 int drop = 0; 2900 int drop = 0;
2843 int i;
2844 2901
2845 for (i = 0; i < CEPH_CAP_BITS; i++) 2902 n = rb_first(&ci->i_cap_flush_tree);
2846 if ((dirty & (1 << i)) && 2903 while (n) {
2847 (u16)flush_tid == ci->i_cap_flush_tid[i]) 2904 cf = rb_entry(n, struct ceph_cap_flush, i_node);
2848 cleaned |= 1 << i; 2905 n = rb_next(&cf->i_node);
2906 if (cf->tid == flush_tid)
2907 cleaned = cf->caps;
2908 if (cf->tid <= flush_tid) {
2909 rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
2910 list_add_tail(&cf->list, &to_remove);
2911 } else {
2912 cleaned &= ~cf->caps;
2913 if (!cleaned)
2914 break;
2915 }
2916 }
2849 2917
2850 dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," 2918 dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
2851 " flushing %s -> %s\n", 2919 " flushing %s -> %s\n",
@@ -2890,6 +2958,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2890 2958
2891out: 2959out:
2892 spin_unlock(&ci->i_ceph_lock); 2960 spin_unlock(&ci->i_ceph_lock);
2961
2962 while (!list_empty(&to_remove)) {
2963 cf = list_first_entry(&to_remove,
2964 struct ceph_cap_flush, list);
2965 list_del(&cf->list);
2966 kfree(cf);
2967 }
2893 if (drop) 2968 if (drop)
2894 iput(inode); 2969 iput(inode);
2895} 2970}
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 1c991df276c9..6d3f19db8c8a 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -417,8 +417,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
417 INIT_LIST_HEAD(&ci->i_dirty_item); 417 INIT_LIST_HEAD(&ci->i_dirty_item);
418 INIT_LIST_HEAD(&ci->i_flushing_item); 418 INIT_LIST_HEAD(&ci->i_flushing_item);
419 ci->i_cap_flush_seq = 0; 419 ci->i_cap_flush_seq = 0;
420 ci->i_cap_flush_last_tid = 0; 420 ci->i_cap_flush_tree = RB_ROOT;
421 memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid));
422 init_waitqueue_head(&ci->i_cap_wq); 421 init_waitqueue_head(&ci->i_cap_wq);
423 ci->i_hold_caps_min = 0; 422 ci->i_hold_caps_min = 0;
424 ci->i_hold_caps_max = 0; 423 ci->i_hold_caps_max = 0;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 8080d486a991..839901f51512 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1142,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1142 void *arg) 1142 void *arg)
1143{ 1143{
1144 struct ceph_inode_info *ci = ceph_inode(inode); 1144 struct ceph_inode_info *ci = ceph_inode(inode);
1145 LIST_HEAD(to_remove);
1145 int drop = 0; 1146 int drop = 0;
1146 1147
1147 dout("removing cap %p, ci is %p, inode is %p\n", 1148 dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1149,9 +1150,19 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1149 spin_lock(&ci->i_ceph_lock); 1150 spin_lock(&ci->i_ceph_lock);
1150 __ceph_remove_cap(cap, false); 1151 __ceph_remove_cap(cap, false);
1151 if (!ci->i_auth_cap) { 1152 if (!ci->i_auth_cap) {
1153 struct ceph_cap_flush *cf;
1152 struct ceph_mds_client *mdsc = 1154 struct ceph_mds_client *mdsc =
1153 ceph_sb_to_client(inode->i_sb)->mdsc; 1155 ceph_sb_to_client(inode->i_sb)->mdsc;
1154 1156
1157 while (true) {
1158 struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
1159 if (!n)
1160 break;
1161 cf = rb_entry(n, struct ceph_cap_flush, i_node);
1162 rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
1163 list_add(&cf->list, &to_remove);
1164 }
1165
1155 spin_lock(&mdsc->cap_dirty_lock); 1166 spin_lock(&mdsc->cap_dirty_lock);
1156 if (!list_empty(&ci->i_dirty_item)) { 1167 if (!list_empty(&ci->i_dirty_item)) {
1157 pr_warn_ratelimited( 1168 pr_warn_ratelimited(
@@ -1173,8 +1184,16 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1173 drop = 1; 1184 drop = 1;
1174 } 1185 }
1175 spin_unlock(&mdsc->cap_dirty_lock); 1186 spin_unlock(&mdsc->cap_dirty_lock);
1187
1176 } 1188 }
1177 spin_unlock(&ci->i_ceph_lock); 1189 spin_unlock(&ci->i_ceph_lock);
1190 while (!list_empty(&to_remove)) {
1191 struct ceph_cap_flush *cf;
1192 cf = list_first_entry(&to_remove,
1193 struct ceph_cap_flush, list);
1194 list_del(&cf->list);
1195 kfree(cf);
1196 }
1178 while (drop--) 1197 while (drop--)
1179 iput(inode); 1198 iput(inode);
1180 return 0; 1199 return 0;
@@ -3408,6 +3427,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
3408 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3427 INIT_LIST_HEAD(&mdsc->snap_flush_list);
3409 spin_lock_init(&mdsc->snap_flush_lock); 3428 spin_lock_init(&mdsc->snap_flush_lock);
3410 mdsc->cap_flush_seq = 0; 3429 mdsc->cap_flush_seq = 0;
3430 mdsc->last_cap_flush_tid = 1;
3411 INIT_LIST_HEAD(&mdsc->cap_dirty); 3431 INIT_LIST_HEAD(&mdsc->cap_dirty);
3412 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3432 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3413 mdsc->num_cap_flushing = 0; 3433 mdsc->num_cap_flushing = 0;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 509d6822e9b1..19f6084203f0 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -307,6 +307,7 @@ struct ceph_mds_client {
307 spinlock_t snap_flush_lock; 307 spinlock_t snap_flush_lock;
308 308
309 u64 cap_flush_seq; 309 u64 cap_flush_seq;
310 u64 last_cap_flush_tid;
310 struct list_head cap_dirty; /* inodes with dirty caps */ 311 struct list_head cap_dirty; /* inodes with dirty caps */
311 struct list_head cap_dirty_migrating; /* ...that are migration... */ 312 struct list_head cap_dirty_migrating; /* ...that are migration... */
312 int num_cap_flushing; /* # caps we are flushing */ 313 int num_cap_flushing; /* # caps we are flushing */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c4961353d058..cc597f52e046 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -186,6 +186,15 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
186 } 186 }
187} 187}
188 188
189struct ceph_cap_flush {
190 u64 tid;
191 int caps;
192 union {
193 struct rb_node i_node;
194 struct list_head list;
195 };
196};
197
189/* 198/*
190 * The frag tree describes how a directory is fragmented, potentially across 199 * The frag tree describes how a directory is fragmented, potentially across
191 * multiple metadata servers. It is also used to indicate points where 200 * multiple metadata servers. It is also used to indicate points where
@@ -299,7 +308,7 @@ struct ceph_inode_info {
299 /* we need to track cap writeback on a per-cap-bit basis, to allow 308 /* we need to track cap writeback on a per-cap-bit basis, to allow
300 * overlapping, pipelined cap flushes to the mds. we can probably 309 * overlapping, pipelined cap flushes to the mds. we can probably
301 * reduce the tid to 8 bits if we're concerned about inode size. */ 310 * reduce the tid to 8 bits if we're concerned about inode size. */
302 u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS]; 311 struct rb_root i_cap_flush_tree;
303 wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ 312 wait_queue_head_t i_cap_wq; /* threads waiting on a capability */
304 unsigned long i_hold_caps_min; /* jiffies */ 313 unsigned long i_hold_caps_min; /* jiffies */
305 unsigned long i_hold_caps_max; /* jiffies */ 314 unsigned long i_hold_caps_max; /* jiffies */