Diffstat (limited to 'fs/xfs/xfs_buf.c')
 fs/xfs/xfs_buf.c | 236
 1 file changed, 166 insertions(+), 70 deletions(-)
diff --git a/fs/xfs/xfs_buf.c b/fs/xfs/xfs_buf.c
index a87a0d5477bd..47a318ce82e0 100644
--- a/fs/xfs/xfs_buf.c
+++ b/fs/xfs/xfs_buf.c
@@ -80,6 +80,47 @@ xfs_buf_vmap_len(
 }
 
 /*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per buffer I/O
+ * tracking adds unnecessary overhead. This is used for synchronization purposes
+ * with unmount (see xfs_wait_buftarg()), so all we really need is a count of
+ * in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+	struct xfs_buf	*bp)
+{
+	if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags |= _XBF_IN_FLIGHT;
+	percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed and unaccount from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+	struct xfs_buf	*bp)
+{
+	if (!(bp->b_flags & _XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags &= ~_XBF_IN_FLIGHT;
+	percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
+/*
  * When we mark a buffer stale, we remove the buffer from the LRU and clear the
  * b_lru_ref count so that the buffer is freed immediately when the buffer
  * reference count falls to zero. If the buffer is already on the LRU, we need
@@ -102,6 +143,14 @@ xfs_buf_stale(
 	 */
 	bp->b_flags &= ~_XBF_DELWRI_Q;
 
+	/*
+	 * Once the buffer is marked stale and unlocked, a subsequent lookup
+	 * could reset b_flags. There is no guarantee that the buffer is
+	 * unaccounted (released to LRU) before that occurs. Drop in-flight
+	 * status now to preserve accounting consistency.
+	 */
+	xfs_buf_ioacct_dec(bp);
+
 	spin_lock(&bp->b_lock);
 	atomic_set(&bp->b_lru_ref, 0);
 	if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
@@ -815,7 +864,8 @@ xfs_buf_get_uncached(
 	struct xfs_buf		*bp;
 	DEFINE_SINGLE_BUF_MAP(map, XFS_BUF_DADDR_NULL, numblks);
 
-	bp = _xfs_buf_alloc(target, &map, 1, 0);
+	/* flags might contain irrelevant bits, pass only what we care about */
+	bp = _xfs_buf_alloc(target, &map, 1, flags & XBF_NO_IOACCT);
 	if (unlikely(bp == NULL))
 		goto fail;
 
@@ -866,63 +916,85 @@ xfs_buf_hold(
 }
 
 /*
- * Releases a hold on the specified buffer. If the
- * the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on LRU or freed (depending on b_lru_ref).
  */
 void
 xfs_buf_rele(
 	xfs_buf_t		*bp)
 {
 	struct xfs_perag	*pag = bp->b_pag;
+	bool			release;
+	bool			freebuf = false;
 
 	trace_xfs_buf_rele(bp, _RET_IP_);
 
 	if (!pag) {
 		ASSERT(list_empty(&bp->b_lru));
 		ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
-		if (atomic_dec_and_test(&bp->b_hold))
+		if (atomic_dec_and_test(&bp->b_hold)) {
+			xfs_buf_ioacct_dec(bp);
 			xfs_buf_free(bp);
+		}
 		return;
 	}
 
 	ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
 
 	ASSERT(atomic_read(&bp->b_hold) > 0);
-	if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
-		spin_lock(&bp->b_lock);
-		if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
-			/*
-			 * If the buffer is added to the LRU take a new
-			 * reference to the buffer for the LRU and clear the
-			 * (now stale) dispose list state flag
-			 */
-			if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
-				bp->b_state &= ~XFS_BSTATE_DISPOSE;
-				atomic_inc(&bp->b_hold);
-			}
-			spin_unlock(&bp->b_lock);
-			spin_unlock(&pag->pag_buf_lock);
-		} else {
-			/*
-			 * most of the time buffers will already be removed from
-			 * the LRU, so optimise that case by checking for the
-			 * XFS_BSTATE_DISPOSE flag indicating the last list the
-			 * buffer was on was the disposal list
-			 */
-			if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
-				list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
-			} else {
-				ASSERT(list_empty(&bp->b_lru));
-			}
-			spin_unlock(&bp->b_lock);
 
-			ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
-			rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
-			spin_unlock(&pag->pag_buf_lock);
-			xfs_perag_put(pag);
-			xfs_buf_free(bp);
+	release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+	spin_lock(&bp->b_lock);
+	if (!release) {
+		/*
+		 * Drop the in-flight state if the buffer is already on the LRU
+		 * and it holds the only reference. This is racy because we
+		 * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+		 * ensures the decrement occurs only once per-buf.
+		 */
+		if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+			xfs_buf_ioacct_dec(bp);
+		goto out_unlock;
+	}
+
+	/* the last reference has been dropped ... */
+	xfs_buf_ioacct_dec(bp);
+	if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+		/*
+		 * If the buffer is added to the LRU take a new reference to the
+		 * buffer for the LRU and clear the (now stale) dispose list
+		 * state flag
+		 */
+		if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+			bp->b_state &= ~XFS_BSTATE_DISPOSE;
+			atomic_inc(&bp->b_hold);
+		}
+		spin_unlock(&pag->pag_buf_lock);
+	} else {
+		/*
+		 * most of the time buffers will already be removed from the
+		 * LRU, so optimise that case by checking for the
+		 * XFS_BSTATE_DISPOSE flag indicating the last list the buffer
+		 * was on was the disposal list
+		 */
+		if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+			list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+		} else {
+			ASSERT(list_empty(&bp->b_lru));
 		}
+
+		ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+		rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+		spin_unlock(&pag->pag_buf_lock);
+		xfs_perag_put(pag);
+		freebuf = true;
 	}
+
+out_unlock:
+	spin_unlock(&bp->b_lock);
+
+	if (freebuf)
+		xfs_buf_free(bp);
 }
 
 
@@ -944,10 +1016,12 @@ xfs_buf_trylock(
 	int			locked;
 
 	locked = down_trylock(&bp->b_sema) == 0;
-	if (locked)
+	if (locked) {
 		XB_SET_OWNER(bp);
-
-	trace_xfs_buf_trylock(bp, _RET_IP_);
+		trace_xfs_buf_trylock(bp, _RET_IP_);
+	} else {
+		trace_xfs_buf_trylock_fail(bp, _RET_IP_);
+	}
 	return locked;
 }
 
@@ -1339,6 +1413,7 @@ xfs_buf_submit(
 	 * xfs_buf_ioend too early.
 	 */
 	atomic_set(&bp->b_io_remaining, 1);
+	xfs_buf_ioacct_inc(bp);
 	_xfs_buf_ioapply(bp);
 
 	/*
@@ -1524,13 +1599,19 @@ xfs_wait_buftarg(
 	int loop = 0;
 
 	/*
-	 * We need to flush the buffer workqueue to ensure that all IO
-	 * completion processing is 100% done. Just waiting on buffer locks is
-	 * not sufficient for async IO as the reference count held over IO is
-	 * not released until after the buffer lock is dropped. Hence we need to
-	 * ensure here that all reference counts have been dropped before we
-	 * start walking the LRU list.
+	 * First wait on the buftarg I/O count for all in-flight buffers to be
+	 * released. This is critical as new buffers do not make the LRU until
+	 * they are released.
+	 *
+	 * Next, flush the buffer workqueue to ensure all completion processing
+	 * has finished. Just waiting on buffer locks is not sufficient for
+	 * async IO as the reference count held over IO is not released until
+	 * after the buffer lock is dropped. Hence we need to ensure here that
+	 * all reference counts have been dropped before we start walking the
+	 * LRU list.
 	 */
+	while (percpu_counter_sum(&btp->bt_io_count))
+		delay(100);
 	drain_workqueue(btp->bt_mount->m_buf_workqueue);
 
 	/* loop until there is nothing left on the lru list. */
@@ -1627,6 +1708,8 @@ xfs_free_buftarg(
 	struct xfs_buftarg	*btp)
 {
 	unregister_shrinker(&btp->bt_shrinker);
+	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+	percpu_counter_destroy(&btp->bt_io_count);
 	list_lru_destroy(&btp->bt_lru);
 
 	if (mp->m_flags & XFS_MOUNT_BARRIER)
@@ -1691,6 +1774,9 @@ xfs_alloc_buftarg(
 	if (list_lru_init(&btp->bt_lru))
 		goto error;
 
+	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+		goto error;
+
 	btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
 	btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
 	btp->bt_shrinker.seeks = DEFAULT_SEEKS;
@@ -1774,18 +1860,33 @@ xfs_buf_cmp(
 	return 0;
 }
 
+/*
+ * submit buffers for write.
+ *
+ * When we have a large buffer list, we do not want to hold all the buffers
+ * locked while we block on the request queue waiting for IO dispatch. To avoid
+ * this problem, we lock and submit buffers in groups of 50, thereby minimising
+ * the lock hold times for lists which may contain thousands of objects.
+ *
+ * To do this, we sort the buffer list before we walk the list to lock and
+ * submit buffers, and we plug and unplug around each group of buffers we
+ * submit.
+ */
 static int
-__xfs_buf_delwri_submit(
+xfs_buf_delwri_submit_buffers(
 	struct list_head	*buffer_list,
-	struct list_head	*io_list,
-	bool			wait)
+	struct list_head	*wait_list)
 {
-	struct blk_plug		plug;
 	struct xfs_buf		*bp, *n;
+	LIST_HEAD		(submit_list);
 	int			pinned = 0;
+	struct blk_plug		plug;
 
+	list_sort(NULL, buffer_list, xfs_buf_cmp);
+
+	blk_start_plug(&plug);
 	list_for_each_entry_safe(bp, n, buffer_list, b_list) {
-		if (!wait) {
+		if (!wait_list) {
 			if (xfs_buf_ispinned(bp)) {
 				pinned++;
 				continue;
@@ -1808,25 +1909,21 @@ __xfs_buf_delwri_submit(
 			continue;
 		}
 
-		list_move_tail(&bp->b_list, io_list);
 		trace_xfs_buf_delwri_split(bp, _RET_IP_);
-	}
-
-	list_sort(NULL, io_list, xfs_buf_cmp);
-
-	blk_start_plug(&plug);
-	list_for_each_entry_safe(bp, n, io_list, b_list) {
-		bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_ASYNC | XBF_WRITE_FAIL);
-		bp->b_flags |= XBF_WRITE | XBF_ASYNC;
 
 		/*
-		 * we do all Io submission async. This means if we need to wait
-		 * for IO completion we need to take an extra reference so the
-		 * buffer is still valid on the other side.
+		 * We do all IO submission async. This means if we need
+		 * to wait for IO completion we need to take an extra
+		 * reference so the buffer is still valid on the other
+		 * side. We need to move the buffer onto the io_list
+		 * at this point so the caller can still access it.
 		 */
-		if (wait)
+		bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_WRITE_FAIL);
+		bp->b_flags |= XBF_WRITE | XBF_ASYNC;
+		if (wait_list) {
 			xfs_buf_hold(bp);
-		else
+			list_move_tail(&bp->b_list, wait_list);
+		} else
 			list_del_init(&bp->b_list);
 
 		xfs_buf_submit(bp);
@@ -1849,8 +1946,7 @@ int
 xfs_buf_delwri_submit_nowait(
 	struct list_head	*buffer_list)
 {
-	LIST_HEAD		(io_list);
-	return __xfs_buf_delwri_submit(buffer_list, &io_list, false);
+	return xfs_buf_delwri_submit_buffers(buffer_list, NULL);
 }
 
 /*
@@ -1865,15 +1961,15 @@ int
 xfs_buf_delwri_submit(
 	struct list_head	*buffer_list)
 {
-	LIST_HEAD		(io_list);
+	LIST_HEAD		(wait_list);
 	int			error = 0, error2;
 	struct xfs_buf		*bp;
 
-	__xfs_buf_delwri_submit(buffer_list, &io_list, true);
+	xfs_buf_delwri_submit_buffers(buffer_list, &wait_list);
 
 	/* Wait for IO to complete. */
-	while (!list_empty(&io_list)) {
-		bp = list_first_entry(&io_list, struct xfs_buf, b_list);
+	while (!list_empty(&wait_list)) {
+		bp = list_first_entry(&wait_list, struct xfs_buf, b_list);
 
 		list_del_init(&bp->b_list);
 