about summary refs log tree commit diff stats
path: root/fs/btrfs/async-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/btrfs/async-thread.c')
-rw-r--r-- fs/btrfs/async-thread.c 230
1 file changed, 184 insertions, 46 deletions
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 019e8af449ab..6ea5cd0a595f 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -48,6 +48,9 @@ struct btrfs_worker_thread {
48 /* number of things on the pending list */ 48 /* number of things on the pending list */
49 atomic_t num_pending; 49 atomic_t num_pending;
50 50
51 /* reference counter for this struct */
52 atomic_t refs;
53
51 unsigned long sequence; 54 unsigned long sequence;
52 55
53 /* protects the pending list. */ 56 /* protects the pending list. */
@@ -93,17 +96,40 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
93 } 96 }
94} 97}
95 98
96static noinline int run_ordered_completions(struct btrfs_workers *workers, 99static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
97 struct btrfs_work *work)
98{ 100{
101 struct btrfs_workers *workers = worker->workers;
99 unsigned long flags; 102 unsigned long flags;
100 103
104 rmb();
105 if (!workers->atomic_start_pending)
106 return;
107
108 spin_lock_irqsave(&workers->lock, flags);
109 if (!workers->atomic_start_pending)
110 goto out;
111
112 workers->atomic_start_pending = 0;
113 if (workers->num_workers >= workers->max_workers)
114 goto out;
115
116 spin_unlock_irqrestore(&workers->lock, flags);
117 btrfs_start_workers(workers, 1);
118 return;
119
120out:
121 spin_unlock_irqrestore(&workers->lock, flags);
122}
123
124static noinline int run_ordered_completions(struct btrfs_workers *workers,
125 struct btrfs_work *work)
126{
101 if (!workers->ordered) 127 if (!workers->ordered)
102 return 0; 128 return 0;
103 129
104 set_bit(WORK_DONE_BIT, &work->flags); 130 set_bit(WORK_DONE_BIT, &work->flags);
105 131
106 spin_lock_irqsave(&workers->lock, flags); 132 spin_lock(&workers->order_lock);
107 133
108 while (1) { 134 while (1) {
109 if (!list_empty(&workers->prio_order_list)) { 135 if (!list_empty(&workers->prio_order_list)) {
@@ -126,45 +152,117 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
126 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags)) 152 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
127 break; 153 break;
128 154
129 spin_unlock_irqrestore(&workers->lock, flags); 155 spin_unlock(&workers->order_lock);
130 156
131 work->ordered_func(work); 157 work->ordered_func(work);
132 158
133 /* now take the lock again and call the freeing code */ 159 /* now take the lock again and call the freeing code */
134 spin_lock_irqsave(&workers->lock, flags); 160 spin_lock(&workers->order_lock);
135 list_del(&work->order_list); 161 list_del(&work->order_list);
136 work->ordered_free(work); 162 work->ordered_free(work);
137 } 163 }
138 164
139 spin_unlock_irqrestore(&workers->lock, flags); 165 spin_unlock(&workers->order_lock);
140 return 0; 166 return 0;
141} 167}
142 168
169static void put_worker(struct btrfs_worker_thread *worker)
170{
171 if (atomic_dec_and_test(&worker->refs))
172 kfree(worker);
173}
174
175static int try_worker_shutdown(struct btrfs_worker_thread *worker)
176{
177 int freeit = 0;
178
179 spin_lock_irq(&worker->lock);
180 spin_lock_irq(&worker->workers->lock);
181 if (worker->workers->num_workers > 1 &&
182 worker->idle &&
183 !worker->working &&
184 !list_empty(&worker->worker_list) &&
185 list_empty(&worker->prio_pending) &&
186 list_empty(&worker->pending)) {
187 freeit = 1;
188 list_del_init(&worker->worker_list);
189 worker->workers->num_workers--;
190 }
191 spin_unlock_irq(&worker->workers->lock);
192 spin_unlock_irq(&worker->lock);
193
194 if (freeit)
195 put_worker(worker);
196 return freeit;
197}
198
199static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
200 struct list_head *prio_head,
201 struct list_head *head)
202{
203 struct btrfs_work *work = NULL;
204 struct list_head *cur = NULL;
205
206 if(!list_empty(prio_head))
207 cur = prio_head->next;
208
209 smp_mb();
210 if (!list_empty(&worker->prio_pending))
211 goto refill;
212
213 if (!list_empty(head))
214 cur = head->next;
215
216 if (cur)
217 goto out;
218
219refill:
220 spin_lock_irq(&worker->lock);
221 list_splice_tail_init(&worker->prio_pending, prio_head);
222 list_splice_tail_init(&worker->pending, head);
223
224 if (!list_empty(prio_head))
225 cur = prio_head->next;
226 else if (!list_empty(head))
227 cur = head->next;
228 spin_unlock_irq(&worker->lock);
229
230 if (!cur)
231 goto out_fail;
232
233out:
234 work = list_entry(cur, struct btrfs_work, list);
235
236out_fail:
237 return work;
238}
239
143/* 240/*
144 * main loop for servicing work items 241 * main loop for servicing work items
145 */ 242 */
146static int worker_loop(void *arg) 243static int worker_loop(void *arg)
147{ 244{
148 struct btrfs_worker_thread *worker = arg; 245 struct btrfs_worker_thread *worker = arg;
149 struct list_head *cur; 246 struct list_head head;
247 struct list_head prio_head;
150 struct btrfs_work *work; 248 struct btrfs_work *work;
249
250 INIT_LIST_HEAD(&head);
251 INIT_LIST_HEAD(&prio_head);
252
151 do { 253 do {
152 spin_lock_irq(&worker->lock); 254again:
153again_locked:
154 while (1) { 255 while (1) {
155 if (!list_empty(&worker->prio_pending)) 256
156 cur = worker->prio_pending.next; 257
157 else if (!list_empty(&worker->pending)) 258 work = get_next_work(worker, &prio_head, &head);
158 cur = worker->pending.next; 259 if (!work)
159 else
160 break; 260 break;
161 261
162 work = list_entry(cur, struct btrfs_work, list);
163 list_del(&work->list); 262 list_del(&work->list);
164 clear_bit(WORK_QUEUED_BIT, &work->flags); 263 clear_bit(WORK_QUEUED_BIT, &work->flags);
165 264
166 work->worker = worker; 265 work->worker = worker;
167 spin_unlock_irq(&worker->lock);
168 266
169 work->func(work); 267 work->func(work);
170 268
@@ -175,9 +273,13 @@ again_locked:
175 */ 273 */
176 run_ordered_completions(worker->workers, work); 274 run_ordered_completions(worker->workers, work);
177 275
178 spin_lock_irq(&worker->lock); 276 check_pending_worker_creates(worker);
179 check_idle_worker(worker); 277
180 } 278 }
279
280 spin_lock_irq(&worker->lock);
281 check_idle_worker(worker);
282
181 if (freezing(current)) { 283 if (freezing(current)) {
182 worker->working = 0; 284 worker->working = 0;
183 spin_unlock_irq(&worker->lock); 285 spin_unlock_irq(&worker->lock);
@@ -216,8 +318,10 @@ again_locked:
216 spin_lock_irq(&worker->lock); 318 spin_lock_irq(&worker->lock);
217 set_current_state(TASK_INTERRUPTIBLE); 319 set_current_state(TASK_INTERRUPTIBLE);
218 if (!list_empty(&worker->pending) || 320 if (!list_empty(&worker->pending) ||
219 !list_empty(&worker->prio_pending)) 321 !list_empty(&worker->prio_pending)) {
220 goto again_locked; 322 spin_unlock_irq(&worker->lock);
323 goto again;
324 }
221 325
222 /* 326 /*
223 * this makes sure we get a wakeup when someone 327 * this makes sure we get a wakeup when someone
@@ -226,8 +330,13 @@ again_locked:
226 worker->working = 0; 330 worker->working = 0;
227 spin_unlock_irq(&worker->lock); 331 spin_unlock_irq(&worker->lock);
228 332
229 if (!kthread_should_stop()) 333 if (!kthread_should_stop()) {
230 schedule(); 334 schedule_timeout(HZ * 120);
335 if (!worker->working &&
336 try_worker_shutdown(worker)) {
337 return 0;
338 }
339 }
231 } 340 }
232 __set_current_state(TASK_RUNNING); 341 __set_current_state(TASK_RUNNING);
233 } 342 }
@@ -242,16 +351,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
242{ 351{
243 struct list_head *cur; 352 struct list_head *cur;
244 struct btrfs_worker_thread *worker; 353 struct btrfs_worker_thread *worker;
354 int can_stop;
245 355
356 spin_lock_irq(&workers->lock);
246 list_splice_init(&workers->idle_list, &workers->worker_list); 357 list_splice_init(&workers->idle_list, &workers->worker_list);
247 while (!list_empty(&workers->worker_list)) { 358 while (!list_empty(&workers->worker_list)) {
248 cur = workers->worker_list.next; 359 cur = workers->worker_list.next;
249 worker = list_entry(cur, struct btrfs_worker_thread, 360 worker = list_entry(cur, struct btrfs_worker_thread,
250 worker_list); 361 worker_list);
251 kthread_stop(worker->task); 362
252 list_del(&worker->worker_list); 363 atomic_inc(&worker->refs);
253 kfree(worker); 364 workers->num_workers -= 1;
365 if (!list_empty(&worker->worker_list)) {
366 list_del_init(&worker->worker_list);
367 put_worker(worker);
368 can_stop = 1;
369 } else
370 can_stop = 0;
371 spin_unlock_irq(&workers->lock);
372 if (can_stop)
373 kthread_stop(worker->task);
374 spin_lock_irq(&workers->lock);
375 put_worker(worker);
254 } 376 }
377 spin_unlock_irq(&workers->lock);
255 return 0; 378 return 0;
256} 379}
257 380
@@ -266,10 +389,13 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
266 INIT_LIST_HEAD(&workers->order_list); 389 INIT_LIST_HEAD(&workers->order_list);
267 INIT_LIST_HEAD(&workers->prio_order_list); 390 INIT_LIST_HEAD(&workers->prio_order_list);
268 spin_lock_init(&workers->lock); 391 spin_lock_init(&workers->lock);
392 spin_lock_init(&workers->order_lock);
269 workers->max_workers = max; 393 workers->max_workers = max;
270 workers->idle_thresh = 32; 394 workers->idle_thresh = 32;
271 workers->name = name; 395 workers->name = name;
272 workers->ordered = 0; 396 workers->ordered = 0;
397 workers->atomic_start_pending = 0;
398 workers->atomic_worker_start = 0;
273} 399}
274 400
275/* 401/*
@@ -293,7 +419,9 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
293 INIT_LIST_HEAD(&worker->prio_pending); 419 INIT_LIST_HEAD(&worker->prio_pending);
294 INIT_LIST_HEAD(&worker->worker_list); 420 INIT_LIST_HEAD(&worker->worker_list);
295 spin_lock_init(&worker->lock); 421 spin_lock_init(&worker->lock);
422
296 atomic_set(&worker->num_pending, 0); 423 atomic_set(&worker->num_pending, 0);
424 atomic_set(&worker->refs, 1);
297 worker->workers = workers; 425 worker->workers = workers;
298 worker->task = kthread_run(worker_loop, worker, 426 worker->task = kthread_run(worker_loop, worker,
299 "btrfs-%s-%d", workers->name, 427 "btrfs-%s-%d", workers->name,
@@ -303,7 +431,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
303 kfree(worker); 431 kfree(worker);
304 goto fail; 432 goto fail;
305 } 433 }
306
307 spin_lock_irq(&workers->lock); 434 spin_lock_irq(&workers->lock);
308 list_add_tail(&worker->worker_list, &workers->idle_list); 435 list_add_tail(&worker->worker_list, &workers->idle_list);
309 worker->idle = 1; 436 worker->idle = 1;
@@ -367,28 +494,18 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
367{ 494{
368 struct btrfs_worker_thread *worker; 495 struct btrfs_worker_thread *worker;
369 unsigned long flags; 496 unsigned long flags;
497 struct list_head *fallback;
370 498
371again: 499again:
372 spin_lock_irqsave(&workers->lock, flags); 500 spin_lock_irqsave(&workers->lock, flags);
373 worker = next_worker(workers); 501 worker = next_worker(workers);
374 spin_unlock_irqrestore(&workers->lock, flags);
375 502
376 if (!worker) { 503 if (!worker) {
377 spin_lock_irqsave(&workers->lock, flags);
378 if (workers->num_workers >= workers->max_workers) { 504 if (workers->num_workers >= workers->max_workers) {
379 struct list_head *fallback = NULL; 505 goto fallback;
380 /* 506 } else if (workers->atomic_worker_start) {
381 * we have failed to find any workers, just 507 workers->atomic_start_pending = 1;
382 * return the force one 508 goto fallback;
383 */
384 if (!list_empty(&workers->worker_list))
385 fallback = workers->worker_list.next;
386 if (!list_empty(&workers->idle_list))
387 fallback = workers->idle_list.next;
388 BUG_ON(!fallback);
389 worker = list_entry(fallback,
390 struct btrfs_worker_thread, worker_list);
391 spin_unlock_irqrestore(&workers->lock, flags);
392 } else { 509 } else {
393 spin_unlock_irqrestore(&workers->lock, flags); 510 spin_unlock_irqrestore(&workers->lock, flags);
394 /* we're below the limit, start another worker */ 511 /* we're below the limit, start another worker */
@@ -396,6 +513,23 @@ again:
396 goto again; 513 goto again;
397 } 514 }
398 } 515 }
516 spin_unlock_irqrestore(&workers->lock, flags);
517 return worker;
518
519fallback:
520 fallback = NULL;
521 /*
522 * we have failed to find any workers, just
523 * return the first one we can find.
524 */
525 if (!list_empty(&workers->worker_list))
526 fallback = workers->worker_list.next;
527 if (!list_empty(&workers->idle_list))
528 fallback = workers->idle_list.next;
529 BUG_ON(!fallback);
530 worker = list_entry(fallback,
531 struct btrfs_worker_thread, worker_list);
532 spin_unlock_irqrestore(&workers->lock, flags);
399 return worker; 533 return worker;
400} 534}
401 535
@@ -435,9 +569,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
435 worker->working = 1; 569 worker->working = 1;
436 } 570 }
437 571
438 spin_unlock_irqrestore(&worker->lock, flags);
439 if (wake) 572 if (wake)
440 wake_up_process(worker->task); 573 wake_up_process(worker->task);
574 spin_unlock_irqrestore(&worker->lock, flags);
441out: 575out:
442 576
443 return 0; 577 return 0;
@@ -463,14 +597,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
463 597
464 worker = find_worker(workers); 598 worker = find_worker(workers);
465 if (workers->ordered) { 599 if (workers->ordered) {
466 spin_lock_irqsave(&workers->lock, flags); 600 /*
601 * you're not allowed to do ordered queues from an
602 * interrupt handler
603 */
604 spin_lock(&workers->order_lock);
467 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) { 605 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
468 list_add_tail(&work->order_list, 606 list_add_tail(&work->order_list,
469 &workers->prio_order_list); 607 &workers->prio_order_list);
470 } else { 608 } else {
471 list_add_tail(&work->order_list, &workers->order_list); 609 list_add_tail(&work->order_list, &workers->order_list);
472 } 610 }
473 spin_unlock_irqrestore(&workers->lock, flags); 611 spin_unlock(&workers->order_lock);
474 } else { 612 } else {
475 INIT_LIST_HEAD(&work->order_list); 613 INIT_LIST_HEAD(&work->order_list);
476 } 614 }
@@ -492,10 +630,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
492 wake = 1; 630 wake = 1;
493 worker->working = 1; 631 worker->working = 1;
494 632
495 spin_unlock_irqrestore(&worker->lock, flags);
496
497 if (wake) 633 if (wake)
498 wake_up_process(worker->task); 634 wake_up_process(worker->task);
635 spin_unlock_irqrestore(&worker->lock, flags);
636
499out: 637out:
500 return 0; 638 return 0;
501} 639}