aboutsummaryrefslogtreecommitdiffstats
path: root/fs/btrfs/async-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/btrfs/async-thread.c')
-rw-r--r--fs/btrfs/async-thread.c254
1 files changed, 202 insertions, 52 deletions
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 019e8af449ab..282ca085c2fb 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -48,6 +48,9 @@ struct btrfs_worker_thread {
48 /* number of things on the pending list */ 48 /* number of things on the pending list */
49 atomic_t num_pending; 49 atomic_t num_pending;
50 50
51 /* reference counter for this struct */
52 atomic_t refs;
53
51 unsigned long sequence; 54 unsigned long sequence;
52 55
53 /* protects the pending list. */ 56 /* protects the pending list. */
@@ -71,7 +74,12 @@ static void check_idle_worker(struct btrfs_worker_thread *worker)
71 unsigned long flags; 74 unsigned long flags;
72 spin_lock_irqsave(&worker->workers->lock, flags); 75 spin_lock_irqsave(&worker->workers->lock, flags);
73 worker->idle = 1; 76 worker->idle = 1;
74 list_move(&worker->worker_list, &worker->workers->idle_list); 77
78 /* the list may be empty if the worker is just starting */
79 if (!list_empty(&worker->worker_list)) {
80 list_move(&worker->worker_list,
81 &worker->workers->idle_list);
82 }
75 spin_unlock_irqrestore(&worker->workers->lock, flags); 83 spin_unlock_irqrestore(&worker->workers->lock, flags);
76 } 84 }
77} 85}
@@ -87,23 +95,49 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
87 unsigned long flags; 95 unsigned long flags;
88 spin_lock_irqsave(&worker->workers->lock, flags); 96 spin_lock_irqsave(&worker->workers->lock, flags);
89 worker->idle = 0; 97 worker->idle = 0;
90 list_move_tail(&worker->worker_list, 98
91 &worker->workers->worker_list); 99 if (!list_empty(&worker->worker_list)) {
100 list_move_tail(&worker->worker_list,
101 &worker->workers->worker_list);
102 }
92 spin_unlock_irqrestore(&worker->workers->lock, flags); 103 spin_unlock_irqrestore(&worker->workers->lock, flags);
93 } 104 }
94} 105}
95 106
96static noinline int run_ordered_completions(struct btrfs_workers *workers, 107static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
97 struct btrfs_work *work)
98{ 108{
109 struct btrfs_workers *workers = worker->workers;
99 unsigned long flags; 110 unsigned long flags;
100 111
112 rmb();
113 if (!workers->atomic_start_pending)
114 return;
115
116 spin_lock_irqsave(&workers->lock, flags);
117 if (!workers->atomic_start_pending)
118 goto out;
119
120 workers->atomic_start_pending = 0;
121 if (workers->num_workers >= workers->max_workers)
122 goto out;
123
124 spin_unlock_irqrestore(&workers->lock, flags);
125 btrfs_start_workers(workers, 1);
126 return;
127
128out:
129 spin_unlock_irqrestore(&workers->lock, flags);
130}
131
132static noinline int run_ordered_completions(struct btrfs_workers *workers,
133 struct btrfs_work *work)
134{
101 if (!workers->ordered) 135 if (!workers->ordered)
102 return 0; 136 return 0;
103 137
104 set_bit(WORK_DONE_BIT, &work->flags); 138 set_bit(WORK_DONE_BIT, &work->flags);
105 139
106 spin_lock_irqsave(&workers->lock, flags); 140 spin_lock(&workers->order_lock);
107 141
108 while (1) { 142 while (1) {
109 if (!list_empty(&workers->prio_order_list)) { 143 if (!list_empty(&workers->prio_order_list)) {
@@ -126,45 +160,118 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
126 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags)) 160 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
127 break; 161 break;
128 162
129 spin_unlock_irqrestore(&workers->lock, flags); 163 spin_unlock(&workers->order_lock);
130 164
131 work->ordered_func(work); 165 work->ordered_func(work);
132 166
133 /* now take the lock again and call the freeing code */ 167 /* now take the lock again and call the freeing code */
134 spin_lock_irqsave(&workers->lock, flags); 168 spin_lock(&workers->order_lock);
135 list_del(&work->order_list); 169 list_del(&work->order_list);
136 work->ordered_free(work); 170 work->ordered_free(work);
137 } 171 }
138 172
139 spin_unlock_irqrestore(&workers->lock, flags); 173 spin_unlock(&workers->order_lock);
140 return 0; 174 return 0;
141} 175}
142 176
177static void put_worker(struct btrfs_worker_thread *worker)
178{
179 if (atomic_dec_and_test(&worker->refs))
180 kfree(worker);
181}
182
183static int try_worker_shutdown(struct btrfs_worker_thread *worker)
184{
185 int freeit = 0;
186
187 spin_lock_irq(&worker->lock);
188 spin_lock(&worker->workers->lock);
189 if (worker->workers->num_workers > 1 &&
190 worker->idle &&
191 !worker->working &&
192 !list_empty(&worker->worker_list) &&
193 list_empty(&worker->prio_pending) &&
194 list_empty(&worker->pending) &&
195 atomic_read(&worker->num_pending) == 0) {
196 freeit = 1;
197 list_del_init(&worker->worker_list);
198 worker->workers->num_workers--;
199 }
200 spin_unlock(&worker->workers->lock);
201 spin_unlock_irq(&worker->lock);
202
203 if (freeit)
204 put_worker(worker);
205 return freeit;
206}
207
208static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
209 struct list_head *prio_head,
210 struct list_head *head)
211{
212 struct btrfs_work *work = NULL;
213 struct list_head *cur = NULL;
214
215 if(!list_empty(prio_head))
216 cur = prio_head->next;
217
218 smp_mb();
219 if (!list_empty(&worker->prio_pending))
220 goto refill;
221
222 if (!list_empty(head))
223 cur = head->next;
224
225 if (cur)
226 goto out;
227
228refill:
229 spin_lock_irq(&worker->lock);
230 list_splice_tail_init(&worker->prio_pending, prio_head);
231 list_splice_tail_init(&worker->pending, head);
232
233 if (!list_empty(prio_head))
234 cur = prio_head->next;
235 else if (!list_empty(head))
236 cur = head->next;
237 spin_unlock_irq(&worker->lock);
238
239 if (!cur)
240 goto out_fail;
241
242out:
243 work = list_entry(cur, struct btrfs_work, list);
244
245out_fail:
246 return work;
247}
248
143/* 249/*
144 * main loop for servicing work items 250 * main loop for servicing work items
145 */ 251 */
146static int worker_loop(void *arg) 252static int worker_loop(void *arg)
147{ 253{
148 struct btrfs_worker_thread *worker = arg; 254 struct btrfs_worker_thread *worker = arg;
149 struct list_head *cur; 255 struct list_head head;
256 struct list_head prio_head;
150 struct btrfs_work *work; 257 struct btrfs_work *work;
258
259 INIT_LIST_HEAD(&head);
260 INIT_LIST_HEAD(&prio_head);
261
151 do { 262 do {
152 spin_lock_irq(&worker->lock); 263again:
153again_locked:
154 while (1) { 264 while (1) {
155 if (!list_empty(&worker->prio_pending)) 265
156 cur = worker->prio_pending.next; 266
157 else if (!list_empty(&worker->pending)) 267 work = get_next_work(worker, &prio_head, &head);
158 cur = worker->pending.next; 268 if (!work)
159 else
160 break; 269 break;
161 270
162 work = list_entry(cur, struct btrfs_work, list);
163 list_del(&work->list); 271 list_del(&work->list);
164 clear_bit(WORK_QUEUED_BIT, &work->flags); 272 clear_bit(WORK_QUEUED_BIT, &work->flags);
165 273
166 work->worker = worker; 274 work->worker = worker;
167 spin_unlock_irq(&worker->lock);
168 275
169 work->func(work); 276 work->func(work);
170 277
@@ -175,9 +282,13 @@ again_locked:
175 */ 282 */
176 run_ordered_completions(worker->workers, work); 283 run_ordered_completions(worker->workers, work);
177 284
178 spin_lock_irq(&worker->lock); 285 check_pending_worker_creates(worker);
179 check_idle_worker(worker); 286
180 } 287 }
288
289 spin_lock_irq(&worker->lock);
290 check_idle_worker(worker);
291
181 if (freezing(current)) { 292 if (freezing(current)) {
182 worker->working = 0; 293 worker->working = 0;
183 spin_unlock_irq(&worker->lock); 294 spin_unlock_irq(&worker->lock);
@@ -216,8 +327,10 @@ again_locked:
216 spin_lock_irq(&worker->lock); 327 spin_lock_irq(&worker->lock);
217 set_current_state(TASK_INTERRUPTIBLE); 328 set_current_state(TASK_INTERRUPTIBLE);
218 if (!list_empty(&worker->pending) || 329 if (!list_empty(&worker->pending) ||
219 !list_empty(&worker->prio_pending)) 330 !list_empty(&worker->prio_pending)) {
220 goto again_locked; 331 spin_unlock_irq(&worker->lock);
332 goto again;
333 }
221 334
222 /* 335 /*
223 * this makes sure we get a wakeup when someone 336 * this makes sure we get a wakeup when someone
@@ -226,8 +339,13 @@ again_locked:
226 worker->working = 0; 339 worker->working = 0;
227 spin_unlock_irq(&worker->lock); 340 spin_unlock_irq(&worker->lock);
228 341
229 if (!kthread_should_stop()) 342 if (!kthread_should_stop()) {
230 schedule(); 343 schedule_timeout(HZ * 120);
344 if (!worker->working &&
345 try_worker_shutdown(worker)) {
346 return 0;
347 }
348 }
231 } 349 }
232 __set_current_state(TASK_RUNNING); 350 __set_current_state(TASK_RUNNING);
233 } 351 }
@@ -242,16 +360,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
242{ 360{
243 struct list_head *cur; 361 struct list_head *cur;
244 struct btrfs_worker_thread *worker; 362 struct btrfs_worker_thread *worker;
363 int can_stop;
245 364
365 spin_lock_irq(&workers->lock);
246 list_splice_init(&workers->idle_list, &workers->worker_list); 366 list_splice_init(&workers->idle_list, &workers->worker_list);
247 while (!list_empty(&workers->worker_list)) { 367 while (!list_empty(&workers->worker_list)) {
248 cur = workers->worker_list.next; 368 cur = workers->worker_list.next;
249 worker = list_entry(cur, struct btrfs_worker_thread, 369 worker = list_entry(cur, struct btrfs_worker_thread,
250 worker_list); 370 worker_list);
251 kthread_stop(worker->task); 371
252 list_del(&worker->worker_list); 372 atomic_inc(&worker->refs);
253 kfree(worker); 373 workers->num_workers -= 1;
374 if (!list_empty(&worker->worker_list)) {
375 list_del_init(&worker->worker_list);
376 put_worker(worker);
377 can_stop = 1;
378 } else
379 can_stop = 0;
380 spin_unlock_irq(&workers->lock);
381 if (can_stop)
382 kthread_stop(worker->task);
383 spin_lock_irq(&workers->lock);
384 put_worker(worker);
254 } 385 }
386 spin_unlock_irq(&workers->lock);
255 return 0; 387 return 0;
256} 388}
257 389
@@ -266,10 +398,13 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
266 INIT_LIST_HEAD(&workers->order_list); 398 INIT_LIST_HEAD(&workers->order_list);
267 INIT_LIST_HEAD(&workers->prio_order_list); 399 INIT_LIST_HEAD(&workers->prio_order_list);
268 spin_lock_init(&workers->lock); 400 spin_lock_init(&workers->lock);
401 spin_lock_init(&workers->order_lock);
269 workers->max_workers = max; 402 workers->max_workers = max;
270 workers->idle_thresh = 32; 403 workers->idle_thresh = 32;
271 workers->name = name; 404 workers->name = name;
272 workers->ordered = 0; 405 workers->ordered = 0;
406 workers->atomic_start_pending = 0;
407 workers->atomic_worker_start = 0;
273} 408}
274 409
275/* 410/*
@@ -293,7 +428,9 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
293 INIT_LIST_HEAD(&worker->prio_pending); 428 INIT_LIST_HEAD(&worker->prio_pending);
294 INIT_LIST_HEAD(&worker->worker_list); 429 INIT_LIST_HEAD(&worker->worker_list);
295 spin_lock_init(&worker->lock); 430 spin_lock_init(&worker->lock);
431
296 atomic_set(&worker->num_pending, 0); 432 atomic_set(&worker->num_pending, 0);
433 atomic_set(&worker->refs, 1);
297 worker->workers = workers; 434 worker->workers = workers;
298 worker->task = kthread_run(worker_loop, worker, 435 worker->task = kthread_run(worker_loop, worker,
299 "btrfs-%s-%d", workers->name, 436 "btrfs-%s-%d", workers->name,
@@ -303,7 +440,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
303 kfree(worker); 440 kfree(worker);
304 goto fail; 441 goto fail;
305 } 442 }
306
307 spin_lock_irq(&workers->lock); 443 spin_lock_irq(&workers->lock);
308 list_add_tail(&worker->worker_list, &workers->idle_list); 444 list_add_tail(&worker->worker_list, &workers->idle_list);
309 worker->idle = 1; 445 worker->idle = 1;
@@ -350,7 +486,6 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
350 */ 486 */
351 next = workers->worker_list.next; 487 next = workers->worker_list.next;
352 worker = list_entry(next, struct btrfs_worker_thread, worker_list); 488 worker = list_entry(next, struct btrfs_worker_thread, worker_list);
353 atomic_inc(&worker->num_pending);
354 worker->sequence++; 489 worker->sequence++;
355 490
356 if (worker->sequence % workers->idle_thresh == 0) 491 if (worker->sequence % workers->idle_thresh == 0)
@@ -367,28 +502,18 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
367{ 502{
368 struct btrfs_worker_thread *worker; 503 struct btrfs_worker_thread *worker;
369 unsigned long flags; 504 unsigned long flags;
505 struct list_head *fallback;
370 506
371again: 507again:
372 spin_lock_irqsave(&workers->lock, flags); 508 spin_lock_irqsave(&workers->lock, flags);
373 worker = next_worker(workers); 509 worker = next_worker(workers);
374 spin_unlock_irqrestore(&workers->lock, flags);
375 510
376 if (!worker) { 511 if (!worker) {
377 spin_lock_irqsave(&workers->lock, flags);
378 if (workers->num_workers >= workers->max_workers) { 512 if (workers->num_workers >= workers->max_workers) {
379 struct list_head *fallback = NULL; 513 goto fallback;
380 /* 514 } else if (workers->atomic_worker_start) {
381 * we have failed to find any workers, just 515 workers->atomic_start_pending = 1;
382 * return the force one 516 goto fallback;
383 */
384 if (!list_empty(&workers->worker_list))
385 fallback = workers->worker_list.next;
386 if (!list_empty(&workers->idle_list))
387 fallback = workers->idle_list.next;
388 BUG_ON(!fallback);
389 worker = list_entry(fallback,
390 struct btrfs_worker_thread, worker_list);
391 spin_unlock_irqrestore(&workers->lock, flags);
392 } else { 517 } else {
393 spin_unlock_irqrestore(&workers->lock, flags); 518 spin_unlock_irqrestore(&workers->lock, flags);
394 /* we're below the limit, start another worker */ 519 /* we're below the limit, start another worker */
@@ -396,6 +521,28 @@ again:
396 goto again; 521 goto again;
397 } 522 }
398 } 523 }
524 goto found;
525
526fallback:
527 fallback = NULL;
528 /*
529 * we have failed to find any workers, just
530 * return the first one we can find.
531 */
532 if (!list_empty(&workers->worker_list))
533 fallback = workers->worker_list.next;
534 if (!list_empty(&workers->idle_list))
535 fallback = workers->idle_list.next;
536 BUG_ON(!fallback);
537 worker = list_entry(fallback,
538 struct btrfs_worker_thread, worker_list);
539found:
540 /*
541 * this makes sure the worker doesn't exit before it is placed
542 * onto a busy/idle list
543 */
544 atomic_inc(&worker->num_pending);
545 spin_unlock_irqrestore(&workers->lock, flags);
399 return worker; 546 return worker;
400} 547}
401 548
@@ -427,7 +574,7 @@ int btrfs_requeue_work(struct btrfs_work *work)
427 spin_lock(&worker->workers->lock); 574 spin_lock(&worker->workers->lock);
428 worker->idle = 0; 575 worker->idle = 0;
429 list_move_tail(&worker->worker_list, 576 list_move_tail(&worker->worker_list,
430 &worker->workers->worker_list); 577 &worker->workers->worker_list);
431 spin_unlock(&worker->workers->lock); 578 spin_unlock(&worker->workers->lock);
432 } 579 }
433 if (!worker->working) { 580 if (!worker->working) {
@@ -435,9 +582,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
435 worker->working = 1; 582 worker->working = 1;
436 } 583 }
437 584
438 spin_unlock_irqrestore(&worker->lock, flags);
439 if (wake) 585 if (wake)
440 wake_up_process(worker->task); 586 wake_up_process(worker->task);
587 spin_unlock_irqrestore(&worker->lock, flags);
441out: 588out:
442 589
443 return 0; 590 return 0;
@@ -463,14 +610,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
463 610
464 worker = find_worker(workers); 611 worker = find_worker(workers);
465 if (workers->ordered) { 612 if (workers->ordered) {
466 spin_lock_irqsave(&workers->lock, flags); 613 /*
614 * you're not allowed to do ordered queues from an
615 * interrupt handler
616 */
617 spin_lock(&workers->order_lock);
467 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) { 618 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
468 list_add_tail(&work->order_list, 619 list_add_tail(&work->order_list,
469 &workers->prio_order_list); 620 &workers->prio_order_list);
470 } else { 621 } else {
471 list_add_tail(&work->order_list, &workers->order_list); 622 list_add_tail(&work->order_list, &workers->order_list);
472 } 623 }
473 spin_unlock_irqrestore(&workers->lock, flags); 624 spin_unlock(&workers->order_lock);
474 } else { 625 } else {
475 INIT_LIST_HEAD(&work->order_list); 626 INIT_LIST_HEAD(&work->order_list);
476 } 627 }
@@ -481,7 +632,6 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
481 list_add_tail(&work->list, &worker->prio_pending); 632 list_add_tail(&work->list, &worker->prio_pending);
482 else 633 else
483 list_add_tail(&work->list, &worker->pending); 634 list_add_tail(&work->list, &worker->pending);
484 atomic_inc(&worker->num_pending);
485 check_busy_worker(worker); 635 check_busy_worker(worker);
486 636
487 /* 637 /*
@@ -492,10 +642,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
492 wake = 1; 642 wake = 1;
493 worker->working = 1; 643 worker->working = 1;
494 644
495 spin_unlock_irqrestore(&worker->lock, flags);
496
497 if (wake) 645 if (wake)
498 wake_up_process(worker->task); 646 wake_up_process(worker->task);
647 spin_unlock_irqrestore(&worker->lock, flags);
648
499out: 649out:
500 return 0; 650 return 0;
501} 651}