aboutsummaryrefslogtreecommitdiffstats
path: root/fs/btrfs/locking.c
diff options
context:
space:
mode:
authorChris Mason <chris.mason@oracle.com>2011-07-16 15:23:14 -0400
committerChris Mason <chris.mason@oracle.com>2011-07-27 12:46:46 -0400
commitbd681513fa6f2ff29aa391f01e413a2d1c59fd77 (patch)
treebb10ec6ef876b4d7a553cbe54976ec49a0d10b21 /fs/btrfs/locking.c
parent81317fdeddcef259b6ecf7b5c0d04caa167c6b54 (diff)
Btrfs: switch the btrfs tree locks to reader/writer
The btrfs metadata btree is the source of significant lock contention, especially in the root node. This commit changes our locking to use a reader/writer lock. The lock is built on top of rw spinlocks, and it extends the lock tracking to remember if we have a read lock or a write lock when we go to blocking. Atomics count the number of blocking readers or writers at any given time. It removes all of the adaptive spinning from the old code and uses only the spinning/blocking hints inside of btrfs to decide when it should continue spinning. In read heavy workloads this is dramatically faster. In write heavy workloads we're still faster because of less contention on the root node lock. We suffer slightly in dbench because we schedule more often during write locks, but all other benchmarks so far are improved. Signed-off-by: Chris Mason <chris.mason@oracle.com>
Diffstat (limited to 'fs/btrfs/locking.c')
-rw-r--r--fs/btrfs/locking.c280
1 files changed, 146 insertions, 134 deletions
diff --git a/fs/btrfs/locking.c b/fs/btrfs/locking.c
index 66fa43dc3f0f..d77b67c4b275 100644
--- a/fs/btrfs/locking.c
+++ b/fs/btrfs/locking.c
@@ -24,185 +24,197 @@
24#include "extent_io.h" 24#include "extent_io.h"
25#include "locking.h" 25#include "locking.h"
26 26
27static inline void spin_nested(struct extent_buffer *eb) 27void btrfs_assert_tree_read_locked(struct extent_buffer *eb);
28{
29 spin_lock(&eb->lock);
30}
31 28
32/* 29/*
33 * Setting a lock to blocking will drop the spinlock and set the 30 * if we currently have a spinning reader or writer lock
34 * flag that forces other procs who want the lock to wait. After 31 * (indicated by the rw flag) this will bump the count
35 * this you can safely schedule with the lock held. 32 * of blocking holders and drop the spinlock.
36 */ 33 */
37void btrfs_set_lock_blocking(struct extent_buffer *eb) 34void btrfs_set_lock_blocking_rw(struct extent_buffer *eb, int rw)
38{ 35{
39 if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) { 36 if (rw == BTRFS_WRITE_LOCK) {
40 set_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags); 37 if (atomic_read(&eb->blocking_writers) == 0) {
41 spin_unlock(&eb->lock); 38 WARN_ON(atomic_read(&eb->spinning_writers) != 1);
39 atomic_dec(&eb->spinning_writers);
40 btrfs_assert_tree_locked(eb);
41 atomic_inc(&eb->blocking_writers);
42 write_unlock(&eb->lock);
43 }
44 } else if (rw == BTRFS_READ_LOCK) {
45 btrfs_assert_tree_read_locked(eb);
46 atomic_inc(&eb->blocking_readers);
47 WARN_ON(atomic_read(&eb->spinning_readers) == 0);
48 atomic_dec(&eb->spinning_readers);
49 read_unlock(&eb->lock);
42 } 50 }
43 /* exit with the spin lock released and the bit set */ 51 return;
44} 52}
45 53
46/* 54/*
47 * clearing the blocking flag will take the spinlock again. 55 * if we currently have a blocking lock, take the spinlock
48 * After this you can't safely schedule 56 * and drop our blocking count
49 */ 57 */
50void btrfs_clear_lock_blocking(struct extent_buffer *eb) 58void btrfs_clear_lock_blocking_rw(struct extent_buffer *eb, int rw)
51{ 59{
52 if (test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) { 60 if (rw == BTRFS_WRITE_LOCK_BLOCKING) {
53 spin_nested(eb); 61 BUG_ON(atomic_read(&eb->blocking_writers) != 1);
54 clear_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags); 62 write_lock(&eb->lock);
55 smp_mb__after_clear_bit(); 63 WARN_ON(atomic_read(&eb->spinning_writers));
64 atomic_inc(&eb->spinning_writers);
65 if (atomic_dec_and_test(&eb->blocking_writers))
66 wake_up(&eb->write_lock_wq);
67 } else if (rw == BTRFS_READ_LOCK_BLOCKING) {
68 BUG_ON(atomic_read(&eb->blocking_readers) == 0);
69 read_lock(&eb->lock);
70 atomic_inc(&eb->spinning_readers);
71 if (atomic_dec_and_test(&eb->blocking_readers))
72 wake_up(&eb->read_lock_wq);
56 } 73 }
57 /* exit with the spin lock held */ 74 return;
58} 75}
59 76
60/* 77/*
61 * unfortunately, many of the places that currently set a lock to blocking 78 * take a spinning read lock. This will wait for any blocking
62 * don't end up blocking for very long, and often they don't block 79 * writers
63 * at all. For a dbench 50 run, if we don't spin on the blocking bit
64 * at all, the context switch rate can jump up to 400,000/sec or more.
65 *
66 * So, we're still stuck with this crummy spin on the blocking bit,
67 * at least until the most common causes of the short blocks
68 * can be dealt with.
69 */ 80 */
70static int btrfs_spin_on_block(struct extent_buffer *eb) 81void btrfs_tree_read_lock(struct extent_buffer *eb)
71{ 82{
72 int i; 83again:
73 84 wait_event(eb->write_lock_wq, atomic_read(&eb->blocking_writers) == 0);
74 for (i = 0; i < 512; i++) { 85 read_lock(&eb->lock);
75 if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) 86 if (atomic_read(&eb->blocking_writers)) {
76 return 1; 87 read_unlock(&eb->lock);
77 if (need_resched()) 88 wait_event(eb->write_lock_wq,
78 break; 89 atomic_read(&eb->blocking_writers) == 0);
79 cpu_relax(); 90 goto again;
80 } 91 }
81 return 0; 92 atomic_inc(&eb->read_locks);
93 atomic_inc(&eb->spinning_readers);
82} 94}
83 95
84/* 96/*
85 * This is somewhat different from trylock. It will take the 97 * returns 1 if we get the read lock and 0 if we don't
86 * spinlock but if it finds the lock is set to blocking, it will 98 * this won't wait for blocking writers
87 * return without the lock held.
88 *
89 * returns 1 if it was able to take the lock and zero otherwise
90 *
91 * After this call, scheduling is not safe without first calling
92 * btrfs_set_lock_blocking()
93 */ 99 */
94int btrfs_try_spin_lock(struct extent_buffer *eb) 100int btrfs_try_tree_read_lock(struct extent_buffer *eb)
95{ 101{
96 int i; 102 if (atomic_read(&eb->blocking_writers))
103 return 0;
97 104
98 if (btrfs_spin_on_block(eb)) { 105 read_lock(&eb->lock);
99 spin_nested(eb); 106 if (atomic_read(&eb->blocking_writers)) {
100 if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) 107 read_unlock(&eb->lock);
101 return 1; 108 return 0;
102 spin_unlock(&eb->lock);
103 } 109 }
104 /* spin for a bit on the BLOCKING flag */ 110 atomic_inc(&eb->read_locks);
105 for (i = 0; i < 2; i++) { 111 atomic_inc(&eb->spinning_readers);
106 cpu_relax(); 112 return 1;
107 if (!btrfs_spin_on_block(eb))
108 break;
109
110 spin_nested(eb);
111 if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
112 return 1;
113 spin_unlock(&eb->lock);
114 }
115 return 0;
116} 113}
117 114
118/* 115/*
119 * the autoremove wake function will return 0 if it tried to wake up 116 * returns 1 if we get the read lock and 0 if we don't
120 * a process that was already awake, which means that process won't 117 * this won't wait for blocking writers or readers
121 * count as an exclusive wakeup. The waitq code will continue waking
122 * procs until it finds one that was actually sleeping.
123 *
124 * For btrfs, this isn't quite what we want. We want a single proc
125 * to be notified that the lock is ready for taking. If that proc
126 * already happen to be awake, great, it will loop around and try for
127 * the lock.
128 *
129 * So, btrfs_wake_function always returns 1, even when the proc that we
130 * tried to wake up was already awake.
131 */ 118 */
132static int btrfs_wake_function(wait_queue_t *wait, unsigned mode, 119int btrfs_try_tree_write_lock(struct extent_buffer *eb)
133 int sync, void *key)
134{ 120{
135 autoremove_wake_function(wait, mode, sync, key); 121 if (atomic_read(&eb->blocking_writers) ||
122 atomic_read(&eb->blocking_readers))
123 return 0;
124 write_lock(&eb->lock);
125 if (atomic_read(&eb->blocking_writers) ||
126 atomic_read(&eb->blocking_readers)) {
127 write_unlock(&eb->lock);
128 return 0;
129 }
130 atomic_inc(&eb->write_locks);
131 atomic_inc(&eb->spinning_writers);
136 return 1; 132 return 1;
137} 133}
138 134
139/* 135/*
140 * returns with the extent buffer spinlocked. 136 * drop a spinning read lock
141 * 137 */
142 * This will spin and/or wait as required to take the lock, and then 138void btrfs_tree_read_unlock(struct extent_buffer *eb)
143 * return with the spinlock held. 139{
144 * 140 btrfs_assert_tree_read_locked(eb);
145 * After this call, scheduling is not safe without first calling 141 WARN_ON(atomic_read(&eb->spinning_readers) == 0);
146 * btrfs_set_lock_blocking() 142 atomic_dec(&eb->spinning_readers);
143 atomic_dec(&eb->read_locks);
144 read_unlock(&eb->lock);
145}
146
147/*
148 * drop a blocking read lock
149 */
150void btrfs_tree_read_unlock_blocking(struct extent_buffer *eb)
151{
152 btrfs_assert_tree_read_locked(eb);
153 WARN_ON(atomic_read(&eb->blocking_readers) == 0);
154 if (atomic_dec_and_test(&eb->blocking_readers))
155 wake_up(&eb->read_lock_wq);
156 atomic_dec(&eb->read_locks);
157}
158
159/*
160 * take a spinning write lock. This will wait for both
161 * blocking readers or writers
147 */ 162 */
148int btrfs_tree_lock(struct extent_buffer *eb) 163int btrfs_tree_lock(struct extent_buffer *eb)
149{ 164{
150 DEFINE_WAIT(wait); 165again:
151 wait.func = btrfs_wake_function; 166 wait_event(eb->read_lock_wq, atomic_read(&eb->blocking_readers) == 0);
152 167 wait_event(eb->write_lock_wq, atomic_read(&eb->blocking_writers) == 0);
153 if (!btrfs_spin_on_block(eb)) 168 write_lock(&eb->lock);
154 goto sleep; 169 if (atomic_read(&eb->blocking_readers)) {
155 170 write_unlock(&eb->lock);
156 while(1) { 171 wait_event(eb->read_lock_wq,
157 spin_nested(eb); 172 atomic_read(&eb->blocking_readers) == 0);
158 173 goto again;
159 /* nobody is blocking, exit with the spinlock held */
160 if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
161 return 0;
162
163 /*
164 * we have the spinlock, but the real owner is blocking.
165 * wait for them
166 */
167 spin_unlock(&eb->lock);
168
169 /*
170 * spin for a bit, and if the blocking flag goes away,
171 * loop around
172 */
173 cpu_relax();
174 if (btrfs_spin_on_block(eb))
175 continue;
176sleep:
177 prepare_to_wait_exclusive(&eb->lock_wq, &wait,
178 TASK_UNINTERRUPTIBLE);
179
180 if (test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
181 schedule();
182
183 finish_wait(&eb->lock_wq, &wait);
184 } 174 }
175 if (atomic_read(&eb->blocking_writers)) {
176 write_unlock(&eb->lock);
177 wait_event(eb->write_lock_wq,
178 atomic_read(&eb->blocking_writers) == 0);
179 goto again;
180 }
181 WARN_ON(atomic_read(&eb->spinning_writers));
182 atomic_inc(&eb->spinning_writers);
183 atomic_inc(&eb->write_locks);
185 return 0; 184 return 0;
186} 185}
187 186
187/*
188 * drop a spinning or a blocking write lock.
189 */
188int btrfs_tree_unlock(struct extent_buffer *eb) 190int btrfs_tree_unlock(struct extent_buffer *eb)
189{ 191{
190 /* 192 int blockers = atomic_read(&eb->blocking_writers);
191 * if we were a blocking owner, we don't have the spinlock held 193
192 * just clear the bit and look for waiters 194 BUG_ON(blockers > 1);
193 */ 195
194 if (test_and_clear_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) 196 btrfs_assert_tree_locked(eb);
195 smp_mb__after_clear_bit(); 197 atomic_dec(&eb->write_locks);
196 else 198
197 spin_unlock(&eb->lock); 199 if (blockers) {
198 200 WARN_ON(atomic_read(&eb->spinning_writers));
199 if (waitqueue_active(&eb->lock_wq)) 201 atomic_dec(&eb->blocking_writers);
200 wake_up(&eb->lock_wq); 202 smp_wmb();
203 wake_up(&eb->write_lock_wq);
204 } else {
205 WARN_ON(atomic_read(&eb->spinning_writers) != 1);
206 atomic_dec(&eb->spinning_writers);
207 write_unlock(&eb->lock);
208 }
201 return 0; 209 return 0;
202} 210}
203 211
204void btrfs_assert_tree_locked(struct extent_buffer *eb) 212void btrfs_assert_tree_locked(struct extent_buffer *eb)
205{ 213{
206 if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) 214 BUG_ON(!atomic_read(&eb->write_locks));
207 assert_spin_locked(&eb->lock); 215}
216
217void btrfs_assert_tree_read_locked(struct extent_buffer *eb)
218{
219 BUG_ON(!atomic_read(&eb->read_locks));
208} 220}