aboutsummaryrefslogtreecommitdiffstats
path: root/fs/btrfs/async-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/btrfs/async-thread.c')
-rw-r--r--fs/btrfs/async-thread.c419
1 files changed, 419 insertions, 0 deletions
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
new file mode 100644
index 000000000000..8e2fec05dbe0
--- /dev/null
+++ b/fs/btrfs/async-thread.c
@@ -0,0 +1,419 @@
1/*
2 * Copyright (C) 2007 Oracle. All rights reserved.
3 *
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public
6 * License v2 as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public
14 * License along with this program; if not, write to the
15 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
16 * Boston, MA 021110-1307, USA.
17 */
18
19#include <linux/version.h>
20#include <linux/kthread.h>
21#include <linux/list.h>
22#include <linux/spinlock.h>
23# include <linux/freezer.h>
24#include "async-thread.h"
25
26#define WORK_QUEUED_BIT 0
27#define WORK_DONE_BIT 1
28#define WORK_ORDER_DONE_BIT 2
29
30/*
31 * container for the kthread task pointer and the list of pending work
32 * One of these is allocated per thread.
33 */
34struct btrfs_worker_thread {
35 /* pool we belong to */
36 struct btrfs_workers *workers;
37
38 /* list of struct btrfs_work that are waiting for service */
39 struct list_head pending;
40
41 /* list of worker threads from struct btrfs_workers */
42 struct list_head worker_list;
43
44 /* kthread */
45 struct task_struct *task;
46
47 /* number of things on the pending list */
48 atomic_t num_pending;
49
50 unsigned long sequence;
51
52 /* protects the pending list. */
53 spinlock_t lock;
54
55 /* set to non-zero when this thread is already awake and kicking */
56 int working;
57
58 /* are we currently idle */
59 int idle;
60};
61
62/*
63 * helper function to move a thread onto the idle list after it
64 * has finished some requests.
65 */
66static void check_idle_worker(struct btrfs_worker_thread *worker)
67{
68 if (!worker->idle && atomic_read(&worker->num_pending) <
69 worker->workers->idle_thresh / 2) {
70 unsigned long flags;
71 spin_lock_irqsave(&worker->workers->lock, flags);
72 worker->idle = 1;
73 list_move(&worker->worker_list, &worker->workers->idle_list);
74 spin_unlock_irqrestore(&worker->workers->lock, flags);
75 }
76}
77
78/*
79 * helper function to move a thread off the idle list after new
80 * pending work is added.
81 */
82static void check_busy_worker(struct btrfs_worker_thread *worker)
83{
84 if (worker->idle && atomic_read(&worker->num_pending) >=
85 worker->workers->idle_thresh) {
86 unsigned long flags;
87 spin_lock_irqsave(&worker->workers->lock, flags);
88 worker->idle = 0;
89 list_move_tail(&worker->worker_list,
90 &worker->workers->worker_list);
91 spin_unlock_irqrestore(&worker->workers->lock, flags);
92 }
93}
94
95static noinline int run_ordered_completions(struct btrfs_workers *workers,
96 struct btrfs_work *work)
97{
98 unsigned long flags;
99
100 if (!workers->ordered)
101 return 0;
102
103 set_bit(WORK_DONE_BIT, &work->flags);
104
105 spin_lock_irqsave(&workers->lock, flags);
106
107 while (!list_empty(&workers->order_list)) {
108 work = list_entry(workers->order_list.next,
109 struct btrfs_work, order_list);
110
111 if (!test_bit(WORK_DONE_BIT, &work->flags))
112 break;
113
114 /* we are going to call the ordered done function, but
115 * we leave the work item on the list as a barrier so
116 * that later work items that are done don't have their
117 * functions called before this one returns
118 */
119 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
120 break;
121
122 spin_unlock_irqrestore(&workers->lock, flags);
123
124 work->ordered_func(work);
125
126 /* now take the lock again and call the freeing code */
127 spin_lock_irqsave(&workers->lock, flags);
128 list_del(&work->order_list);
129 work->ordered_free(work);
130 }
131
132 spin_unlock_irqrestore(&workers->lock, flags);
133 return 0;
134}
135
136/*
137 * main loop for servicing work items
138 */
139static int worker_loop(void *arg)
140{
141 struct btrfs_worker_thread *worker = arg;
142 struct list_head *cur;
143 struct btrfs_work *work;
144 do {
145 spin_lock_irq(&worker->lock);
146 while (!list_empty(&worker->pending)) {
147 cur = worker->pending.next;
148 work = list_entry(cur, struct btrfs_work, list);
149 list_del(&work->list);
150 clear_bit(WORK_QUEUED_BIT, &work->flags);
151
152 work->worker = worker;
153 spin_unlock_irq(&worker->lock);
154
155 work->func(work);
156
157 atomic_dec(&worker->num_pending);
158 /*
159 * unless this is an ordered work queue,
160 * 'work' was probably freed by func above.
161 */
162 run_ordered_completions(worker->workers, work);
163
164 spin_lock_irq(&worker->lock);
165 check_idle_worker(worker);
166
167 }
168 worker->working = 0;
169 if (freezing(current)) {
170 refrigerator();
171 } else {
172 set_current_state(TASK_INTERRUPTIBLE);
173 spin_unlock_irq(&worker->lock);
174 if (!kthread_should_stop())
175 schedule();
176 __set_current_state(TASK_RUNNING);
177 }
178 } while (!kthread_should_stop());
179 return 0;
180}
181
182/*
183 * this will wait for all the worker threads to shutdown
184 */
185int btrfs_stop_workers(struct btrfs_workers *workers)
186{
187 struct list_head *cur;
188 struct btrfs_worker_thread *worker;
189
190 list_splice_init(&workers->idle_list, &workers->worker_list);
191 while (!list_empty(&workers->worker_list)) {
192 cur = workers->worker_list.next;
193 worker = list_entry(cur, struct btrfs_worker_thread,
194 worker_list);
195 kthread_stop(worker->task);
196 list_del(&worker->worker_list);
197 kfree(worker);
198 }
199 return 0;
200}
201
202/*
203 * simple init on struct btrfs_workers
204 */
205void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
206{
207 workers->num_workers = 0;
208 INIT_LIST_HEAD(&workers->worker_list);
209 INIT_LIST_HEAD(&workers->idle_list);
210 INIT_LIST_HEAD(&workers->order_list);
211 spin_lock_init(&workers->lock);
212 workers->max_workers = max;
213 workers->idle_thresh = 32;
214 workers->name = name;
215 workers->ordered = 0;
216}
217
218/*
219 * starts new worker threads. This does not enforce the max worker
220 * count in case you need to temporarily go past it.
221 */
222int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
223{
224 struct btrfs_worker_thread *worker;
225 int ret = 0;
226 int i;
227
228 for (i = 0; i < num_workers; i++) {
229 worker = kzalloc(sizeof(*worker), GFP_NOFS);
230 if (!worker) {
231 ret = -ENOMEM;
232 goto fail;
233 }
234
235 INIT_LIST_HEAD(&worker->pending);
236 INIT_LIST_HEAD(&worker->worker_list);
237 spin_lock_init(&worker->lock);
238 atomic_set(&worker->num_pending, 0);
239 worker->task = kthread_run(worker_loop, worker,
240 "btrfs-%s-%d", workers->name,
241 workers->num_workers + i);
242 worker->workers = workers;
243 if (IS_ERR(worker->task)) {
244 kfree(worker);
245 ret = PTR_ERR(worker->task);
246 goto fail;
247 }
248
249 spin_lock_irq(&workers->lock);
250 list_add_tail(&worker->worker_list, &workers->idle_list);
251 worker->idle = 1;
252 workers->num_workers++;
253 spin_unlock_irq(&workers->lock);
254 }
255 return 0;
256fail:
257 btrfs_stop_workers(workers);
258 return ret;
259}
260
261/*
262 * run through the list and find a worker thread that doesn't have a lot
263 * to do right now. This can return null if we aren't yet at the thread
264 * count limit and all of the threads are busy.
265 */
266static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
267{
268 struct btrfs_worker_thread *worker;
269 struct list_head *next;
270 int enforce_min = workers->num_workers < workers->max_workers;
271
272 /*
273 * if we find an idle thread, don't move it to the end of the
274 * idle list. This improves the chance that the next submission
275 * will reuse the same thread, and maybe catch it while it is still
276 * working
277 */
278 if (!list_empty(&workers->idle_list)) {
279 next = workers->idle_list.next;
280 worker = list_entry(next, struct btrfs_worker_thread,
281 worker_list);
282 return worker;
283 }
284 if (enforce_min || list_empty(&workers->worker_list))
285 return NULL;
286
287 /*
288 * if we pick a busy task, move the task to the end of the list.
289 * hopefully this will keep things somewhat evenly balanced.
290 * Do the move in batches based on the sequence number. This groups
291 * requests submitted at roughly the same time onto the same worker.
292 */
293 next = workers->worker_list.next;
294 worker = list_entry(next, struct btrfs_worker_thread, worker_list);
295 atomic_inc(&worker->num_pending);
296 worker->sequence++;
297
298 if (worker->sequence % workers->idle_thresh == 0)
299 list_move_tail(next, &workers->worker_list);
300 return worker;
301}
302
303/*
304 * selects a worker thread to take the next job. This will either find
305 * an idle worker, start a new worker up to the max count, or just return
306 * one of the existing busy workers.
307 */
308static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
309{
310 struct btrfs_worker_thread *worker;
311 unsigned long flags;
312
313again:
314 spin_lock_irqsave(&workers->lock, flags);
315 worker = next_worker(workers);
316 spin_unlock_irqrestore(&workers->lock, flags);
317
318 if (!worker) {
319 spin_lock_irqsave(&workers->lock, flags);
320 if (workers->num_workers >= workers->max_workers) {
321 struct list_head *fallback = NULL;
322 /*
323 * we have failed to find any workers, just
324 * return the force one
325 */
326 if (!list_empty(&workers->worker_list))
327 fallback = workers->worker_list.next;
328 if (!list_empty(&workers->idle_list))
329 fallback = workers->idle_list.next;
330 BUG_ON(!fallback);
331 worker = list_entry(fallback,
332 struct btrfs_worker_thread, worker_list);
333 spin_unlock_irqrestore(&workers->lock, flags);
334 } else {
335 spin_unlock_irqrestore(&workers->lock, flags);
336 /* we're below the limit, start another worker */
337 btrfs_start_workers(workers, 1);
338 goto again;
339 }
340 }
341 return worker;
342}
343
344/*
345 * btrfs_requeue_work just puts the work item back on the tail of the list
346 * it was taken from. It is intended for use with long running work functions
347 * that make some progress and want to give the cpu up for others.
348 */
349int btrfs_requeue_work(struct btrfs_work *work)
350{
351 struct btrfs_worker_thread *worker = work->worker;
352 unsigned long flags;
353
354 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
355 goto out;
356
357 spin_lock_irqsave(&worker->lock, flags);
358 atomic_inc(&worker->num_pending);
359 list_add_tail(&work->list, &worker->pending);
360
361 /* by definition we're busy, take ourselves off the idle
362 * list
363 */
364 if (worker->idle) {
365 spin_lock_irqsave(&worker->workers->lock, flags);
366 worker->idle = 0;
367 list_move_tail(&worker->worker_list,
368 &worker->workers->worker_list);
369 spin_unlock_irqrestore(&worker->workers->lock, flags);
370 }
371
372 spin_unlock_irqrestore(&worker->lock, flags);
373
374out:
375 return 0;
376}
377
378/*
379 * places a struct btrfs_work into the pending queue of one of the kthreads
380 */
381int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
382{
383 struct btrfs_worker_thread *worker;
384 unsigned long flags;
385 int wake = 0;
386
387 /* don't requeue something already on a list */
388 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
389 goto out;
390
391 worker = find_worker(workers);
392 if (workers->ordered) {
393 spin_lock_irqsave(&workers->lock, flags);
394 list_add_tail(&work->order_list, &workers->order_list);
395 spin_unlock_irqrestore(&workers->lock, flags);
396 } else {
397 INIT_LIST_HEAD(&work->order_list);
398 }
399
400 spin_lock_irqsave(&worker->lock, flags);
401 atomic_inc(&worker->num_pending);
402 check_busy_worker(worker);
403 list_add_tail(&work->list, &worker->pending);
404
405 /*
406 * avoid calling into wake_up_process if this thread has already
407 * been kicked
408 */
409 if (!worker->working)
410 wake = 1;
411 worker->working = 1;
412
413 spin_unlock_irqrestore(&worker->lock, flags);
414
415 if (wake)
416 wake_up_process(worker->task);
417out:
418 return 0;
419}