author    Chris Mason <chris.mason@oracle.com>  2008-11-06 22:03:00 -0500
committer Chris Mason <chris.mason@oracle.com>  2008-11-06 22:03:00 -0500
commit    4a69a41009c4ac691f7d9c289f5f37fabeddce46 (patch)
tree      1dac90d2f8e4ad4114fb1f4c168925daf2769d28
parent    537fb0671549a9a6457ce42a25ab34b29d97a256 (diff)
Btrfs: Add ordered async work queues
Btrfs uses kernel threads to create async work queues for cpu intensive
operations such as checksumming and decompression.  These work well, but
they make it difficult to keep IO order intact.

A single writepages call from pdflush or fsync will turn into a number of
bios, and each bio is checksummed in parallel.  Once the checksum is
computed, the bio is sent down to the disk, and since we don't control the
order in which the parallel operations happen, they might go down to the
disk in almost any order.  The code deals with this somewhat by having deep
work queues for a single kernel thread, making it very likely that a single
thread will process all the bios for a single inode.

This patch introduces an explicitly ordered work queue.  As work structs
are placed into the queue they are put onto the tail of a list.  They have
three callbacks:

->func (cpu intensive processing here)
->ordered_func (order sensitive processing here)
->ordered_free (free the work struct, all processing is done)

The func callback does the cpu intensive work, and when it completes the
work struct is marked as done.  Every time a work struct completes, the
list is checked to see if the head is marked as done.  If so, the
ordered_func callback is used to do the order sensitive processing and the
ordered_free callback is used to do any cleanup.  Then we loop back and
check the head of the list again.

This patch also changes the checksumming code to use the ordered
workqueues.  On a 4 drive array, it increases streaming writes from
280MB/s to 350MB/s.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
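As a minimal sketch of how a client is expected to wire the three callbacks
into an ordered queue (not part of the patch; the my_* names and the two
helper calls are hypothetical, while the btrfs_* calls are the APIs touched
below):

/* sketch only; checksum_bio() and submit_bio_to_disk() are made up */
struct my_work {
	struct btrfs_work work;	/* embedded; recovered via container_of */
	struct bio *bio;
};

/* ->func: cpu intensive step, may run on any worker thread in parallel */
static void my_func(struct btrfs_work *work)
{
	struct my_work *w = container_of(work, struct my_work, work);

	checksum_bio(w->bio);
}

/* ->ordered_func: order sensitive step, called back in queue order */
static void my_ordered_func(struct btrfs_work *work)
{
	struct my_work *w = container_of(work, struct my_work, work);

	submit_bio_to_disk(w->bio);
}

/* ->ordered_free: all processing is done, safe to free the struct */
static void my_ordered_free(struct btrfs_work *work)
{
	kfree(container_of(work, struct my_work, work));
}

static void my_queue_one(struct btrfs_workers *workers, struct my_work *w)
{
	/* assumes workers was set up once with
	 * btrfs_init_workers(workers, "example", 4) and had
	 * workers->ordered = 1 set before any work was queued */
	w->work.func = my_func;
	w->work.ordered_func = my_ordered_func;
	w->work.ordered_free = my_ordered_free;
	w->work.flags = 0;	/* must start cleared, see async-thread.h */
	btrfs_queue_worker(workers, &w->work);
}

The ordered flag must be flipped before the queue is used, which is what
open_ctree() does for fs_info->workers in the disk-io.c hunk below.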
-rw-r--r--  fs/btrfs/async-thread.c  67
-rw-r--r--  fs/btrfs/async-thread.h  18
-rw-r--r--  fs/btrfs/disk-io.c       81
-rw-r--r--  fs/btrfs/disk-io.h        4
-rw-r--r--  fs/btrfs/inode.c         19
5 files changed, 158 insertions(+), 31 deletions(-)
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index d82efd722a48..e1e49715459e 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -23,6 +23,10 @@
 # include <linux/freezer.h>
 #include "async-thread.h"

+#define WORK_QUEUED_BIT 0
+#define WORK_DONE_BIT 1
+#define WORK_ORDER_DONE_BIT 2
+
 /*
  * container for the kthread task pointer and the list of pending work
  * One of these is allocated per thread.
@@ -88,6 +92,47 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
 	}
 }

+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+					    struct btrfs_work *work)
+{
+	unsigned long flags;
+
+	if (!workers->ordered)
+		return 0;
+
+	set_bit(WORK_DONE_BIT, &work->flags);
+
+	spin_lock_irqsave(&workers->lock, flags);
+
+	while(!list_empty(&workers->order_list)) {
+		work = list_entry(workers->order_list.next,
+				  struct btrfs_work, order_list);
+
+		if (!test_bit(WORK_DONE_BIT, &work->flags))
+			break;
+
+		/* we are going to call the ordered done function, but
+		 * we leave the work item on the list as a barrier so
+		 * that later work items that are done don't have their
+		 * functions called before this one returns
+		 */
+		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
+			break;
+
+		spin_unlock_irqrestore(&workers->lock, flags);
+
+		work->ordered_func(work);
+
+		/* now take the lock again and call the freeing code */
+		spin_lock_irqsave(&workers->lock, flags);
+		list_del(&work->order_list);
+		work->ordered_free(work);
+	}
+
+	spin_unlock_irqrestore(&workers->lock, flags);
+	return 0;
+}
+
 /*
  * main loop for servicing work items
  */
@@ -102,7 +147,7 @@ static int worker_loop(void *arg)
 			cur = worker->pending.next;
 			work = list_entry(cur, struct btrfs_work, list);
 			list_del(&work->list);
-			clear_bit(0, &work->flags);
+			clear_bit(WORK_QUEUED_BIT, &work->flags);

 			work->worker = worker;
 			spin_unlock_irq(&worker->lock);
@@ -110,8 +155,15 @@ static int worker_loop(void *arg)
 			work->func(work);

 			atomic_dec(&worker->num_pending);
+			/*
+			 * unless this is an ordered work queue,
+			 * 'work' was probably freed by func above.
+			 */
+			run_ordered_completions(worker->workers, work);
+
 			spin_lock_irq(&worker->lock);
 			check_idle_worker(worker);
+
 		}
 		worker->working = 0;
 		if (freezing(current)) {
@@ -154,10 +206,12 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
 	workers->num_workers = 0;
 	INIT_LIST_HEAD(&workers->worker_list);
 	INIT_LIST_HEAD(&workers->idle_list);
+	INIT_LIST_HEAD(&workers->order_list);
 	spin_lock_init(&workers->lock);
 	workers->max_workers = max;
 	workers->idle_thresh = 32;
 	workers->name = name;
+	workers->ordered = 0;
 }

 /*
@@ -296,7 +350,7 @@ int btrfs_requeue_work(struct btrfs_work *work)
 	struct btrfs_worker_thread *worker = work->worker;
 	unsigned long flags;

-	if (test_and_set_bit(0, &work->flags))
+	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
 		goto out;

 	spin_lock_irqsave(&worker->lock, flags);
@@ -330,10 +384,17 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
 	int wake = 0;

 	/* don't requeue something already on a list */
-	if (test_and_set_bit(0, &work->flags))
+	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
 		goto out;

 	worker = find_worker(workers);
+	if (workers->ordered) {
+		spin_lock_irqsave(&workers->lock, flags);
+		list_add_tail(&work->order_list, &workers->order_list);
+		spin_unlock_irqrestore(&workers->lock, flags);
+	} else {
+		INIT_LIST_HEAD(&work->order_list);
+	}

 	spin_lock_irqsave(&worker->lock, flags);
 	atomic_inc(&worker->num_pending);
diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h
index 4ec9a2ee0f9d..31be4ed8b63e 100644
--- a/fs/btrfs/async-thread.h
+++ b/fs/btrfs/async-thread.h
@@ -37,10 +37,16 @@ struct btrfs_worker_thread;
  */
 struct btrfs_work {
 	/*
-	 * only func should be set to the function you want called
+	 * func should be set to the function you want called
 	 * your work struct is passed as the only arg
+	 *
+	 * ordered_func must be set for work sent to an ordered work queue,
+	 * and it is called to complete a given work item in the same
+	 * order they were sent to the queue.
 	 */
 	void (*func)(struct btrfs_work *work);
+	void (*ordered_func)(struct btrfs_work *work);
+	void (*ordered_free)(struct btrfs_work *work);

 	/*
 	 * flags should be set to zero.  It is used to make sure the
@@ -51,6 +57,7 @@ struct btrfs_work {
 	/* don't touch these */
 	struct btrfs_worker_thread *worker;
 	struct list_head list;
+	struct list_head order_list;
 };

 struct btrfs_workers {
@@ -63,6 +70,9 @@ struct btrfs_workers {
 	/* once a worker has this many requests or fewer, it is idle */
 	int idle_thresh;

+	/* force completions in the order they were queued */
+	int ordered;
+
 	/* list with all the work threads.  The workers on the idle thread
 	 * may be actively servicing jobs, but they haven't yet hit the
 	 * idle thresh limit above.
@@ -70,6 +80,12 @@ struct btrfs_workers {
 	struct list_head worker_list;
 	struct list_head idle_list;

+	/*
+	 * when operating in ordered mode, this maintains the list
+	 * of work items waiting for completion
+	 */
+	struct list_head order_list;
+
 	/* lock for finding the next worker thread to queue on */
 	spinlock_t lock;

diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index 94b4e50f6b2c..e0a28f705a64 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -80,7 +80,8 @@ struct async_submit_bio {
 	struct inode *inode;
 	struct bio *bio;
 	struct list_head list;
-	extent_submit_bio_hook_t *submit_bio_hook;
+	extent_submit_bio_hook_t *submit_bio_start;
+	extent_submit_bio_hook_t *submit_bio_done;
 	int rw;
 	int mirror_num;
 	unsigned long bio_flags;
@@ -452,7 +453,18 @@ int btrfs_congested_async(struct btrfs_fs_info *info, int iodone)
 		btrfs_async_submit_limit(info);
 }

-static void run_one_async_submit(struct btrfs_work *work)
+static void run_one_async_start(struct btrfs_work *work)
+{
+	struct btrfs_fs_info *fs_info;
+	struct async_submit_bio *async;
+
+	async = container_of(work, struct async_submit_bio, work);
+	fs_info = BTRFS_I(async->inode)->root->fs_info;
+	async->submit_bio_start(async->inode, async->rw, async->bio,
+				async->mirror_num, async->bio_flags);
+}
+
+static void run_one_async_done(struct btrfs_work *work)
 {
 	struct btrfs_fs_info *fs_info;
 	struct async_submit_bio *async;
@@ -470,15 +482,23 @@ static void run_one_async_submit(struct btrfs_work *work)
 	    waitqueue_active(&fs_info->async_submit_wait))
 		wake_up(&fs_info->async_submit_wait);

-	async->submit_bio_hook(async->inode, async->rw, async->bio,
+	async->submit_bio_done(async->inode, async->rw, async->bio,
 			       async->mirror_num, async->bio_flags);
+}
+
+static void run_one_async_free(struct btrfs_work *work)
+{
+	struct async_submit_bio *async;
+
+	async = container_of(work, struct async_submit_bio, work);
 	kfree(async);
 }

 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 			int rw, struct bio *bio, int mirror_num,
 			unsigned long bio_flags,
-			extent_submit_bio_hook_t *submit_bio_hook)
+			extent_submit_bio_hook_t *submit_bio_start,
+			extent_submit_bio_hook_t *submit_bio_done)
 {
 	struct async_submit_bio *async;
 	int limit = btrfs_async_submit_limit(fs_info);
@@ -491,8 +511,13 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 	async->rw = rw;
 	async->bio = bio;
 	async->mirror_num = mirror_num;
-	async->submit_bio_hook = submit_bio_hook;
-	async->work.func = run_one_async_submit;
+	async->submit_bio_start = submit_bio_start;
+	async->submit_bio_done = submit_bio_done;
+
+	async->work.func = run_one_async_start;
+	async->work.ordered_func = run_one_async_done;
+	async->work.ordered_free = run_one_async_free;
+
 	async->work.flags = 0;
 	async->bio_flags = bio_flags;

@@ -533,29 +558,25 @@ static int btree_csum_one_bio(struct bio *bio)
 	return 0;
 }

-static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
-				 int mirror_num, unsigned long bio_flags)
+static int __btree_submit_bio_start(struct inode *inode, int rw,
+				    struct bio *bio, int mirror_num,
+				    unsigned long bio_flags)
 {
-	struct btrfs_root *root = BTRFS_I(inode)->root;
-	int ret;
-
 	/*
 	 * when we're called for a write, we're already in the async
 	 * submission context.  Just jump into btrfs_map_bio
 	 */
-	if (rw & (1 << BIO_RW)) {
-		btree_csum_one_bio(bio);
-		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
-				     mirror_num, 1);
-	}
+	btree_csum_one_bio(bio);
+	return 0;
+}

+static int __btree_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
+				 int mirror_num, unsigned long bio_flags)
+{
 	/*
-	 * called for a read, do the setup so that checksum validation
-	 * can happen in the async kernel threads
+	 * when we're called for a write, we're already in the async
+	 * submission context.  Just jump into btrfs_map_bio
 	 */
-	ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
-	BUG_ON(ret);
-
 	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
 }

@@ -567,11 +588,22 @@ static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 	 * can happen in parallel across all CPUs
 	 */
 	if (!(rw & (1 << BIO_RW))) {
-		return __btree_submit_bio_hook(inode, rw, bio, mirror_num, 0);
+		int ret;
+		/*
+		 * called for a read, do the setup so that checksum validation
+		 * can happen in the async kernel threads
+		 */
+		ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info,
+					  bio, 1);
+		BUG_ON(ret);
+
+		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+				     mirror_num, 1);
 	}
 	return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
 				   inode, rw, bio, mirror_num, 0,
-				   __btree_submit_bio_hook);
+				   __btree_submit_bio_start,
+				   __btree_submit_bio_done);
 }

 static int btree_writepage(struct page *page, struct writeback_control *wbc)
@@ -1534,7 +1566,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
 	 * were sent by the writeback daemons, improving overall locality
 	 * of the IO going down the pipe.
 	 */
-	fs_info->workers.idle_thresh = 128;
+	fs_info->workers.idle_thresh = 8;
+	fs_info->workers.ordered = 1;

 	btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1);
 	btrfs_init_workers(&fs_info->endio_workers, "endio",
diff --git a/fs/btrfs/disk-io.h b/fs/btrfs/disk-io.h
index 4eb1f1408d21..b8d5948fa279 100644
--- a/fs/btrfs/disk-io.h
+++ b/fs/btrfs/disk-io.h
@@ -72,7 +72,9 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio,
 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
 			int rw, struct bio *bio, int mirror_num,
 			unsigned long bio_flags,
-			extent_submit_bio_hook_t *submit_bio_hook);
+			extent_submit_bio_hook_t *submit_bio_start,
+			extent_submit_bio_hook_t *submit_bio_done);
+
 int btrfs_congested_async(struct btrfs_fs_info *info, int iodone);
 unsigned long btrfs_async_submit_limit(struct btrfs_fs_info *info);
 int btrfs_write_tree_block(struct extent_buffer *buf);
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 806708dd7e38..3df0ffad976e 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -881,7 +881,7 @@ int btrfs_merge_bio_hook(struct page *page, unsigned long offset,
  * At IO completion time the cums attached on the ordered extent record
  * are inserted into the btree
  */
-int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
+int __btrfs_submit_bio_start(struct inode *inode, int rw, struct bio *bio,
 			  int mirror_num, unsigned long bio_flags)
 {
 	struct btrfs_root *root = BTRFS_I(inode)->root;
@@ -889,7 +889,21 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,

 	ret = btrfs_csum_one_bio(root, inode, bio);
 	BUG_ON(ret);
+	return 0;
+}

+/*
+ * in order to insert checksums into the metadata in large chunks,
+ * we wait until bio submission time.   All the pages in the bio are
+ * checksummed and sums are attached onto the ordered extent record.
+ *
+ * At IO completion time the cums attached on the ordered extent record
+ * are inserted into the btree
+ */
+int __btrfs_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
+			  int mirror_num, unsigned long bio_flags)
+{
+	struct btrfs_root *root = BTRFS_I(inode)->root;
 	return btrfs_map_bio(root, rw, bio, mirror_num, 1);
 }

@@ -922,7 +936,8 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
 		/* we're doing a write, do the async checksumming */
 		return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
 				   inode, rw, bio, mirror_num,
-				   bio_flags, __btrfs_submit_bio_start,
+				   __btrfs_submit_bio_done);
 	}

 mapit: