author	Chris Mason <chris.mason@oracle.com>	2008-11-06 22:03:00 -0500
committer	Chris Mason <chris.mason@oracle.com>	2008-11-06 22:03:00 -0500
commit	4a69a41009c4ac691f7d9c289f5f37fabeddce46 (patch)
tree	1dac90d2f8e4ad4114fb1f4c168925daf2769d28 /fs/btrfs/async-thread.c
parent	537fb0671549a9a6457ce42a25ab34b29d97a256 (diff)
Btrfs: Add ordered async work queues
Btrfs uses kernel threads to create async work queues for cpu intensive
operations such as checksumming and decompression. These work well, but they
make it difficult to keep IO order intact.

A single writepages call from pdflush or fsync will turn into a number of
bios, and each bio is checksummed in parallel. Once the checksum is computed,
the bio is sent down to the disk, and since we don't control the order in
which the parallel operations happen, they might go down to the disk in
almost any order. The code deals with this somewhat by having deep work
queues for a single kernel thread, making it very likely that a single
thread will process all the bios for a single inode.

This patch introduces an explicitly ordered work queue. As work structs are
placed into the queue they are put onto the tail of a list. They have three
callbacks:

->func (cpu intensive processing here)
->ordered_func (order sensitive processing here)
->ordered_free (free the work struct, all processing is done)

The func callback does the cpu intensive work, and when it completes the
work struct is marked as done. Every time a work struct completes, the list
is checked to see if the head is marked as done. If so, the ordered_func
callback is used to do the order sensitive processing and the ordered_free
callback is used to do any cleanup. Then we loop back and check the head of
the list again.

This patch also changes the checksumming code to use the ordered workqueues.
On a 4 drive array, it increases streaming writes from 280MB/s to 350MB/s.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
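The completion rule described above can be modeled outside the kernel. The
following userspace sketch is illustrative only: struct work, struct
work_queue, enqueue_work and complete_work are hypothetical stand-ins for
btrfs_work, btrfs_workers, btrfs_queue_worker and run_ordered_completions,
with a pthread mutex in place of the spinlock and plain bools in place of
the WORK_DONE_BIT and WORK_ORDER_DONE_BIT flags.

/*
 * Minimal userspace model of the ordered work queue; every name here
 * is a hypothetical stand-in for the btrfs structures, not kernel code.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct work {
	struct work *next;		/* FIFO list, submission order */
	bool done;			/* ->func has finished */
	bool order_done;		/* ->ordered_func has run */
	void (*ordered_func)(struct work *);
	void (*ordered_free)(struct work *);
};

struct work_queue {
	pthread_mutex_t lock;
	struct work *head;		/* oldest work still on the list */
};

/* append at the tail so the list preserves submission order */
static void enqueue_work(struct work_queue *q, struct work *w)
{
	struct work **p;

	pthread_mutex_lock(&q->lock);
	w->next = NULL;
	for (p = &q->head; *p; p = &(*p)->next)
		;
	*p = w;
	pthread_mutex_unlock(&q->lock);
}

/*
 * Called by whichever worker just finished the cpu intensive ->func.
 * Only the head of the list may run its order sensitive callback;
 * items that finish early simply mark themselves done and wait for a
 * later completion to drain them.
 */
static void complete_work(struct work_queue *q, struct work *w)
{
	w->done = true;

	pthread_mutex_lock(&q->lock);
	while (q->head && q->head->done) {
		struct work *head = q->head;

		/*
		 * leave the item on the list as a barrier while its
		 * ordered callback runs, so later items that are also
		 * done cannot jump ahead of it
		 */
		if (head->order_done)
			break;
		head->order_done = true;

		pthread_mutex_unlock(&q->lock);
		head->ordered_func(head);	/* order sensitive step */
		pthread_mutex_lock(&q->lock);

		q->head = head->next;		/* unlink, then free */
		head->ordered_free(head);
	}
	pthread_mutex_unlock(&q->lock);
}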
Diffstat (limited to 'fs/btrfs/async-thread.c')
-rw-r--r--	fs/btrfs/async-thread.c	67
1 file changed, 64 insertions(+), 3 deletions(-)
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index d82efd722a48..e1e49715459e 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -23,6 +23,10 @@
 # include <linux/freezer.h>
 #include "async-thread.h"
 
+#define WORK_QUEUED_BIT 0
+#define WORK_DONE_BIT 1
+#define WORK_ORDER_DONE_BIT 2
+
 /*
  * container for the kthread task pointer and the list of pending work
  * One of these is allocated per thread.
@@ -88,6 +92,47 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
 	}
 }
 
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+					    struct btrfs_work *work)
+{
+	unsigned long flags;
+
+	if (!workers->ordered)
+		return 0;
+
+	set_bit(WORK_DONE_BIT, &work->flags);
+
+	spin_lock_irqsave(&workers->lock, flags);
+
+	while(!list_empty(&workers->order_list)) {
+		work = list_entry(workers->order_list.next,
+				  struct btrfs_work, order_list);
+
+		if (!test_bit(WORK_DONE_BIT, &work->flags))
+			break;
+
+		/* we are going to call the ordered done function, but
+		 * we leave the work item on the list as a barrier so
+		 * that later work items that are done don't have their
+		 * functions called before this one returns
+		 */
+		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
+			break;
+
+		spin_unlock_irqrestore(&workers->lock, flags);
+
+		work->ordered_func(work);
+
+		/* now take the lock again and call the freeing code */
+		spin_lock_irqsave(&workers->lock, flags);
+		list_del(&work->order_list);
+		work->ordered_free(work);
+	}
+
+	spin_unlock_irqrestore(&workers->lock, flags);
+	return 0;
+}
+
 /*
  * main loop for servicing work items
  */
@@ -102,7 +147,7 @@ static int worker_loop(void *arg)
 			cur = worker->pending.next;
 			work = list_entry(cur, struct btrfs_work, list);
 			list_del(&work->list);
-			clear_bit(0, &work->flags);
+			clear_bit(WORK_QUEUED_BIT, &work->flags);
 
 			work->worker = worker;
 			spin_unlock_irq(&worker->lock);
@@ -110,8 +155,15 @@ static int worker_loop(void *arg)
110 work->func(work); 155 work->func(work);
111 156
112 atomic_dec(&worker->num_pending); 157 atomic_dec(&worker->num_pending);
158 /*
159 * unless this is an ordered work queue,
160 * 'work' was probably freed by func above.
161 */
162 run_ordered_completions(worker->workers, work);
163
113 spin_lock_irq(&worker->lock); 164 spin_lock_irq(&worker->lock);
114 check_idle_worker(worker); 165 check_idle_worker(worker);
166
115 } 167 }
116 worker->working = 0; 168 worker->working = 0;
117 if (freezing(current)) { 169 if (freezing(current)) {
@@ -154,10 +206,12 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
 	workers->num_workers = 0;
 	INIT_LIST_HEAD(&workers->worker_list);
 	INIT_LIST_HEAD(&workers->idle_list);
+	INIT_LIST_HEAD(&workers->order_list);
 	spin_lock_init(&workers->lock);
 	workers->max_workers = max;
 	workers->idle_thresh = 32;
 	workers->name = name;
+	workers->ordered = 0;
 }
 
 /*
@@ -296,7 +350,7 @@ int btrfs_requeue_work(struct btrfs_work *work)
 	struct btrfs_worker_thread *worker = work->worker;
 	unsigned long flags;
 
-	if (test_and_set_bit(0, &work->flags))
+	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
 		goto out;
 
 	spin_lock_irqsave(&worker->lock, flags);
@@ -330,10 +384,17 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
 	int wake = 0;
 
 	/* don't requeue something already on a list */
-	if (test_and_set_bit(0, &work->flags))
+	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
 		goto out;
 
 	worker = find_worker(workers);
+	if (workers->ordered) {
+		spin_lock_irqsave(&workers->lock, flags);
+		list_add_tail(&work->order_list, &workers->order_list);
+		spin_unlock_irqrestore(&workers->lock, flags);
+	} else {
+		INIT_LIST_HEAD(&work->order_list);
+	}
 
 	spin_lock_irqsave(&worker->lock, flags);
 	atomic_inc(&worker->num_pending);
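For completeness, a hedged sketch of how a caller might wire a work item
into the ordered queue this patch adds, assuming the existing
btrfs_init_workers/btrfs_start_workers/btrfs_queue_worker API in this file.
The names run_csum, submit_in_order, free_csum_work, start_csum_workers and
queue_one_csum are illustrative only; the real checksumming hookup lives in
the disk-io.c half of this commit, outside this file's diff.

/* Illustrative callbacks; these names are not part of the patch. */
static void run_csum(struct btrfs_work *work)
{
	/* ->func: cpu intensive step, completions may happen in any order */
}

static void submit_in_order(struct btrfs_work *work)
{
	/* ->ordered_func: order sensitive step, runs in submission order */
}

static void free_csum_work(struct btrfs_work *work)
{
	/* ->ordered_free: all processing is done, free the work struct */
}

static void start_csum_workers(struct btrfs_workers *workers)
{
	btrfs_init_workers(workers, "csum", 4);
	workers->ordered = 1;	/* opt in to the order_list */
	btrfs_start_workers(workers, 1);
}

static void queue_one_csum(struct btrfs_workers *workers,
			   struct btrfs_work *work)
{
	work->func = run_csum;
	work->ordered_func = submit_in_order;
	work->ordered_free = free_csum_work;
	btrfs_queue_worker(workers, work);
}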