-rw-r--r--  fs/btrfs/Makefile       |   2
-rw-r--r--  fs/btrfs/async-thread.c | 288
-rw-r--r--  fs/btrfs/async-thread.h |  78
-rw-r--r--  fs/btrfs/ctree.h        |  14
-rw-r--r--  fs/btrfs/disk-io.c      | 200
-rw-r--r--  fs/btrfs/inode.c        |   4
-rw-r--r--  fs/btrfs/volumes.c      | 162
-rw-r--r--  fs/btrfs/volumes.h      |  10
8 files changed, 626 insertions(+), 132 deletions(-)
diff --git a/fs/btrfs/Makefile b/fs/btrfs/Makefile
index e5fc3cfea0ab..9dcfc2fe3332 100644
--- a/fs/btrfs/Makefile
+++ b/fs/btrfs/Makefile
@@ -6,7 +6,7 @@ btrfs-y := super.o ctree.o extent-tree.o print-tree.o root-tree.o dir-item.o \
	   hash.o file-item.o inode-item.o inode-map.o disk-io.o \
	   transaction.o bit-radix.o inode.o file.o tree-defrag.o \
	   extent_map.o sysfs.o struct-funcs.o xattr.o ordered-data.o \
-	   extent_io.o volumes.o
+	   extent_io.o volumes.o async-thread.o
 
 btrfs-$(CONFIG_FS_POSIX_ACL) += acl.o
 else
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
new file mode 100644
index 000000000000..2911b67bd6f7
--- /dev/null
+++ b/fs/btrfs/async-thread.c
@@ -0,0 +1,288 @@
+/*
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License v2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#include <linux/kthread.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/freezer.h>
+#include "async-thread.h"
+
+/*
+ * container for the kthread task pointer and the list of pending work
+ * One of these is allocated per thread.
+ */
+struct btrfs_worker_thread {
+	/* list of struct btrfs_work that are waiting for service */
+	struct list_head pending;
+
+	/* list of worker threads from struct btrfs_workers */
+	struct list_head worker_list;
+
+	/* kthread */
+	struct task_struct *task;
+
+	/* number of things on the pending list */
+	atomic_t num_pending;
+
+	/* protects the pending list. */
+	spinlock_t lock;
+
+	/* set to non-zero when this thread is already awake and kicking */
+	int working;
+};
+
+/*
+ * main loop for servicing work items
+ */
+static int worker_loop(void *arg)
+{
+	struct btrfs_worker_thread *worker = arg;
+	struct list_head *cur;
+	struct btrfs_work *work;
+	do {
+		spin_lock_irq(&worker->lock);
+		while(!list_empty(&worker->pending)) {
+			cur = worker->pending.next;
+			work = list_entry(cur, struct btrfs_work, list);
+			list_del(&work->list);
+			clear_bit(0, &work->flags);
+
+			work->worker = worker;
+			spin_unlock_irq(&worker->lock);
+
+			work->func(work);
+
+			atomic_dec(&worker->num_pending);
+			spin_lock_irq(&worker->lock);
+		}
+		worker->working = 0;
+		if (freezing(current)) {
+			refrigerator();
+		} else {
+			set_current_state(TASK_INTERRUPTIBLE);
+			spin_unlock_irq(&worker->lock);
+			schedule();
+			__set_current_state(TASK_RUNNING);
+		}
+	} while (!kthread_should_stop());
+	return 0;
+}
+
+/*
+ * this will wait for all the worker threads to shutdown
+ */
+int btrfs_stop_workers(struct btrfs_workers *workers)
+{
+	struct list_head *cur;
+	struct btrfs_worker_thread *worker;
+
+	while(!list_empty(&workers->worker_list)) {
+		cur = workers->worker_list.next;
+		worker = list_entry(cur, struct btrfs_worker_thread,
+				    worker_list);
+		kthread_stop(worker->task);
+		list_del(&worker->worker_list);
+		kfree(worker);
+	}
+	return 0;
+}
+
+/*
+ * simple init on struct btrfs_workers
+ */
+void btrfs_init_workers(struct btrfs_workers *workers, int max)
+{
+	workers->num_workers = 0;
+	INIT_LIST_HEAD(&workers->worker_list);
+	workers->last = NULL;
+	spin_lock_init(&workers->lock);
+	workers->max_workers = max;
+}
+
+/*
+ * starts new worker threads. This does not enforce the max worker
+ * count in case you need to temporarily go past it.
+ */
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+{
+	struct btrfs_worker_thread *worker;
+	int ret = 0;
+	int i;
+
+	for (i = 0; i < num_workers; i++) {
+		worker = kzalloc(sizeof(*worker), GFP_NOFS);
+		if (!worker) {
+			ret = -ENOMEM;
+			goto fail;
+		}
+
+		INIT_LIST_HEAD(&worker->pending);
+		INIT_LIST_HEAD(&worker->worker_list);
+		spin_lock_init(&worker->lock);
+		atomic_set(&worker->num_pending, 0);
+		worker->task = kthread_run(worker_loop, worker, "btrfs");
+		if (IS_ERR(worker->task)) {
+			ret = PTR_ERR(worker->task);
+			goto fail;
+		}
+
+		spin_lock_irq(&workers->lock);
+		list_add_tail(&worker->worker_list, &workers->worker_list);
+		workers->last = worker;
+		workers->num_workers++;
+		spin_unlock_irq(&workers->lock);
+	}
+	return 0;
+fail:
+	btrfs_stop_workers(workers);
+	return ret;
+}
+
+/*
+ * run through the list and find a worker thread that doesn't have a lot
+ * to do right now. This can return null if we aren't yet at the thread
+ * count limit and all of the threads are busy.
+ */
+static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
+{
+	struct btrfs_worker_thread *worker;
+	struct list_head *next;
+	struct list_head *start;
+	int enforce_min = workers->num_workers < workers->max_workers;
+
+	/* start with the last thread if it isn't busy */
+	worker = workers->last;
+	if (atomic_read(&worker->num_pending) < 64)
+		goto done;
+
+	next = worker->worker_list.next;
+	start = &worker->worker_list;
+
+	/*
+	 * check all the workers for someone that is bored. FIXME, do
+	 * something smart here
+	 */
+	while(next != start) {
+		if (next == &workers->worker_list) {
+			next = workers->worker_list.next;
+			continue;
+		}
+		worker = list_entry(next, struct btrfs_worker_thread,
+				    worker_list);
+		if (atomic_read(&worker->num_pending) < 64 || !enforce_min)
+			goto done;
+		next = next->next;
+	}
+	/*
+	 * nobody was bored, if we're already at the max thread count,
+	 * use the last thread
+	 */
+	if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) {
+		return workers->last;
+	}
+	return NULL;
+done:
+	workers->last = worker;
+	return worker;
+}
+
+static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
+{
+	struct btrfs_worker_thread *worker;
+	unsigned long flags;
+
+again:
+	spin_lock_irqsave(&workers->lock, flags);
+	worker = next_worker(workers);
+	spin_unlock_irqrestore(&workers->lock, flags);
+
+	if (!worker) {
+		spin_lock_irqsave(&workers->lock, flags);
+		if (workers->num_workers >= workers->max_workers) {
+			/*
+			 * we have failed to find any workers, just
+			 * return the first one
+			 */
+			worker = list_entry(workers->worker_list.next,
+					    struct btrfs_worker_thread, worker_list);
+			spin_unlock_irqrestore(&workers->lock, flags);
+		} else {
+			spin_unlock_irqrestore(&workers->lock, flags);
+			/* we're below the limit, start another worker */
+			btrfs_start_workers(workers, 1);
+			goto again;
+		}
+	}
+	return worker;
+}
+
+/*
+ * btrfs_requeue_work just puts the work item back on the tail of the list
+ * it was taken from. It is intended for use with long running work functions
+ * that make some progress and want to give the cpu up for others.
+ */
+int btrfs_requeue_work(struct btrfs_work *work)
+{
+	struct btrfs_worker_thread *worker = work->worker;
+	unsigned long flags;
+
+	if (test_and_set_bit(0, &work->flags))
+		goto out;
+
+	spin_lock_irqsave(&worker->lock, flags);
+	atomic_inc(&worker->num_pending);
+	list_add_tail(&work->list, &worker->pending);
+	spin_unlock_irqrestore(&worker->lock, flags);
+out:
+	return 0;
+}
+
+/*
+ * places a struct btrfs_work into the pending queue of one of the kthreads
+ */
+int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
+{
+	struct btrfs_worker_thread *worker;
+	unsigned long flags;
+	int wake = 0;
+
+	/* don't requeue something already on a list */
+	if (test_and_set_bit(0, &work->flags))
+		goto out;
+
+	worker = find_worker(workers);
+
+	spin_lock_irqsave(&worker->lock, flags);
+	atomic_inc(&worker->num_pending);
+	list_add_tail(&work->list, &worker->pending);
+
+	/*
+	 * avoid calling into wake_up_process if this thread has already
+	 * been kicked
+	 */
+	if (!worker->working)
+		wake = 1;
+	worker->working = 1;
+
+	spin_unlock_irqrestore(&worker->lock, flags);
+
+	if (wake)
+		wake_up_process(worker->task);
+out:
+	return 0;
+}
diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h
new file mode 100644
index 000000000000..52fc9da0f9e7
--- /dev/null
+++ b/fs/btrfs/async-thread.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License v2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#ifndef __BTRFS_ASYNC_THREAD_
+#define __BTRFS_ASYNC_THREAD_
+
+struct btrfs_worker_thread;
+
+/*
+ * This is similar to a workqueue, but it is meant to spread the operations
+ * across all available cpus instead of just the CPU that was used to
+ * queue the work. There is also some batching introduced to try and
+ * cut down on context switches.
+ *
+ * By default threads are added on demand up to 2 * the number of cpus.
+ * Changing struct btrfs_workers->max_workers is one way to prevent
+ * demand creation of kthreads.
+ *
+ * the basic model of these worker threads is to embed a btrfs_work
+ * structure in your own data struct, and use container_of in a
+ * work function to get back to your data struct.
+ */
+struct btrfs_work {
+	/*
+	 * only func should be set to the function you want called
+	 * your work struct is passed as the only arg
+	 */
+	void (*func)(struct btrfs_work *work);
+
+	/*
+	 * flags should be set to zero. It is used to make sure the
+	 * struct is only inserted once into the list.
+	 */
+	unsigned long flags;
+
+	/* don't touch these */
+	struct btrfs_worker_thread *worker;
+	struct list_head list;
+};
+
+struct btrfs_workers {
+	/* current number of running workers */
+	int num_workers;
+
+	/* max number of workers allowed. changed by btrfs_start_workers */
+	int max_workers;
+
+	/* list with all the work threads */
+	struct list_head worker_list;
+
+	/* the last worker thread to have something queued */
+	struct btrfs_worker_thread *last;
+
+	/* lock for finding the next worker thread to queue on */
+	spinlock_t lock;
+};
+
+int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work);
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers);
+int btrfs_stop_workers(struct btrfs_workers *workers);
+void btrfs_init_workers(struct btrfs_workers *workers, int max);
+int btrfs_requeue_work(struct btrfs_work *work);
+#endif
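
A caller-side sketch for readers of this patch: the header comment above describes the intended usage model (embed a struct btrfs_work in your own structure and recover it with container_of from the work function). The snippet below is illustrative only and not part of the patch; the sample_job, sample_func and sample_queue names are invented for the example, everything else follows the API declared in async-thread.h.

/* Illustrative sketch only, not part of the patch */
struct sample_job {
	int id;				/* caller-private data */
	struct btrfs_work work;		/* embedded work item */
};

static void sample_func(struct btrfs_work *work)
{
	/* recover the enclosing structure from the embedded work item */
	struct sample_job *job = container_of(work, struct sample_job, work);

	/* ... do the actual processing for job->id here ... */
	kfree(job);
}

/* queueing side: hand one job to a previously started btrfs_workers pool */
static int sample_queue(struct btrfs_workers *workers, int id)
{
	struct sample_job *job = kzalloc(sizeof(*job), GFP_NOFS);

	if (!job)
		return -ENOMEM;
	job->id = id;
	job->work.func = sample_func;
	job->work.flags = 0;		/* must start out clear */
	btrfs_queue_worker(workers, &job->work);
	return 0;
}

The pool itself would be set up once with btrfs_init_workers()/btrfs_start_workers() and torn down with btrfs_stop_workers(), as open_ctree() and close_ctree() do later in this patch.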
diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h
index 49cbc62b42f9..6c91a510c965 100644
--- a/fs/btrfs/ctree.h
+++ b/fs/btrfs/ctree.h
@@ -30,6 +30,7 @@
 #include "bit-radix.h"
 #include "extent_io.h"
 #include "extent_map.h"
+#include "async-thread.h"
 
 struct btrfs_trans_handle;
 struct btrfs_transaction;
@@ -518,13 +519,20 @@ struct btrfs_fs_info {
 	struct list_head hashers;
 	struct list_head dead_roots;
 	struct list_head end_io_work_list;
-	struct list_head async_submit_work_list;
 	struct work_struct end_io_work;
-	struct work_struct async_submit_work;
 	spinlock_t end_io_work_lock;
-	spinlock_t async_submit_work_lock;
 	atomic_t nr_async_submits;
 
+	/*
+	 * there is a pool of worker threads for checksumming during writes
+	 * and a pool for checksumming after reads. This is because readers
+	 * can run with FS locks held, and the writers may be waiting for
+	 * those locks. We don't want ordering in the pending list to cause
+	 * deadlocks, and so the two are serviced separately.
+	 */
+	struct btrfs_workers workers;
+	struct btrfs_workers endio_workers;
+
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
 	struct work_struct trans_work;
 #else
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index b9a53646ceb2..98ff4fbcb386 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -31,6 +31,7 @@
 #include "btrfs_inode.h"
 #include "volumes.h"
 #include "print-tree.h"
+#include "async-thread.h"
 
 #if 0
 static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
@@ -46,8 +47,7 @@ static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
 #endif
 
 static struct extent_io_ops btree_extent_io_ops;
-static struct workqueue_struct *end_io_workqueue;
-static struct workqueue_struct *async_submit_workqueue;
+static void end_workqueue_fn(struct btrfs_work *work);
 
 struct end_io_wq {
 	struct bio *bio;
@@ -57,6 +57,7 @@ struct end_io_wq {
 	int error;
 	int metadata;
 	struct list_head list;
+	struct btrfs_work work;
 };
 
 struct async_submit_bio {
@@ -66,6 +67,7 @@ struct async_submit_bio {
 	extent_submit_bio_hook_t *submit_bio_hook;
 	int rw;
 	int mirror_num;
+	struct btrfs_work work;
 };
 
 struct extent_map *btree_get_extent(struct inode *inode, struct page *page,
@@ -389,7 +391,6 @@ static int end_workqueue_bio(struct bio *bio,
 {
 	struct end_io_wq *end_io_wq = bio->bi_private;
 	struct btrfs_fs_info *fs_info;
-	unsigned long flags;
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
 	if (bio->bi_size)
@@ -397,11 +398,10 @@ static int end_workqueue_bio(struct bio *bio,
 #endif
 
 	fs_info = end_io_wq->info;
-	spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
 	end_io_wq->error = err;
-	list_add_tail(&end_io_wq->list, &fs_info->end_io_work_list);
-	spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
-	queue_work(end_io_workqueue, &fs_info->end_io_work);
+	end_io_wq->work.func = end_workqueue_fn;
+	end_io_wq->work.flags = 0;
+	btrfs_queue_worker(&fs_info->endio_workers, &end_io_wq->work);
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
 	return 0;
@@ -428,6 +428,19 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio,
 	return 0;
 }
 
+static void run_one_async_submit(struct btrfs_work *work)
+{
+	struct btrfs_fs_info *fs_info;
+	struct async_submit_bio *async;
+
+	async = container_of(work, struct async_submit_bio, work);
+	fs_info = BTRFS_I(async->inode)->root->fs_info;
+	atomic_dec(&fs_info->nr_async_submits);
+	async->submit_bio_hook(async->inode, async->rw, async->bio,
+			       async->mirror_num);
+	kfree(async);
+}
+
 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 			int rw, struct bio *bio, int mirror_num,
 			extent_submit_bio_hook_t *submit_bio_hook)
@@ -443,13 +456,10 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 	async->bio = bio;
 	async->mirror_num = mirror_num;
 	async->submit_bio_hook = submit_bio_hook;
-
-	spin_lock(&fs_info->async_submit_work_lock);
-	list_add_tail(&async->list, &fs_info->async_submit_work_list);
+	async->work.func = run_one_async_submit;
+	async->work.flags = 0;
 	atomic_inc(&fs_info->nr_async_submits);
-	spin_unlock(&fs_info->async_submit_work_lock);
-
-	queue_work(async_submit_workqueue, &fs_info->async_submit_work);
+	btrfs_queue_worker(&fs_info->workers, &async->work);
 	return 0;
 }
 
@@ -462,19 +472,32 @@ static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 
 	offset = bio->bi_sector << 9;
 
+	/*
+	 * when we're called for a write, we're already in the async
+	 * submission context. Just jump into btrfs_map_bio
+	 */
 	if (rw & (1 << BIO_RW)) {
-		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+				     mirror_num, 0);
 	}
 
+	/*
+	 * called for a read, do the setup so that checksum validation
+	 * can happen in the async kernel threads
+	 */
 	ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
 	BUG_ON(ret);
 
-	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
 }
 
 static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 				 int mirror_num)
 {
+	/*
+	 * kthread helpers are used to submit writes so that checksumming
+	 * can happen in parallel across all CPUs
+	 */
 	if (!(rw & (1 << BIO_RW))) {
 		return __btree_submit_bio_hook(inode, rw, bio, mirror_num);
 	}
@@ -1036,95 +1059,40 @@ static int bio_ready_for_csum(struct bio *bio)
 	return ret;
 }
 
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_end_io_csum(void *p)
-#else
-static void btrfs_end_io_csum(struct work_struct *work)
-#endif
+/*
+ * called by the kthread helper functions to finally call the bio end_io
+ * functions. This is where read checksum verification actually happens
+ */
+static void end_workqueue_fn(struct btrfs_work *work)
 {
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	struct btrfs_fs_info *fs_info = p;
-#else
-	struct btrfs_fs_info *fs_info = container_of(work,
-						     struct btrfs_fs_info,
-						     end_io_work);
-#endif
-	unsigned long flags;
-	struct end_io_wq *end_io_wq;
 	struct bio *bio;
-	struct list_head *next;
+	struct end_io_wq *end_io_wq;
+	struct btrfs_fs_info *fs_info;
 	int error;
-	int was_empty;
 
-	while(1) {
-		spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
-		if (list_empty(&fs_info->end_io_work_list)) {
-			spin_unlock_irqrestore(&fs_info->end_io_work_lock,
-					       flags);
-			return;
-		}
-		next = fs_info->end_io_work_list.next;
-		list_del(next);
-		spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
-
-		end_io_wq = list_entry(next, struct end_io_wq, list);
-
-		bio = end_io_wq->bio;
-		if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
-			spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
-			was_empty = list_empty(&fs_info->end_io_work_list);
-			list_add_tail(&end_io_wq->list,
-				      &fs_info->end_io_work_list);
-			spin_unlock_irqrestore(&fs_info->end_io_work_lock,
-					       flags);
-			if (was_empty)
-				return;
-			continue;
-		}
-		error = end_io_wq->error;
-		bio->bi_private = end_io_wq->private;
-		bio->bi_end_io = end_io_wq->end_io;
-		kfree(end_io_wq);
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
-		bio_endio(bio, bio->bi_size, error);
-#else
-		bio_endio(bio, error);
-#endif
-	}
-}
+	end_io_wq = container_of(work, struct end_io_wq, work);
+	bio = end_io_wq->bio;
+	fs_info = end_io_wq->info;
 
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_async_submit_work(void *p)
-#else
-static void btrfs_async_submit_work(struct work_struct *work)
-#endif
-{
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	struct btrfs_fs_info *fs_info = p;
+	/* metadata bios are special because the whole tree block must
+	 * be checksummed at once. This makes sure the entire block is in
+	 * ram and up to date before trying to verify things. For
+	 * blocksize <= pagesize, it is basically a noop
+	 */
+	if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
+		btrfs_queue_worker(&fs_info->endio_workers,
+				   &end_io_wq->work);
+		return;
+	}
+	error = end_io_wq->error;
+	bio->bi_private = end_io_wq->private;
+	bio->bi_end_io = end_io_wq->end_io;
+	kfree(end_io_wq);
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
+	bio_endio(bio, bio->bi_size, error);
 #else
-	struct btrfs_fs_info *fs_info = container_of(work,
-						     struct btrfs_fs_info,
-						     async_submit_work);
+	bio_endio(bio, error);
 #endif
-	struct async_submit_bio *async;
-	struct list_head *next;
-
-	while(1) {
-		spin_lock(&fs_info->async_submit_work_lock);
-		if (list_empty(&fs_info->async_submit_work_list)) {
-			spin_unlock(&fs_info->async_submit_work_lock);
-			return;
-		}
-		next = fs_info->async_submit_work_list.next;
-		list_del(next);
-		atomic_dec(&fs_info->nr_async_submits);
-		spin_unlock(&fs_info->async_submit_work_lock);
-
-		async = list_entry(next, struct async_submit_bio, list);
-		async->submit_bio_hook(async->inode, async->rw, async->bio,
-				       async->mirror_num);
-		kfree(async);
-	}
 }
 
 struct btrfs_root *open_ctree(struct super_block *sb,
@@ -1155,19 +1123,11 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 		err = -ENOMEM;
 		goto fail;
 	}
-	end_io_workqueue = create_workqueue("btrfs-end-io");
-	BUG_ON(!end_io_workqueue);
-	async_submit_workqueue = create_workqueue("btrfs-async-submit");
-
 	INIT_RADIX_TREE(&fs_info->fs_roots_radix, GFP_NOFS);
 	INIT_LIST_HEAD(&fs_info->trans_list);
 	INIT_LIST_HEAD(&fs_info->dead_roots);
 	INIT_LIST_HEAD(&fs_info->hashers);
-	INIT_LIST_HEAD(&fs_info->end_io_work_list);
-	INIT_LIST_HEAD(&fs_info->async_submit_work_list);
 	spin_lock_init(&fs_info->hash_lock);
-	spin_lock_init(&fs_info->end_io_work_lock);
-	spin_lock_init(&fs_info->async_submit_work_lock);
 	spin_lock_init(&fs_info->delalloc_lock);
 	spin_lock_init(&fs_info->new_trans_lock);
 
@@ -1222,13 +1182,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 	fs_info->do_barriers = 1;
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum, fs_info);
-	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work,
-		  fs_info);
 	INIT_WORK(&fs_info->trans_work, btrfs_transaction_cleaner, fs_info);
 #else
-	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum);
-	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work);
 	INIT_DELAYED_WORK(&fs_info->trans_work, btrfs_transaction_cleaner);
 #endif
 	BTRFS_I(fs_info->btree_inode)->root = tree_root;
@@ -1240,6 +1195,19 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 	mutex_init(&fs_info->trans_mutex);
 	mutex_init(&fs_info->fs_mutex);
 
+	/* we need to start all the end_io workers up front because the
+	 * queue work function gets called at interrupt time. The endio
+	 * workers don't normally start IO, so some number of them <= the
+	 * number of cpus is fine. They handle checksumming after a read.
+	 *
+	 * The other worker threads do start IO, so the max is larger than
+	 * the number of CPUs. FIXME, tune this for huge machines
+	 */
+	btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2);
+	btrfs_init_workers(&fs_info->endio_workers, num_online_cpus());
+	btrfs_start_workers(&fs_info->workers, 1);
+	btrfs_start_workers(&fs_info->endio_workers, num_online_cpus());
+
 #if 0
 	ret = add_hasher(fs_info, "crc32c");
 	if (ret) {
@@ -1375,6 +1343,8 @@ fail_sb_buffer:
 	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
 fail_iput:
 	iput(fs_info->btree_inode);
+	btrfs_stop_workers(&fs_info->workers);
+	btrfs_stop_workers(&fs_info->endio_workers);
 fail:
 	btrfs_close_devices(fs_info->fs_devices);
 	btrfs_mapping_tree_free(&fs_info->mapping_tree);
@@ -1623,16 +1593,10 @@ int close_ctree(struct btrfs_root *root)
 	extent_io_tree_empty_lru(&fs_info->extent_ins);
 	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
 
-	flush_workqueue(async_submit_workqueue);
-	flush_workqueue(end_io_workqueue);
-
 	truncate_inode_pages(fs_info->btree_inode->i_mapping, 0);
 
-	flush_workqueue(async_submit_workqueue);
-	destroy_workqueue(async_submit_workqueue);
-
-	flush_workqueue(end_io_workqueue);
-	destroy_workqueue(end_io_workqueue);
+	btrfs_stop_workers(&fs_info->workers);
+	btrfs_stop_workers(&fs_info->endio_workers);
 
 	iput(fs_info->btree_inode);
 #if 0
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 0f14697becef..7daef8d37006 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -359,7 +359,7 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 
 	kfree(sums);
 
-	return btrfs_map_bio(root, rw, bio, mirror_num);
+	return btrfs_map_bio(root, rw, bio, mirror_num, 1);
 }
 
 int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
@@ -383,7 +383,7 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 				   inode, rw, bio, mirror_num,
 				   __btrfs_submit_bio_hook);
 mapit:
-	return btrfs_map_bio(root, rw, bio, mirror_num);
+	return btrfs_map_bio(root, rw, bio, mirror_num, 0);
 }
 
 int btrfs_readpage_io_hook(struct page *page, u64 start, u64 end)
diff --git a/fs/btrfs/volumes.c b/fs/btrfs/volumes.c
index 722eb4550154..c57458ce6339 100644
--- a/fs/btrfs/volumes.c
+++ b/fs/btrfs/volumes.c
@@ -27,6 +27,7 @@
 #include "transaction.h"
 #include "print-tree.h"
 #include "volumes.h"
+#include "async-thread.h"
 
 struct map_lookup {
 	u64 type;
@@ -110,6 +111,101 @@ static struct btrfs_fs_devices *find_fsid(u8 *fsid)
 	return NULL;
 }
 
+/*
+ * we try to collect pending bios for a device so we don't get a large
+ * number of procs sending bios down to the same device. This greatly
+ * improves the scheduler's ability to collect and merge the bios.
+ *
+ * But, it also turns into a long list of bios to process and that is sure
+ * to eventually make the worker thread block. The solution here is to
+ * make some progress and then put this work struct back at the end of
+ * the list if the block device is congested. This way, multiple devices
+ * can make progress from a single worker thread.
+ */
+int run_scheduled_bios(struct btrfs_device *device)
+{
+	struct bio *pending;
+	struct backing_dev_info *bdi;
+	struct bio *tail;
+	struct bio *cur;
+	int again = 0;
+	unsigned long num_run = 0;
+
+	bdi = device->bdev->bd_inode->i_mapping->backing_dev_info;
+loop:
+	spin_lock(&device->io_lock);
+
+	/* take all the bios off the list at once and process them
+	 * later on (without the lock held). But, remember the
+	 * tail and other pointers so the bios can be properly reinserted
+	 * into the list if we hit congestion
+	 */
+	pending = device->pending_bios;
+	tail = device->pending_bio_tail;
+	WARN_ON(pending && !tail);
+	device->pending_bios = NULL;
+	device->pending_bio_tail = NULL;
+
+	/*
+	 * if pending was null this time around, no bios need processing
+	 * at all and we can stop. Otherwise it'll loop back up again
+	 * and do an additional check so no bios are missed.
+	 *
+	 * device->running_pending is used to synchronize with the
+	 * schedule_bio code.
+	 */
+	if (pending) {
+		again = 1;
+		device->running_pending = 1;
+	} else {
+		again = 0;
+		device->running_pending = 0;
+	}
+	spin_unlock(&device->io_lock);
+
+	while(pending) {
+		cur = pending;
+		pending = pending->bi_next;
+		cur->bi_next = NULL;
+		atomic_dec(&device->dev_root->fs_info->nr_async_submits);
+		submit_bio(cur->bi_rw, cur);
+		num_run++;
+
+		/*
+		 * we made progress, there is more work to do and the bdi
+		 * is now congested. Back off and let other work structs
+		 * run instead
+		 */
+		if (pending && num_run && bdi_write_congested(bdi)) {
+			struct bio *old_head;
+
+			spin_lock(&device->io_lock);
+			old_head = device->pending_bios;
+			device->pending_bios = pending;
+			if (device->pending_bio_tail)
+				tail->bi_next = old_head;
+			else
+				device->pending_bio_tail = tail;
+
+			spin_unlock(&device->io_lock);
+			btrfs_requeue_work(&device->work);
+			goto done;
+		}
+	}
+	if (again)
+		goto loop;
+done:
+	return 0;
+}
+
+void pending_bios_fn(struct btrfs_work *work)
+{
+	struct btrfs_device *device;
+
+	device = container_of(work, struct btrfs_device, work);
+	run_scheduled_bios(device);
+}
+
 static int device_list_add(const char *path,
 			   struct btrfs_super_block *disk_super,
 			   u64 devid, struct btrfs_fs_devices **fs_devices_ret)
@@ -141,6 +237,7 @@ static int device_list_add(const char *path,
 			return -ENOMEM;
 		}
 		device->devid = devid;
+		device->work.func = pending_bios_fn;
 		memcpy(device->uuid, disk_super->dev_item.uuid,
 		       BTRFS_UUID_SIZE);
 		device->barriers = 1;
@@ -925,6 +1022,7 @@ int btrfs_init_new_device(struct btrfs_root *root, char *device_path)
 	}
 
 	device->barriers = 1;
+	device->work.func = pending_bios_fn;
 	generate_random_uuid(device->uuid);
 	spin_lock_init(&device->io_lock);
 	device->name = kstrdup(device_path, GFP_NOFS);
@@ -1965,8 +2063,61 @@ static int end_bio_multi_stripe(struct bio *bio,
 #endif
 }
 
+struct async_sched {
+	struct bio *bio;
+	int rw;
+	struct btrfs_fs_info *info;
+	struct btrfs_work work;
+};
+
+/*
+ * see run_scheduled_bios for a description of why bios are collected for
+ * async submit.
+ *
+ * This will add one bio to the pending list for a device and make sure
+ * the work struct is scheduled.
+ */
+int schedule_bio(struct btrfs_root *root, struct btrfs_device *device,
+		 int rw, struct bio *bio)
+{
+	int should_queue = 1;
+
+	/* don't bother with additional async steps for reads, right now */
+	if (!(rw & (1 << BIO_RW))) {
+		submit_bio(rw, bio);
+		return 0;
+	}
+
+	/*
+	 * nr_async_submits allows us to reliably return congestion to the
+	 * higher layers. Otherwise, the async bio makes it appear we have
+	 * made progress against dirty pages when we've really just put it
+	 * on a queue for later
+	 */
+	atomic_inc(&root->fs_info->nr_async_submits);
+	bio->bi_next = NULL;
+	bio->bi_rw |= rw;
+
+	spin_lock(&device->io_lock);
+
+	if (device->pending_bio_tail)
+		device->pending_bio_tail->bi_next = bio;
+
+	device->pending_bio_tail = bio;
+	if (!device->pending_bios)
+		device->pending_bios = bio;
+	if (device->running_pending)
+		should_queue = 0;
+
+	spin_unlock(&device->io_lock);
+
+	if (should_queue)
+		btrfs_queue_worker(&root->fs_info->workers, &device->work);
+	return 0;
+}
+
 int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
-		  int mirror_num)
+		  int mirror_num, int async_submit)
 {
 	struct btrfs_mapping_tree *map_tree;
 	struct btrfs_device *dev;
@@ -2012,10 +2163,10 @@ int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
 		dev = multi->stripes[dev_nr].dev;
 		if (dev && dev->bdev) {
 			bio->bi_bdev = dev->bdev;
-			spin_lock(&dev->io_lock);
-			dev->total_ios++;
-			spin_unlock(&dev->io_lock);
-			submit_bio(rw, bio);
+			if (async_submit)
+				schedule_bio(root, dev, rw, bio);
+			else
+				submit_bio(rw, bio);
 		} else {
 			bio->bi_bdev = root->fs_info->fs_devices->latest_bdev;
 			bio->bi_sector = logical >> 9;
@@ -2054,6 +2205,7 @@ static struct btrfs_device *add_missing_dev(struct btrfs_root *root,
 	device->barriers = 1;
 	device->dev_root = root->fs_info->dev_root;
 	device->devid = devid;
+	device->work.func = pending_bios_fn;
 	fs_devices->num_devices++;
 	spin_lock_init(&device->io_lock);
 	memcpy(device->uuid, dev_uuid, BTRFS_UUID_SIZE);
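
A note for readers following the volumes.c changes: schedule_bio() and run_scheduled_bios() implement a small producer/consumer pattern. Producers chain bios onto a per-device singly linked list under io_lock and only wake the worker pool when the device is idle; the worker detaches the whole list in one step, drains it with the lock dropped, and requeues itself via btrfs_requeue_work() if the device becomes congested. The condensed sketch below restates just the producer-side list handling outside the btrfs types; it is illustrative only and not part of the patch, and the pending_list/sketch_append names are invented.

/* Illustrative sketch only, not part of the patch */
struct pending_list {
	spinlock_t lock;
	struct bio *head;
	struct bio *tail;
	int running;		/* mirrors device->running_pending */
};

/* producer side: append one bio, report whether the worker needs a kick */
static int sketch_append(struct pending_list *p, struct bio *bio)
{
	int should_queue;

	bio->bi_next = NULL;
	spin_lock(&p->lock);
	if (p->tail)
		p->tail->bi_next = bio;
	p->tail = bio;
	if (!p->head)
		p->head = bio;
	should_queue = !p->running;
	spin_unlock(&p->lock);
	return should_queue;	/* caller queues the work struct if set */
}

The consumer side then takes head/tail under the lock, clears them, sets running, and walks the detached chain unlocked, exactly as run_scheduled_bios() does above.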
diff --git a/fs/btrfs/volumes.h b/fs/btrfs/volumes.h
index 4df6b1608f91..48a44f7a9385 100644
--- a/fs/btrfs/volumes.h
+++ b/fs/btrfs/volumes.h
@@ -20,6 +20,7 @@
 #define __BTRFS_VOLUMES_
 
 #include <linux/bio.h>
+#include "async-thread.h"
 
 struct buffer_head;
 struct btrfs_device {
@@ -27,6 +28,9 @@ struct btrfs_device {
 	struct list_head dev_alloc_list;
 	struct btrfs_root *dev_root;
 	struct buffer_head *pending_io;
+	struct bio *pending_bios;
+	struct bio *pending_bio_tail;
+	int running_pending;
 	u64 generation;
 
 	int barriers;
@@ -36,8 +40,6 @@ struct btrfs_device {
 
 	struct block_device *bdev;
 
-	u64 total_ios;
-
 	char *name;
 
 	/* the internal btrfs device id */
@@ -63,6 +65,8 @@ struct btrfs_device {
 
 	/* physical drive uuid (or lvm uuid) */
 	u8 uuid[BTRFS_UUID_SIZE];
+
+	struct btrfs_work work;
 };
 
 struct btrfs_fs_devices {
@@ -117,7 +121,7 @@ int btrfs_alloc_chunk(struct btrfs_trans_handle *trans,
 void btrfs_mapping_init(struct btrfs_mapping_tree *tree);
 void btrfs_mapping_tree_free(struct btrfs_mapping_tree *tree);
 int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
-		  int mirror_num);
+		  int mirror_num, int async_submit);
 int btrfs_read_super_device(struct btrfs_root *root, struct extent_buffer *buf);
 int btrfs_open_devices(struct btrfs_fs_devices *fs_devices,
 		       int flags, void *holder);