author    Chris Mason <chris.mason@oracle.com>  2008-06-11 16:50:36 -0400
committer Chris Mason <chris.mason@oracle.com>  2008-09-25 11:04:03 -0400
commit    8b7128429235d9bd72cfd5ed20c77c4f3118f744 (patch)
tree      982eda13094af1ccd46e8c3853559a0eb6e298f6
parent    43e570b08a6c6b1d75f218566a6240542a386fd9 (diff)
Btrfs: Add async worker threads for pre and post IO checksumming
Btrfs has been using workqueues to spread the checksumming load across
other CPUs in the system. But, workqueues only schedule work on the same
CPU that queued the work, giving them a limited benefit for systems with
higher CPU counts.

This code adds a generic facility to schedule work with pools of kthreads,
and changes the bio submission code to queue bios up. The queueing is
important to make sure large numbers of procs on the system don't turn
streaming workloads into random workloads by sending IO down concurrently.

The end result of all of this is much higher performance (and CPU usage)
when doing checksumming on large machines. Two worker pools are created,
one for writes and one for endio processing. The two could deadlock if
we tried to service both from a single pool.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
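The pool introduced below is driven by embedding a struct btrfs_work in the
caller's own structure, pointing work.func at a handler, and handing the item
to btrfs_queue_worker(); the handler then recovers its enclosing structure
with container_of(). The following is a minimal sketch of that usage pattern,
based only on the interfaces declared in async-thread.h in this patch; the
my_task/my_handler/my_submit names are hypothetical and not part of the patch.

	/* sketch only: the embed-and-container_of pattern used by the new pools */
	#include <linux/kernel.h>
	#include <linux/slab.h>
	#include "async-thread.h"

	struct my_task {
		int payload;			/* the caller's own data */
		struct btrfs_work work;		/* embedded work item */
	};

	static void my_handler(struct btrfs_work *work)
	{
		/* recover the containing structure from the work pointer */
		struct my_task *task = container_of(work, struct my_task, work);

		/* ... do the real processing on task->payload here ... */
		kfree(task);
	}

	static int my_submit(struct btrfs_workers *workers, int payload)
	{
		struct my_task *task = kzalloc(sizeof(*task), GFP_NOFS);

		if (!task)
			return -ENOMEM;
		task->payload = payload;
		task->work.func = my_handler;
		task->work.flags = 0;	/* must start cleared; guards double-queueing */
		btrfs_queue_worker(workers, &task->work);
		return 0;
	}

The pool itself is created once with btrfs_init_workers() and
btrfs_start_workers() and torn down with btrfs_stop_workers(), mirroring what
open_ctree() and close_ctree() do later in this patch; the async checksumming
and the per-device bio queueing both follow this same pattern.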
-rw-r--r--   fs/btrfs/Makefile         |    2
-rw-r--r--   fs/btrfs/async-thread.c   |  288
-rw-r--r--   fs/btrfs/async-thread.h   |   78
-rw-r--r--   fs/btrfs/ctree.h          |   14
-rw-r--r--   fs/btrfs/disk-io.c        |  200
-rw-r--r--   fs/btrfs/inode.c          |    4
-rw-r--r--   fs/btrfs/volumes.c        |  162
-rw-r--r--   fs/btrfs/volumes.h        |   10
8 files changed, 626 insertions(+), 132 deletions(-)
diff --git a/fs/btrfs/Makefile b/fs/btrfs/Makefile
index e5fc3cfea0ab..9dcfc2fe3332 100644
--- a/fs/btrfs/Makefile
+++ b/fs/btrfs/Makefile
@@ -6,7 +6,7 @@ btrfs-y := super.o ctree.o extent-tree.o print-tree.o root-tree.o dir-item.o \
 	  hash.o file-item.o inode-item.o inode-map.o disk-io.o \
 	  transaction.o bit-radix.o inode.o file.o tree-defrag.o \
 	  extent_map.o sysfs.o struct-funcs.o xattr.o ordered-data.o \
-	  extent_io.o volumes.o
+	  extent_io.o volumes.o async-thread.o
 
 btrfs-$(CONFIG_FS_POSIX_ACL) += acl.o
 else
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
new file mode 100644
index 000000000000..2911b67bd6f7
--- /dev/null
+++ b/fs/btrfs/async-thread.c
@@ -0,0 +1,288 @@
1/*
2 * Copyright (C) 2007 Oracle. All rights reserved.
3 *
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public
6 * License v2 as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public
14 * License along with this program; if not, write to the
15 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
16 * Boston, MA 021110-1307, USA.
17 */
18
19#include <linux/kthread.h>
20#include <linux/list.h>
21#include <linux/spinlock.h>
22#include <linux/freezer.h>
23#include "async-thread.h"
24
25/*
26 * container for the kthread task pointer and the list of pending work
27 * One of these is allocated per thread.
28 */
29struct btrfs_worker_thread {
30 /* list of struct btrfs_work that are waiting for service */
31 struct list_head pending;
32
33 /* list of worker threads from struct btrfs_workers */
34 struct list_head worker_list;
35
36 /* kthread */
37 struct task_struct *task;
38
39 /* number of things on the pending list */
40 atomic_t num_pending;
41
42 /* protects the pending list. */
43 spinlock_t lock;
44
45 /* set to non-zero when this thread is already awake and kicking */
46 int working;
47};
48
49/*
50 * main loop for servicing work items
51 */
52static int worker_loop(void *arg)
53{
54 struct btrfs_worker_thread *worker = arg;
55 struct list_head *cur;
56 struct btrfs_work *work;
57 do {
58 spin_lock_irq(&worker->lock);
59 while(!list_empty(&worker->pending)) {
60 cur = worker->pending.next;
61 work = list_entry(cur, struct btrfs_work, list);
62 list_del(&work->list);
63 clear_bit(0, &work->flags);
64
65 work->worker = worker;
66 spin_unlock_irq(&worker->lock);
67
68 work->func(work);
69
70 atomic_dec(&worker->num_pending);
71 spin_lock_irq(&worker->lock);
72 }
73 worker->working = 0;
74 if (freezing(current)) {
75 refrigerator();
76 } else {
77 set_current_state(TASK_INTERRUPTIBLE);
78 spin_unlock_irq(&worker->lock);
79 schedule();
80 __set_current_state(TASK_RUNNING);
81 }
82 } while (!kthread_should_stop());
83 return 0;
84}
85
86/*
87 * this will wait for all the worker threads to shutdown
88 */
89int btrfs_stop_workers(struct btrfs_workers *workers)
90{
91 struct list_head *cur;
92 struct btrfs_worker_thread *worker;
93
94 while(!list_empty(&workers->worker_list)) {
95 cur = workers->worker_list.next;
96 worker = list_entry(cur, struct btrfs_worker_thread,
97 worker_list);
98 kthread_stop(worker->task);
99 list_del(&worker->worker_list);
100 kfree(worker);
101 }
102 return 0;
103}
104
105/*
106 * simple init on struct btrfs_workers
107 */
108void btrfs_init_workers(struct btrfs_workers *workers, int max)
109{
110 workers->num_workers = 0;
111 INIT_LIST_HEAD(&workers->worker_list);
112 workers->last = NULL;
113 spin_lock_init(&workers->lock);
114 workers->max_workers = max;
115}
116
117/*
118 * starts new worker threads. This does not enforce the max worker
119 * count in case you need to temporarily go past it.
120 */
121int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
122{
123 struct btrfs_worker_thread *worker;
124 int ret = 0;
125 int i;
126
127 for (i = 0; i < num_workers; i++) {
128 worker = kzalloc(sizeof(*worker), GFP_NOFS);
129 if (!worker) {
130 ret = -ENOMEM;
131 goto fail;
132 }
133
134 INIT_LIST_HEAD(&worker->pending);
135 INIT_LIST_HEAD(&worker->worker_list);
136 spin_lock_init(&worker->lock);
137 atomic_set(&worker->num_pending, 0);
138 worker->task = kthread_run(worker_loop, worker, "btrfs");
139 if (IS_ERR(worker->task)) {
140 ret = PTR_ERR(worker->task);
141 goto fail;
142 }
143
144 spin_lock_irq(&workers->lock);
145 list_add_tail(&worker->worker_list, &workers->worker_list);
146 workers->last = worker;
147 workers->num_workers++;
148 spin_unlock_irq(&workers->lock);
149 }
150 return 0;
151fail:
152 btrfs_stop_workers(workers);
153 return ret;
154}
155
156/*
157 * run through the list and find a worker thread that doesn't have a lot
158 * to do right now. This can return null if we aren't yet at the thread
159 * count limit and all of the threads are busy.
160 */
161static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
162{
163 struct btrfs_worker_thread *worker;
164 struct list_head *next;
165 struct list_head *start;
166 int enforce_min = workers->num_workers < workers->max_workers;
167
168 /* start with the last thread if it isn't busy */
169 worker = workers->last;
170 if (atomic_read(&worker->num_pending) < 64)
171 goto done;
172
173 next = worker->worker_list.next;
174 start = &worker->worker_list;
175
176 /*
177 * check all the workers for someone that is bored. FIXME, do
178 * something smart here
179 */
180 while(next != start) {
181 if (next == &workers->worker_list) {
182 next = workers->worker_list.next;
183 continue;
184 }
185 worker = list_entry(next, struct btrfs_worker_thread,
186 worker_list);
187 if (atomic_read(&worker->num_pending) < 64 || !enforce_min)
188 goto done;
189 next = next->next;
190 }
191 /*
192 * nobody was bored, if we're already at the max thread count,
193 * use the last thread
194 */
195 if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) {
196 return workers->last;
197 }
198 return NULL;
199done:
200 workers->last = worker;
201 return worker;
202}
203
204static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
205{
206 struct btrfs_worker_thread *worker;
207 unsigned long flags;
208
209again:
210 spin_lock_irqsave(&workers->lock, flags);
211 worker = next_worker(workers);
212 spin_unlock_irqrestore(&workers->lock, flags);
213
214 if (!worker) {
215 spin_lock_irqsave(&workers->lock, flags);
216 if (workers->num_workers >= workers->max_workers) {
217 /*
218 * we have failed to find any workers, just
219 * return the force one
220 */
221 worker = list_entry(workers->worker_list.next,
222 struct btrfs_worker_thread, worker_list);
223 spin_unlock_irqrestore(&workers->lock, flags);
224 } else {
225 spin_unlock_irqrestore(&workers->lock, flags);
226 /* we're below the limit, start another worker */
227 btrfs_start_workers(workers, 1);
228 goto again;
229 }
230 }
231 return worker;
232}
233
234/*
235 * btrfs_requeue_work just puts the work item back on the tail of the list
236 * it was taken from. It is intended for use with long running work functions
237 * that make some progress and want to give the cpu up for others.
238 */
239int btrfs_requeue_work(struct btrfs_work *work)
240{
241 struct btrfs_worker_thread *worker = work->worker;
242 unsigned long flags;
243
244 if (test_and_set_bit(0, &work->flags))
245 goto out;
246
247 spin_lock_irqsave(&worker->lock, flags);
248 atomic_inc(&worker->num_pending);
249 list_add_tail(&work->list, &worker->pending);
250 spin_unlock_irqrestore(&worker->lock, flags);
251out:
252 return 0;
253}
254
255/*
256 * places a struct btrfs_work into the pending queue of one of the kthreads
257 */
258int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
259{
260 struct btrfs_worker_thread *worker;
261 unsigned long flags;
262 int wake = 0;
263
264 /* don't requeue something already on a list */
265 if (test_and_set_bit(0, &work->flags))
266 goto out;
267
268 worker = find_worker(workers);
269
270 spin_lock_irqsave(&worker->lock, flags);
271 atomic_inc(&worker->num_pending);
272 list_add_tail(&work->list, &worker->pending);
273
274 /*
275 * avoid calling into wake_up_process if this thread has already
276 * been kicked
277 */
278 if (!worker->working)
279 wake = 1;
280 worker->working = 1;
281
282 spin_unlock_irqrestore(&worker->lock, flags);
283
284 if (wake)
285 wake_up_process(worker->task);
286out:
287 return 0;
288}
diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h
new file mode 100644
index 000000000000..52fc9da0f9e7
--- /dev/null
+++ b/fs/btrfs/async-thread.h
@@ -0,0 +1,78 @@
1/*
2 * Copyright (C) 2007 Oracle. All rights reserved.
3 *
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public
6 * License v2 as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public
14 * License along with this program; if not, write to the
15 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
16 * Boston, MA 021110-1307, USA.
17 */
18
19#ifndef __BTRFS_ASYNC_THREAD_
20#define __BTRFS_ASYNC_THREAD_
21
22struct btrfs_worker_thread;
23
24/*
25 * This is similar to a workqueue, but it is meant to spread the operations
26 * across all available cpus instead of just the CPU that was used to
27 * queue the work. There is also some batching introduced to try and
28 * cut down on context switches.
29 *
30 * By default threads are added on demand up to 2 * the number of cpus.
31 * Changing struct btrfs_workers->max_workers is one way to prevent
32 * demand creation of kthreads.
33 *
34 * the basic model of these worker threads is to embed a btrfs_work
35 * structure in your own data struct, and use container_of in a
36 * work function to get back to your data struct.
37 */
38struct btrfs_work {
39 /*
40 * only func should be set to the function you want called
41 * your work struct is passed as the only arg
42 */
43 void (*func)(struct btrfs_work *work);
44
45 /*
46 * flags should be set to zero. It is used to make sure the
47 * struct is only inserted once into the list.
48 */
49 unsigned long flags;
50
51 /* don't touch these */
52 struct btrfs_worker_thread *worker;
53 struct list_head list;
54};
55
56struct btrfs_workers {
57 /* current number of running workers */
58 int num_workers;
59
60 /* max number of workers allowed. changed by btrfs_start_workers */
61 int max_workers;
62
63 /* list with all the work threads */
64 struct list_head worker_list;
65
66 /* the last worker thread to have something queued */
67 struct btrfs_worker_thread *last;
68
69 /* lock for finding the next worker thread to queue on */
70 spinlock_t lock;
71};
72
73int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work);
74int btrfs_start_workers(struct btrfs_workers *workers, int num_workers);
75int btrfs_stop_workers(struct btrfs_workers *workers);
76void btrfs_init_workers(struct btrfs_workers *workers, int max);
77int btrfs_requeue_work(struct btrfs_work *work);
78#endif
diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h
index 49cbc62b42f9..6c91a510c965 100644
--- a/fs/btrfs/ctree.h
+++ b/fs/btrfs/ctree.h
@@ -30,6 +30,7 @@
 #include "bit-radix.h"
 #include "extent_io.h"
 #include "extent_map.h"
+#include "async-thread.h"
 
 struct btrfs_trans_handle;
 struct btrfs_transaction;
@@ -518,13 +519,20 @@ struct btrfs_fs_info {
 	struct list_head hashers;
 	struct list_head dead_roots;
 	struct list_head end_io_work_list;
-	struct list_head async_submit_work_list;
 	struct work_struct end_io_work;
-	struct work_struct async_submit_work;
 	spinlock_t end_io_work_lock;
-	spinlock_t async_submit_work_lock;
 	atomic_t nr_async_submits;
 
+	/*
+	 * there is a pool of worker threads for checksumming during writes
+	 * and a pool for checksumming after reads. This is because readers
+	 * can run with FS locks held, and the writers may be waiting for
+	 * those locks. We don't want ordering in the pending list to cause
+	 * deadlocks, and so the two are serviced separately.
+	 */
+	struct btrfs_workers workers;
+	struct btrfs_workers endio_workers;
+
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
 	struct work_struct trans_work;
 #else
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index b9a53646ceb2..98ff4fbcb386 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -31,6 +31,7 @@
 #include "btrfs_inode.h"
 #include "volumes.h"
 #include "print-tree.h"
+#include "async-thread.h"
 
 #if 0
 static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
@@ -46,8 +47,7 @@ static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
 #endif
 
 static struct extent_io_ops btree_extent_io_ops;
-static struct workqueue_struct *end_io_workqueue;
-static struct workqueue_struct *async_submit_workqueue;
+static void end_workqueue_fn(struct btrfs_work *work);
 
 struct end_io_wq {
 	struct bio *bio;
@@ -57,6 +57,7 @@ struct end_io_wq {
 	int error;
 	int metadata;
 	struct list_head list;
+	struct btrfs_work work;
 };
 
 struct async_submit_bio {
@@ -66,6 +67,7 @@ struct async_submit_bio {
 	extent_submit_bio_hook_t *submit_bio_hook;
 	int rw;
 	int mirror_num;
+	struct btrfs_work work;
 };
 
 struct extent_map *btree_get_extent(struct inode *inode, struct page *page,
@@ -389,7 +391,6 @@ static int end_workqueue_bio(struct bio *bio,
 {
 	struct end_io_wq *end_io_wq = bio->bi_private;
 	struct btrfs_fs_info *fs_info;
-	unsigned long flags;
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
 	if (bio->bi_size)
@@ -397,11 +398,10 @@ static int end_workqueue_bio(struct bio *bio,
 #endif
 
 	fs_info = end_io_wq->info;
-	spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
 	end_io_wq->error = err;
-	list_add_tail(&end_io_wq->list, &fs_info->end_io_work_list);
-	spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
-	queue_work(end_io_workqueue, &fs_info->end_io_work);
+	end_io_wq->work.func = end_workqueue_fn;
+	end_io_wq->work.flags = 0;
+	btrfs_queue_worker(&fs_info->endio_workers, &end_io_wq->work);
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
 	return 0;
@@ -428,6 +428,19 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio,
 	return 0;
 }
 
+static void run_one_async_submit(struct btrfs_work *work)
+{
+	struct btrfs_fs_info *fs_info;
+	struct async_submit_bio *async;
+
+	async = container_of(work, struct async_submit_bio, work);
+	fs_info = BTRFS_I(async->inode)->root->fs_info;
+	atomic_dec(&fs_info->nr_async_submits);
+	async->submit_bio_hook(async->inode, async->rw, async->bio,
+			       async->mirror_num);
+	kfree(async);
+}
+
 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 			int rw, struct bio *bio, int mirror_num,
 			extent_submit_bio_hook_t *submit_bio_hook)
@@ -443,13 +456,10 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 	async->bio = bio;
 	async->mirror_num = mirror_num;
 	async->submit_bio_hook = submit_bio_hook;
-
-	spin_lock(&fs_info->async_submit_work_lock);
-	list_add_tail(&async->list, &fs_info->async_submit_work_list);
+	async->work.func = run_one_async_submit;
+	async->work.flags = 0;
 	atomic_inc(&fs_info->nr_async_submits);
-	spin_unlock(&fs_info->async_submit_work_lock);
-
-	queue_work(async_submit_workqueue, &fs_info->async_submit_work);
+	btrfs_queue_worker(&fs_info->workers, &async->work);
 	return 0;
 }
 
@@ -462,19 +472,32 @@ static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 
 	offset = bio->bi_sector << 9;
 
+	/*
+	 * when we're called for a write, we're already in the async
+	 * submission context. Just jump into btrfs_map_bio
+	 */
 	if (rw & (1 << BIO_RW)) {
-		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+				     mirror_num, 0);
 	}
 
+	/*
+	 * called for a read, do the setup so that checksum validation
+	 * can happen in the async kernel threads
+	 */
 	ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
 	BUG_ON(ret);
 
-	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
 }
 
 static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 				 int mirror_num)
 {
+	/*
+	 * kthread helpers are used to submit writes so that checksumming
+	 * can happen in parallel across all CPUs
+	 */
 	if (!(rw & (1 << BIO_RW))) {
 		return __btree_submit_bio_hook(inode, rw, bio, mirror_num);
 	}
@@ -1036,95 +1059,40 @@ static int bio_ready_for_csum(struct bio *bio)
 	return ret;
 }
 
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_end_io_csum(void *p)
-#else
-static void btrfs_end_io_csum(struct work_struct *work)
-#endif
+/*
+ * called by the kthread helper functions to finally call the bio end_io
+ * functions. This is where read checksum verification actually happens
+ */
+static void end_workqueue_fn(struct btrfs_work *work)
 {
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	struct btrfs_fs_info *fs_info = p;
-#else
-	struct btrfs_fs_info *fs_info = container_of(work,
-						     struct btrfs_fs_info,
-						     end_io_work);
-#endif
-	unsigned long flags;
-	struct end_io_wq *end_io_wq;
 	struct bio *bio;
-	struct list_head *next;
+	struct end_io_wq *end_io_wq;
+	struct btrfs_fs_info *fs_info;
 	int error;
-	int was_empty;
 
-	while(1) {
-		spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
-		if (list_empty(&fs_info->end_io_work_list)) {
-			spin_unlock_irqrestore(&fs_info->end_io_work_lock,
-					       flags);
-			return;
-		}
-		next = fs_info->end_io_work_list.next;
-		list_del(next);
-		spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
-
-		end_io_wq = list_entry(next, struct end_io_wq, list);
-
-		bio = end_io_wq->bio;
-		if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
-			spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
-			was_empty = list_empty(&fs_info->end_io_work_list);
-			list_add_tail(&end_io_wq->list,
-				      &fs_info->end_io_work_list);
-			spin_unlock_irqrestore(&fs_info->end_io_work_lock,
-					       flags);
-			if (was_empty)
-				return;
-			continue;
-		}
-		error = end_io_wq->error;
-		bio->bi_private = end_io_wq->private;
-		bio->bi_end_io = end_io_wq->end_io;
-		kfree(end_io_wq);
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
-		bio_endio(bio, bio->bi_size, error);
-#else
-		bio_endio(bio, error);
-#endif
-	}
-}
+	end_io_wq = container_of(work, struct end_io_wq, work);
+	bio = end_io_wq->bio;
+	fs_info = end_io_wq->info;
 
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_async_submit_work(void *p)
-#else
-static void btrfs_async_submit_work(struct work_struct *work)
-#endif
-{
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	struct btrfs_fs_info *fs_info = p;
+	/* metadata bios are special because the whole tree block must
+	 * be checksummed at once. This makes sure the entire block is in
+	 * ram and up to date before trying to verify things. For
+	 * blocksize <= pagesize, it is basically a noop
+	 */
+	if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
+		btrfs_queue_worker(&fs_info->endio_workers,
+				   &end_io_wq->work);
+		return;
+	}
+	error = end_io_wq->error;
+	bio->bi_private = end_io_wq->private;
+	bio->bi_end_io = end_io_wq->end_io;
+	kfree(end_io_wq);
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
+	bio_endio(bio, bio->bi_size, error);
 #else
-	struct btrfs_fs_info *fs_info = container_of(work,
-						     struct btrfs_fs_info,
-						     async_submit_work);
+	bio_endio(bio, error);
 #endif
-	struct async_submit_bio *async;
-	struct list_head *next;
-
-	while(1) {
-		spin_lock(&fs_info->async_submit_work_lock);
-		if (list_empty(&fs_info->async_submit_work_list)) {
-			spin_unlock(&fs_info->async_submit_work_lock);
-			return;
-		}
-		next = fs_info->async_submit_work_list.next;
-		list_del(next);
-		atomic_dec(&fs_info->nr_async_submits);
-		spin_unlock(&fs_info->async_submit_work_lock);
-
-		async = list_entry(next, struct async_submit_bio, list);
-		async->submit_bio_hook(async->inode, async->rw, async->bio,
-				       async->mirror_num);
-		kfree(async);
-	}
 }
 
 struct btrfs_root *open_ctree(struct super_block *sb,
@@ -1155,19 +1123,11 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 		err = -ENOMEM;
 		goto fail;
 	}
-	end_io_workqueue = create_workqueue("btrfs-end-io");
-	BUG_ON(!end_io_workqueue);
-	async_submit_workqueue = create_workqueue("btrfs-async-submit");
-
 	INIT_RADIX_TREE(&fs_info->fs_roots_radix, GFP_NOFS);
 	INIT_LIST_HEAD(&fs_info->trans_list);
 	INIT_LIST_HEAD(&fs_info->dead_roots);
 	INIT_LIST_HEAD(&fs_info->hashers);
-	INIT_LIST_HEAD(&fs_info->end_io_work_list);
-	INIT_LIST_HEAD(&fs_info->async_submit_work_list);
 	spin_lock_init(&fs_info->hash_lock);
-	spin_lock_init(&fs_info->end_io_work_lock);
-	spin_lock_init(&fs_info->async_submit_work_lock);
 	spin_lock_init(&fs_info->delalloc_lock);
 	spin_lock_init(&fs_info->new_trans_lock);
 
@@ -1222,13 +1182,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 	fs_info->do_barriers = 1;
 
 #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum, fs_info);
-	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work,
-		  fs_info);
 	INIT_WORK(&fs_info->trans_work, btrfs_transaction_cleaner, fs_info);
 #else
-	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum);
-	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work);
 	INIT_DELAYED_WORK(&fs_info->trans_work, btrfs_transaction_cleaner);
 #endif
 	BTRFS_I(fs_info->btree_inode)->root = tree_root;
@@ -1240,6 +1195,19 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 	mutex_init(&fs_info->trans_mutex);
 	mutex_init(&fs_info->fs_mutex);
 
+	/* we need to start all the end_io workers up front because the
+	 * queue work function gets called at interrupt time. The endio
+	 * workers don't normally start IO, so some number of them <= the
+	 * number of cpus is fine. They handle checksumming after a read.
+	 *
+	 * The other worker threads do start IO, so the max is larger than
+	 * the number of CPUs. FIXME, tune this for huge machines
+	 */
+	btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2);
+	btrfs_init_workers(&fs_info->endio_workers, num_online_cpus());
+	btrfs_start_workers(&fs_info->workers, 1);
+	btrfs_start_workers(&fs_info->endio_workers, num_online_cpus());
+
 #if 0
 	ret = add_hasher(fs_info, "crc32c");
 	if (ret) {
@@ -1375,6 +1343,8 @@ fail_sb_buffer:
 	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
 fail_iput:
 	iput(fs_info->btree_inode);
+	btrfs_stop_workers(&fs_info->workers);
+	btrfs_stop_workers(&fs_info->endio_workers);
 fail:
 	btrfs_close_devices(fs_info->fs_devices);
 	btrfs_mapping_tree_free(&fs_info->mapping_tree);
@@ -1623,16 +1593,10 @@ int close_ctree(struct btrfs_root *root)
 	extent_io_tree_empty_lru(&fs_info->extent_ins);
 	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
 
-	flush_workqueue(async_submit_workqueue);
-	flush_workqueue(end_io_workqueue);
-
 	truncate_inode_pages(fs_info->btree_inode->i_mapping, 0);
 
-	flush_workqueue(async_submit_workqueue);
-	destroy_workqueue(async_submit_workqueue);
-
-	flush_workqueue(end_io_workqueue);
-	destroy_workqueue(end_io_workqueue);
+	btrfs_stop_workers(&fs_info->workers);
+	btrfs_stop_workers(&fs_info->endio_workers);
 
 	iput(fs_info->btree_inode);
 #if 0
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 0f14697becef..7daef8d37006 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -359,7 +359,7 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 
 	kfree(sums);
 
-	return btrfs_map_bio(root, rw, bio, mirror_num);
+	return btrfs_map_bio(root, rw, bio, mirror_num, 1);
 }
 
 int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
@@ -383,7 +383,7 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 				   inode, rw, bio, mirror_num,
 				   __btrfs_submit_bio_hook);
 mapit:
-	return btrfs_map_bio(root, rw, bio, mirror_num);
+	return btrfs_map_bio(root, rw, bio, mirror_num, 0);
 }
 
 int btrfs_readpage_io_hook(struct page *page, u64 start, u64 end)
diff --git a/fs/btrfs/volumes.c b/fs/btrfs/volumes.c
index 722eb4550154..c57458ce6339 100644
--- a/fs/btrfs/volumes.c
+++ b/fs/btrfs/volumes.c
@@ -27,6 +27,7 @@
 #include "transaction.h"
 #include "print-tree.h"
 #include "volumes.h"
+#include "async-thread.h"
 
 struct map_lookup {
 	u64 type;
@@ -110,6 +111,101 @@ static struct btrfs_fs_devices *find_fsid(u8 *fsid)
110 return NULL; 111 return NULL;
111} 112}
112 113
114/*
115 * we try to collect pending bios for a device so we don't get a large
116 * number of procs sending bios down to the same device. This greatly
117 * improves the schedulers ability to collect and merge the bios.
118 *
119 * But, it also turns into a long list of bios to process and that is sure
120 * to eventually make the worker thread block. The solution here is to
121 * make some progress and then put this work struct back at the end of
122 * the list if the block device is congested. This way, multiple devices
123 * can make progress from a single worker thread.
124 */
125int run_scheduled_bios(struct btrfs_device *device)
126{
127 struct bio *pending;
128 struct backing_dev_info *bdi;
129 struct bio *tail;
130 struct bio *cur;
131 int again = 0;
132 unsigned long num_run = 0;
133
134 bdi = device->bdev->bd_inode->i_mapping->backing_dev_info;
135loop:
136 spin_lock(&device->io_lock);
137
138 /* take all the bios off the list at once and process them
139 * later on (without the lock held). But, remember the
140 * tail and other pointers so the bios can be properly reinserted
141 * into the list if we hit congestion
142 */
143 pending = device->pending_bios;
144 tail = device->pending_bio_tail;
145 WARN_ON(pending && !tail);
146 device->pending_bios = NULL;
147 device->pending_bio_tail = NULL;
148
149 /*
150 * if pending was null this time around, no bios need processing
151 * at all and we can stop. Otherwise it'll loop back up again
152 * and do an additional check so no bios are missed.
153 *
154 * device->running_pending is used to synchronize with the
155 * schedule_bio code.
156 */
157 if (pending) {
158 again = 1;
159 device->running_pending = 1;
160 } else {
161 again = 0;
162 device->running_pending = 0;
163 }
164 spin_unlock(&device->io_lock);
165
166 while(pending) {
167 cur = pending;
168 pending = pending->bi_next;
169 cur->bi_next = NULL;
170 atomic_dec(&device->dev_root->fs_info->nr_async_submits);
171 submit_bio(cur->bi_rw, cur);
172 num_run++;
173
174 /*
175 * we made progress, there is more work to do and the bdi
176 * is now congested. Back off and let other work structs
177 * run instead
178 */
179 if (pending && num_run && bdi_write_congested(bdi)) {
180 struct bio *old_head;
181
182 spin_lock(&device->io_lock);
183 old_head = device->pending_bios;
184 device->pending_bios = pending;
185 if (device->pending_bio_tail)
186 tail->bi_next = old_head;
187 else
188 device->pending_bio_tail = tail;
189
190 spin_unlock(&device->io_lock);
191 btrfs_requeue_work(&device->work);
192 goto done;
193 }
194 }
195 if (again)
196 goto loop;
197done:
198 return 0;
199}
200
201void pending_bios_fn(struct btrfs_work *work)
202{
203 struct btrfs_device *device;
204
205 device = container_of(work, struct btrfs_device, work);
206 run_scheduled_bios(device);
207}
208
113static int device_list_add(const char *path, 209static int device_list_add(const char *path,
114 struct btrfs_super_block *disk_super, 210 struct btrfs_super_block *disk_super,
115 u64 devid, struct btrfs_fs_devices **fs_devices_ret) 211 u64 devid, struct btrfs_fs_devices **fs_devices_ret)
@@ -141,6 +237,7 @@ static int device_list_add(const char *path,
 		return -ENOMEM;
 	}
 	device->devid = devid;
+	device->work.func = pending_bios_fn;
 	memcpy(device->uuid, disk_super->dev_item.uuid,
 	       BTRFS_UUID_SIZE);
 	device->barriers = 1;
@@ -925,6 +1022,7 @@ int btrfs_init_new_device(struct btrfs_root *root, char *device_path)
 	}
 
 	device->barriers = 1;
+	device->work.func = pending_bios_fn;
 	generate_random_uuid(device->uuid);
 	spin_lock_init(&device->io_lock);
 	device->name = kstrdup(device_path, GFP_NOFS);
@@ -1965,8 +2063,61 @@ static int end_bio_multi_stripe(struct bio *bio,
 #endif
 }
 
2066struct async_sched {
2067 struct bio *bio;
2068 int rw;
2069 struct btrfs_fs_info *info;
2070 struct btrfs_work work;
2071};
2072
2073/*
2074 * see run_scheduled_bios for a description of why bios are collected for
2075 * async submit.
2076 *
2077 * This will add one bio to the pending list for a device and make sure
2078 * the work struct is scheduled.
2079 */
2080int schedule_bio(struct btrfs_root *root, struct btrfs_device *device,
2081 int rw, struct bio *bio)
2082{
2083 int should_queue = 1;
2084
2085 /* don't bother with additional async steps for reads, right now */
2086 if (!(rw & (1 << BIO_RW))) {
2087 submit_bio(rw, bio);
2088 return 0;
2089 }
2090
2091 /*
2092 * nr_async_submits allows us to reliably return congestion to the
2093 * higher layers. Otherwise, the async bio makes it appear we have
2094 * made progress against dirty pages when we've really just put it
2095 * on a queue for later
2096 */
2097 atomic_inc(&root->fs_info->nr_async_submits);
2098 bio->bi_next = NULL;
2099 bio->bi_rw |= rw;
2100
2101 spin_lock(&device->io_lock);
2102
2103 if (device->pending_bio_tail)
2104 device->pending_bio_tail->bi_next = bio;
2105
2106 device->pending_bio_tail = bio;
2107 if (!device->pending_bios)
2108 device->pending_bios = bio;
2109 if (device->running_pending)
2110 should_queue = 0;
2111
2112 spin_unlock(&device->io_lock);
2113
2114 if (should_queue)
2115 btrfs_queue_worker(&root->fs_info->workers, &device->work);
2116 return 0;
2117}
2118
 int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
-		  int mirror_num)
+		  int mirror_num, int async_submit)
 {
 	struct btrfs_mapping_tree *map_tree;
 	struct btrfs_device *dev;
@@ -2012,10 +2163,10 @@ int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
 		dev = multi->stripes[dev_nr].dev;
 		if (dev && dev->bdev) {
 			bio->bi_bdev = dev->bdev;
-			spin_lock(&dev->io_lock);
-			dev->total_ios++;
-			spin_unlock(&dev->io_lock);
-			submit_bio(rw, bio);
+			if (async_submit)
+				schedule_bio(root, dev, rw, bio);
+			else
+				submit_bio(rw, bio);
 		} else {
 			bio->bi_bdev = root->fs_info->fs_devices->latest_bdev;
 			bio->bi_sector = logical >> 9;
@@ -2054,6 +2205,7 @@ static struct btrfs_device *add_missing_dev(struct btrfs_root *root,
 	device->barriers = 1;
 	device->dev_root = root->fs_info->dev_root;
 	device->devid = devid;
+	device->work.func = pending_bios_fn;
 	fs_devices->num_devices++;
 	spin_lock_init(&device->io_lock);
 	memcpy(device->uuid, dev_uuid, BTRFS_UUID_SIZE);
diff --git a/fs/btrfs/volumes.h b/fs/btrfs/volumes.h
index 4df6b1608f91..48a44f7a9385 100644
--- a/fs/btrfs/volumes.h
+++ b/fs/btrfs/volumes.h
@@ -20,6 +20,7 @@
 #define __BTRFS_VOLUMES_
 
 #include <linux/bio.h>
+#include "async-thread.h"
 
 struct buffer_head;
 struct btrfs_device {
@@ -27,6 +28,9 @@ struct btrfs_device {
 	struct list_head dev_alloc_list;
 	struct btrfs_root *dev_root;
 	struct buffer_head *pending_io;
+	struct bio *pending_bios;
+	struct bio *pending_bio_tail;
+	int running_pending;
 	u64 generation;
 
 	int barriers;
@@ -36,8 +40,6 @@ struct btrfs_device {
 
 	struct block_device *bdev;
 
-	u64 total_ios;
-
 	char *name;
 
 	/* the internal btrfs device id */
@@ -63,6 +65,8 @@ struct btrfs_device {
 
 	/* physical drive uuid (or lvm uuid) */
 	u8 uuid[BTRFS_UUID_SIZE];
+
+	struct btrfs_work work;
 };
 
 struct btrfs_fs_devices {
@@ -117,7 +121,7 @@ int btrfs_alloc_chunk(struct btrfs_trans_handle *trans,
 void btrfs_mapping_init(struct btrfs_mapping_tree *tree);
 void btrfs_mapping_tree_free(struct btrfs_mapping_tree *tree);
 int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
-		  int mirror_num);
+		  int mirror_num, int async_submit);
 int btrfs_read_super_device(struct btrfs_root *root, struct extent_buffer *buf);
 int btrfs_open_devices(struct btrfs_fs_devices *fs_devices,
 		       int flags, void *holder);