Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--  kernel/workqueue.c | 783
1 file changed, 406 insertions(+), 377 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b6fa5e63085d..fb56fedd5c02 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -36,30 +36,20 @@
36/* 36/*
37 * The per-CPU workqueue (if single thread, we always use the first 37 * The per-CPU workqueue (if single thread, we always use the first
38 * possible cpu). 38 * possible cpu).
39 *
40 * The sequence counters are for flush_scheduled_work(). It wants to wait
41 * until all currently-scheduled works are completed, but it doesn't
42 * want to be livelocked by new, incoming ones. So it waits until
43 * remove_sequence is >= the insert_sequence which pertained when
44 * flush_scheduled_work() was called.
45 */ 39 */
46struct cpu_workqueue_struct { 40struct cpu_workqueue_struct {
47 41
48 spinlock_t lock; 42 spinlock_t lock;
49 43
50 long remove_sequence; /* Least-recently added (next to run) */
51 long insert_sequence; /* Next to add */
52
53 struct list_head worklist; 44 struct list_head worklist;
54 wait_queue_head_t more_work; 45 wait_queue_head_t more_work;
55 wait_queue_head_t work_done; 46 struct work_struct *current_work;
56 47
57 struct workqueue_struct *wq; 48 struct workqueue_struct *wq;
58 struct task_struct *thread; 49 struct task_struct *thread;
50 int should_stop;
59 51
60 int run_depth; /* Detect run_workqueue() recursion depth */ 52 int run_depth; /* Detect run_workqueue() recursion depth */
61
62 int freezeable; /* Freeze the thread during suspend */
63} ____cacheline_aligned; 53} ____cacheline_aligned;
64 54
65/* 55/*
@@ -68,8 +58,10 @@ struct cpu_workqueue_struct {
68 */ 58 */
69struct workqueue_struct { 59struct workqueue_struct {
70 struct cpu_workqueue_struct *cpu_wq; 60 struct cpu_workqueue_struct *cpu_wq;
61 struct list_head list;
71 const char *name; 62 const char *name;
72 struct list_head list; /* Empty if single thread */ 63 int singlethread;
64 int freezeable; /* Freeze threads during suspend */
73}; 65};
74 66
75/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 67/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
@@ -77,106 +69,68 @@ struct workqueue_struct {
77static DEFINE_MUTEX(workqueue_mutex); 69static DEFINE_MUTEX(workqueue_mutex);
78static LIST_HEAD(workqueues); 70static LIST_HEAD(workqueues);
79 71
80static int singlethread_cpu; 72static int singlethread_cpu __read_mostly;
73static cpumask_t cpu_singlethread_map __read_mostly;
74/* optimization, we could use cpu_possible_map */
75static cpumask_t cpu_populated_map __read_mostly;
81 76
82/* If it's single threaded, it isn't in the list of workqueues. */ 77/* If it's single threaded, it isn't in the list of workqueues. */
83static inline int is_single_threaded(struct workqueue_struct *wq) 78static inline int is_single_threaded(struct workqueue_struct *wq)
84{ 79{
85 return list_empty(&wq->list); 80 return wq->singlethread;
81}
82
83static const cpumask_t *wq_cpu_map(struct workqueue_struct *wq)
84{
85 return is_single_threaded(wq)
86 ? &cpu_singlethread_map : &cpu_populated_map;
87}
88
89static
90struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
91{
92 if (unlikely(is_single_threaded(wq)))
93 cpu = singlethread_cpu;
94 return per_cpu_ptr(wq->cpu_wq, cpu);
86} 95}
87 96
88/* 97/*
89 * Set the workqueue on which a work item is to be run 98 * Set the workqueue on which a work item is to be run
90 * - Must *only* be called if the pending flag is set 99 * - Must *only* be called if the pending flag is set
91 */ 100 */
92static inline void set_wq_data(struct work_struct *work, void *wq) 101static inline void set_wq_data(struct work_struct *work,
102 struct cpu_workqueue_struct *cwq)
93{ 103{
94 unsigned long new; 104 unsigned long new;
95 105
96 BUG_ON(!work_pending(work)); 106 BUG_ON(!work_pending(work));
97 107
98 new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING); 108 new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
99 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); 109 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
100 atomic_long_set(&work->data, new); 110 atomic_long_set(&work->data, new);
101} 111}
102 112
103static inline void *get_wq_data(struct work_struct *work) 113static inline
114struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
104{ 115{
105 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); 116 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
106} 117}
107 118
108static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) 119static void insert_work(struct cpu_workqueue_struct *cwq,
120 struct work_struct *work, int tail)
109{ 121{
110 int ret = 0; 122 set_wq_data(work, cwq);
111 unsigned long flags;
112
113 spin_lock_irqsave(&cwq->lock, flags);
114 /* 123 /*
115 * We need to re-validate the work info after we've gotten 124 * Ensure that we get the right work->data if we see the
116 * the cpu_workqueue lock. We can run the work now iff: 125 * result of list_add() below, see try_to_grab_pending().
117 *
118 * - the wq_data still matches the cpu_workqueue_struct
119 * - AND the work is still marked pending
120 * - AND the work is still on a list (which will be this
121 * workqueue_struct list)
122 *
123 * All these conditions are important, because we
124 * need to protect against the work being run right
125 * now on another CPU (all but the last one might be
126 * true if it's currently running and has not been
127 * released yet, for example).
128 */ 126 */
129 if (get_wq_data(work) == cwq 127 smp_wmb();
130 && work_pending(work) 128 if (tail)
131 && !list_empty(&work->entry)) { 129 list_add_tail(&work->entry, &cwq->worklist);
132 work_func_t f = work->func; 130 else
133 list_del_init(&work->entry); 131 list_add(&work->entry, &cwq->worklist);
134 spin_unlock_irqrestore(&cwq->lock, flags); 132 wake_up(&cwq->more_work);
135
136 if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
137 work_release(work);
138 f(work);
139
140 spin_lock_irqsave(&cwq->lock, flags);
141 cwq->remove_sequence++;
142 wake_up(&cwq->work_done);
143 ret = 1;
144 }
145 spin_unlock_irqrestore(&cwq->lock, flags);
146 return ret;
147}
148
149/**
150 * run_scheduled_work - run scheduled work synchronously
151 * @work: work to run
152 *
153 * This checks if the work was pending, and runs it
154 * synchronously if so. It returns a boolean to indicate
155 * whether it had any scheduled work to run or not.
156 *
157 * NOTE! This _only_ works for normal work_structs. You
158 * CANNOT use this for delayed work, because the wq data
159 * for delayed work will not point properly to the per-
160 * CPU workqueue struct, but will change!
161 */
162int fastcall run_scheduled_work(struct work_struct *work)
163{
164 for (;;) {
165 struct cpu_workqueue_struct *cwq;
166
167 if (!work_pending(work))
168 return 0;
169 if (list_empty(&work->entry))
170 return 0;
171 /* NOTE! This depends intimately on __queue_work! */
172 cwq = get_wq_data(work);
173 if (!cwq)
174 return 0;
175 if (__run_work(cwq, work))
176 return 1;
177 }
178} 133}
179EXPORT_SYMBOL(run_scheduled_work);
180 134
181/* Preempt must be disabled. */ 135/* Preempt must be disabled. */
182static void __queue_work(struct cpu_workqueue_struct *cwq, 136static void __queue_work(struct cpu_workqueue_struct *cwq,
@@ -185,10 +139,7 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
185 unsigned long flags; 139 unsigned long flags;
186 140
187 spin_lock_irqsave(&cwq->lock, flags); 141 spin_lock_irqsave(&cwq->lock, flags);
188 set_wq_data(work, cwq); 142 insert_work(cwq, work, 1);
189 list_add_tail(&work->entry, &cwq->worklist);
190 cwq->insert_sequence++;
191 wake_up(&cwq->more_work);
192 spin_unlock_irqrestore(&cwq->lock, flags); 143 spin_unlock_irqrestore(&cwq->lock, flags);
193} 144}
194 145
@@ -204,16 +155,14 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
204 */ 155 */
205int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 156int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
206{ 157{
207 int ret = 0, cpu = get_cpu(); 158 int ret = 0;
208 159
209 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 160 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
210 if (unlikely(is_single_threaded(wq)))
211 cpu = singlethread_cpu;
212 BUG_ON(!list_empty(&work->entry)); 161 BUG_ON(!list_empty(&work->entry));
213 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 162 __queue_work(wq_per_cpu(wq, get_cpu()), work);
163 put_cpu();
214 ret = 1; 164 ret = 1;
215 } 165 }
216 put_cpu();
217 return ret; 166 return ret;
218} 167}
219EXPORT_SYMBOL_GPL(queue_work); 168EXPORT_SYMBOL_GPL(queue_work);
@@ -221,13 +170,10 @@ EXPORT_SYMBOL_GPL(queue_work);
221void delayed_work_timer_fn(unsigned long __data) 170void delayed_work_timer_fn(unsigned long __data)
222{ 171{
223 struct delayed_work *dwork = (struct delayed_work *)__data; 172 struct delayed_work *dwork = (struct delayed_work *)__data;
224 struct workqueue_struct *wq = get_wq_data(&dwork->work); 173 struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
225 int cpu = smp_processor_id(); 174 struct workqueue_struct *wq = cwq->wq;
226 175
227 if (unlikely(is_single_threaded(wq))) 176 __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
228 cpu = singlethread_cpu;
229
230 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
231} 177}
232 178
233/** 179/**
@@ -241,27 +187,11 @@ void delayed_work_timer_fn(unsigned long __data)
241int fastcall queue_delayed_work(struct workqueue_struct *wq, 187int fastcall queue_delayed_work(struct workqueue_struct *wq,
242 struct delayed_work *dwork, unsigned long delay) 188 struct delayed_work *dwork, unsigned long delay)
243{ 189{
244 int ret = 0; 190 timer_stats_timer_set_start_info(&dwork->timer);
245 struct timer_list *timer = &dwork->timer;
246 struct work_struct *work = &dwork->work;
247
248 timer_stats_timer_set_start_info(timer);
249 if (delay == 0) 191 if (delay == 0)
250 return queue_work(wq, work); 192 return queue_work(wq, &dwork->work);
251
252 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
253 BUG_ON(timer_pending(timer));
254 BUG_ON(!list_empty(&work->entry));
255 193
256 /* This stores wq for the moment, for the timer_fn */ 194 return queue_delayed_work_on(-1, wq, dwork, delay);
257 set_wq_data(work, wq);
258 timer->expires = jiffies + delay;
259 timer->data = (unsigned long)dwork;
260 timer->function = delayed_work_timer_fn;
261 add_timer(timer);
262 ret = 1;
263 }
264 return ret;
265} 195}
266EXPORT_SYMBOL_GPL(queue_delayed_work); 196EXPORT_SYMBOL_GPL(queue_delayed_work);
267 197
@@ -285,12 +215,16 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
285 BUG_ON(timer_pending(timer)); 215 BUG_ON(timer_pending(timer));
286 BUG_ON(!list_empty(&work->entry)); 216 BUG_ON(!list_empty(&work->entry));
287 217
288 /* This stores wq for the moment, for the timer_fn */ 218 /* This stores cwq for the moment, for the timer_fn */
289 set_wq_data(work, wq); 219 set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
290 timer->expires = jiffies + delay; 220 timer->expires = jiffies + delay;
291 timer->data = (unsigned long)dwork; 221 timer->data = (unsigned long)dwork;
292 timer->function = delayed_work_timer_fn; 222 timer->function = delayed_work_timer_fn;
293 add_timer_on(timer, cpu); 223
224 if (unlikely(cpu >= 0))
225 add_timer_on(timer, cpu);
226 else
227 add_timer(timer);
294 ret = 1; 228 ret = 1;
295 } 229 }
296 return ret; 230 return ret;
@@ -299,13 +233,7 @@ EXPORT_SYMBOL_GPL(queue_delayed_work_on);
299 233
300static void run_workqueue(struct cpu_workqueue_struct *cwq) 234static void run_workqueue(struct cpu_workqueue_struct *cwq)
301{ 235{
302 unsigned long flags; 236 spin_lock_irq(&cwq->lock);
303
304 /*
305 * Keep taking off work from the queue until
306 * done.
307 */
308 spin_lock_irqsave(&cwq->lock, flags);
309 cwq->run_depth++; 237 cwq->run_depth++;
310 if (cwq->run_depth > 3) { 238 if (cwq->run_depth > 3) {
311 /* morton gets to eat his hat */ 239 /* morton gets to eat his hat */
@@ -318,12 +246,12 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
318 struct work_struct, entry); 246 struct work_struct, entry);
319 work_func_t f = work->func; 247 work_func_t f = work->func;
320 248
249 cwq->current_work = work;
321 list_del_init(cwq->worklist.next); 250 list_del_init(cwq->worklist.next);
322 spin_unlock_irqrestore(&cwq->lock, flags); 251 spin_unlock_irq(&cwq->lock);
323 252
324 BUG_ON(get_wq_data(work) != cwq); 253 BUG_ON(get_wq_data(work) != cwq);
325 if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work))) 254 work_clear_pending(work);
326 work_release(work);
327 f(work); 255 f(work);
328 256
329 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 257 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
@@ -337,63 +265,81 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
337 dump_stack(); 265 dump_stack();
338 } 266 }
339 267
340 spin_lock_irqsave(&cwq->lock, flags); 268 spin_lock_irq(&cwq->lock);
341 cwq->remove_sequence++; 269 cwq->current_work = NULL;
342 wake_up(&cwq->work_done);
343 } 270 }
344 cwq->run_depth--; 271 cwq->run_depth--;
345 spin_unlock_irqrestore(&cwq->lock, flags); 272 spin_unlock_irq(&cwq->lock);
273}
274
275/*
276 * NOTE: the caller must not touch *cwq if this func returns true
277 */
278static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
279{
280 int should_stop = cwq->should_stop;
281
282 if (unlikely(should_stop)) {
283 spin_lock_irq(&cwq->lock);
284 should_stop = cwq->should_stop && list_empty(&cwq->worklist);
285 if (should_stop)
286 cwq->thread = NULL;
287 spin_unlock_irq(&cwq->lock);
288 }
289
290 return should_stop;
346} 291}
347 292
348static int worker_thread(void *__cwq) 293static int worker_thread(void *__cwq)
349{ 294{
350 struct cpu_workqueue_struct *cwq = __cwq; 295 struct cpu_workqueue_struct *cwq = __cwq;
351 DECLARE_WAITQUEUE(wait, current); 296 DEFINE_WAIT(wait);
352 struct k_sigaction sa;
353 sigset_t blocked;
354 297
355 if (!cwq->freezeable) 298 if (!cwq->wq->freezeable)
356 current->flags |= PF_NOFREEZE; 299 current->flags |= PF_NOFREEZE;
357 300
358 set_user_nice(current, -5); 301 set_user_nice(current, -5);
359 302
360 /* Block and flush all signals */ 303 for (;;) {
361 sigfillset(&blocked); 304 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
362 sigprocmask(SIG_BLOCK, &blocked, NULL); 305 if (!freezing(current) && !cwq->should_stop
363 flush_signals(current); 306 && list_empty(&cwq->worklist))
364 307 schedule();
365 /* 308 finish_wait(&cwq->more_work, &wait);
366 * We inherited MPOL_INTERLEAVE from the booting kernel.
367 * Set MPOL_DEFAULT to insure node local allocations.
368 */
369 numa_default_policy();
370
371 /* SIG_IGN makes children autoreap: see do_notify_parent(). */
372 sa.sa.sa_handler = SIG_IGN;
373 sa.sa.sa_flags = 0;
374 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
375 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
376 309
377 set_current_state(TASK_INTERRUPTIBLE); 310 try_to_freeze();
378 while (!kthread_should_stop()) {
379 if (cwq->freezeable)
380 try_to_freeze();
381 311
382 add_wait_queue(&cwq->more_work, &wait); 312 if (cwq_should_stop(cwq))
383 if (list_empty(&cwq->worklist)) 313 break;
384 schedule();
385 else
386 __set_current_state(TASK_RUNNING);
387 remove_wait_queue(&cwq->more_work, &wait);
388 314
389 if (!list_empty(&cwq->worklist)) 315 run_workqueue(cwq);
390 run_workqueue(cwq);
391 set_current_state(TASK_INTERRUPTIBLE);
392 } 316 }
393 __set_current_state(TASK_RUNNING); 317
394 return 0; 318 return 0;
395} 319}
396 320
321struct wq_barrier {
322 struct work_struct work;
323 struct completion done;
324};
325
326static void wq_barrier_func(struct work_struct *work)
327{
328 struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
329 complete(&barr->done);
330}
331
332static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
333 struct wq_barrier *barr, int tail)
334{
335 INIT_WORK(&barr->work, wq_barrier_func);
336 __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
337
338 init_completion(&barr->done);
339
340 insert_work(cwq, &barr->work, tail);
341}
342
397static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 343static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
398{ 344{
399 if (cwq->thread == current) { 345 if (cwq->thread == current) {
@@ -403,21 +349,18 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
403 */ 349 */
404 run_workqueue(cwq); 350 run_workqueue(cwq);
405 } else { 351 } else {
406 DEFINE_WAIT(wait); 352 struct wq_barrier barr;
407 long sequence_needed; 353 int active = 0;
408 354
409 spin_lock_irq(&cwq->lock); 355 spin_lock_irq(&cwq->lock);
410 sequence_needed = cwq->insert_sequence; 356 if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
411 357 insert_wq_barrier(cwq, &barr, 1);
412 while (sequence_needed - cwq->remove_sequence > 0) { 358 active = 1;
413 prepare_to_wait(&cwq->work_done, &wait,
414 TASK_UNINTERRUPTIBLE);
415 spin_unlock_irq(&cwq->lock);
416 schedule();
417 spin_lock_irq(&cwq->lock);
418 } 359 }
419 finish_wait(&cwq->work_done, &wait);
420 spin_unlock_irq(&cwq->lock); 360 spin_unlock_irq(&cwq->lock);
361
362 if (active)
363 wait_for_completion(&barr.done);
421 } 364 }
422} 365}
423 366
@@ -428,151 +371,145 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
428 * Forces execution of the workqueue and blocks until its completion. 371 * Forces execution of the workqueue and blocks until its completion.
429 * This is typically used in driver shutdown handlers. 372 * This is typically used in driver shutdown handlers.
430 * 373 *
431 * This function will sample each workqueue's current insert_sequence number and 374 * We sleep until all works which were queued on entry have been handled,
432 * will sleep until the head sequence is greater than or equal to that. This 375 * but we are not livelocked by new incoming ones.
433 * means that we sleep until all works which were queued on entry have been
434 * handled, but we are not livelocked by new incoming ones.
435 * 376 *
436 * This function used to run the workqueues itself. Now we just wait for the 377 * This function used to run the workqueues itself. Now we just wait for the
437 * helper threads to do it. 378 * helper threads to do it.
438 */ 379 */
439void fastcall flush_workqueue(struct workqueue_struct *wq) 380void fastcall flush_workqueue(struct workqueue_struct *wq)
440{ 381{
382 const cpumask_t *cpu_map = wq_cpu_map(wq);
383 int cpu;
384
441 might_sleep(); 385 might_sleep();
386 for_each_cpu_mask(cpu, *cpu_map)
387 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
388}
389EXPORT_SYMBOL_GPL(flush_workqueue);
442 390
443 if (is_single_threaded(wq)) { 391/*
444 /* Always use first cpu's area. */ 392 * Upon a successful return, the caller "owns" WORK_STRUCT_PENDING bit,
445 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 393 * so this work can't be re-armed in any way.
446 } else { 394 */
447 int cpu; 395static int try_to_grab_pending(struct work_struct *work)
396{
397 struct cpu_workqueue_struct *cwq;
398 int ret = 0;
448 399
449 mutex_lock(&workqueue_mutex); 400 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
450 for_each_online_cpu(cpu) 401 return 1;
451 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 402
452 mutex_unlock(&workqueue_mutex); 403 /*
404 * The queueing is in progress, or it is already queued. Try to
405 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
406 */
407
408 cwq = get_wq_data(work);
409 if (!cwq)
410 return ret;
411
412 spin_lock_irq(&cwq->lock);
413 if (!list_empty(&work->entry)) {
414 /*
415 * This work is queued, but perhaps we locked the wrong cwq.
416 * In that case we must see the new value after rmb(), see
417 * insert_work()->wmb().
418 */
419 smp_rmb();
420 if (cwq == get_wq_data(work)) {
421 list_del_init(&work->entry);
422 ret = 1;
423 }
453 } 424 }
425 spin_unlock_irq(&cwq->lock);
426
427 return ret;
454} 428}
455EXPORT_SYMBOL_GPL(flush_workqueue);
456 429
457static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 430static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
458 int cpu, int freezeable) 431 struct work_struct *work)
459{ 432{
460 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 433 struct wq_barrier barr;
461 struct task_struct *p; 434 int running = 0;
462 435
463 spin_lock_init(&cwq->lock); 436 spin_lock_irq(&cwq->lock);
464 cwq->wq = wq; 437 if (unlikely(cwq->current_work == work)) {
465 cwq->thread = NULL; 438 insert_wq_barrier(cwq, &barr, 0);
466 cwq->insert_sequence = 0; 439 running = 1;
467 cwq->remove_sequence = 0; 440 }
468 cwq->freezeable = freezeable; 441 spin_unlock_irq(&cwq->lock);
469 INIT_LIST_HEAD(&cwq->worklist);
470 init_waitqueue_head(&cwq->more_work);
471 init_waitqueue_head(&cwq->work_done);
472 442
473 if (is_single_threaded(wq)) 443 if (unlikely(running))
474 p = kthread_create(worker_thread, cwq, "%s", wq->name); 444 wait_for_completion(&barr.done);
475 else
476 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
477 if (IS_ERR(p))
478 return NULL;
479 cwq->thread = p;
480 return p;
481} 445}
482 446
483struct workqueue_struct *__create_workqueue(const char *name, 447static void wait_on_work(struct work_struct *work)
484 int singlethread, int freezeable)
485{ 448{
486 int cpu, destroy = 0; 449 struct cpu_workqueue_struct *cwq;
487 struct workqueue_struct *wq; 450 struct workqueue_struct *wq;
488 struct task_struct *p; 451 const cpumask_t *cpu_map;
452 int cpu;
489 453
490 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 454 might_sleep();
491 if (!wq)
492 return NULL;
493 455
494 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 456 cwq = get_wq_data(work);
495 if (!wq->cpu_wq) { 457 if (!cwq)
496 kfree(wq); 458 return;
497 return NULL;
498 }
499 459
500 wq->name = name; 460 wq = cwq->wq;
501 mutex_lock(&workqueue_mutex); 461 cpu_map = wq_cpu_map(wq);
502 if (singlethread) {
503 INIT_LIST_HEAD(&wq->list);
504 p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
505 if (!p)
506 destroy = 1;
507 else
508 wake_up_process(p);
509 } else {
510 list_add(&wq->list, &workqueues);
511 for_each_online_cpu(cpu) {
512 p = create_workqueue_thread(wq, cpu, freezeable);
513 if (p) {
514 kthread_bind(p, cpu);
515 wake_up_process(p);
516 } else
517 destroy = 1;
518 }
519 }
520 mutex_unlock(&workqueue_mutex);
521 462
522 /* 463 for_each_cpu_mask(cpu, *cpu_map)
523 * Was there any error during startup? If yes then clean up: 464 wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
524 */
525 if (destroy) {
526 destroy_workqueue(wq);
527 wq = NULL;
528 }
529 return wq;
530} 465}
531EXPORT_SYMBOL_GPL(__create_workqueue);
532 466
533static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 467/**
468 * cancel_work_sync - block until a work_struct's callback has terminated
469 * @work: the work which is to be flushed
470 *
471 * cancel_work_sync() will cancel the work if it is queued. If the work's
472 * callback appears to be running, cancel_work_sync() will block until it
473 * has completed.
474 *
475 * It is possible to use this function if the work re-queues itself. It can
476 * cancel the work even if it migrates to another workqueue, however in that
477 * case it only guarantees that work->func() has completed on the last queued
478 * workqueue.
479 *
480 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
481 * pending, otherwise it goes into a busy-wait loop until the timer expires.
482 *
483 * The caller must ensure that workqueue_struct on which this work was last
484 * queued can't be destroyed before this function returns.
485 */
486void cancel_work_sync(struct work_struct *work)
534{ 487{
535 struct cpu_workqueue_struct *cwq; 488 while (!try_to_grab_pending(work))
536 unsigned long flags; 489 cpu_relax();
537 struct task_struct *p; 490 wait_on_work(work);
538 491 work_clear_pending(work);
539 cwq = per_cpu_ptr(wq->cpu_wq, cpu);
540 spin_lock_irqsave(&cwq->lock, flags);
541 p = cwq->thread;
542 cwq->thread = NULL;
543 spin_unlock_irqrestore(&cwq->lock, flags);
544 if (p)
545 kthread_stop(p);
546} 492}
493EXPORT_SYMBOL_GPL(cancel_work_sync);
547 494
548/** 495/**
549 * destroy_workqueue - safely terminate a workqueue 496 * cancel_rearming_delayed_work - reliably kill off a delayed work.
550 * @wq: target workqueue 497 * @dwork: the delayed work struct
551 * 498 *
552 * Safely destroy a workqueue. All work currently pending will be done first. 499 * It is possible to use this function if @dwork rearms itself via queue_work()
500 * or queue_delayed_work(). See also the comment for cancel_work_sync().
553 */ 501 */
554void destroy_workqueue(struct workqueue_struct *wq) 502void cancel_rearming_delayed_work(struct delayed_work *dwork)
555{ 503{
556 int cpu; 504 while (!del_timer(&dwork->timer) &&
557 505 !try_to_grab_pending(&dwork->work))
558 flush_workqueue(wq); 506 cpu_relax();
559 507 wait_on_work(&dwork->work);
560 /* We don't need the distraction of CPUs appearing and vanishing. */ 508 work_clear_pending(&dwork->work);
561 mutex_lock(&workqueue_mutex);
562 if (is_single_threaded(wq))
563 cleanup_workqueue_thread(wq, singlethread_cpu);
564 else {
565 for_each_online_cpu(cpu)
566 cleanup_workqueue_thread(wq, cpu);
567 list_del(&wq->list);
568 }
569 mutex_unlock(&workqueue_mutex);
570 free_percpu(wq->cpu_wq);
571 kfree(wq);
572} 509}
573EXPORT_SYMBOL_GPL(destroy_workqueue); 510EXPORT_SYMBOL(cancel_rearming_delayed_work);
574 511
575static struct workqueue_struct *keventd_wq; 512static struct workqueue_struct *keventd_wq __read_mostly;
576 513
577/** 514/**
578 * schedule_work - put work task in global workqueue 515 * schedule_work - put work task in global workqueue
@@ -638,7 +575,7 @@ int schedule_on_each_cpu(work_func_t func)
638 if (!works) 575 if (!works)
639 return -ENOMEM; 576 return -ENOMEM;
640 577
641 mutex_lock(&workqueue_mutex); 578 preempt_disable(); /* CPU hotplug */
642 for_each_online_cpu(cpu) { 579 for_each_online_cpu(cpu) {
643 struct work_struct *work = per_cpu_ptr(works, cpu); 580 struct work_struct *work = per_cpu_ptr(works, cpu);
644 581
@@ -646,7 +583,7 @@ int schedule_on_each_cpu(work_func_t func)
646 set_bit(WORK_STRUCT_PENDING, work_data_bits(work)); 583 set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
647 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work); 584 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
648 } 585 }
649 mutex_unlock(&workqueue_mutex); 586 preempt_enable();
650 flush_workqueue(keventd_wq); 587 flush_workqueue(keventd_wq);
651 free_percpu(works); 588 free_percpu(works);
652 return 0; 589 return 0;
@@ -659,29 +596,6 @@ void flush_scheduled_work(void)
659EXPORT_SYMBOL(flush_scheduled_work); 596EXPORT_SYMBOL(flush_scheduled_work);
660 597
661/** 598/**
662 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
663 * @wq: the controlling workqueue structure
664 * @dwork: the delayed work struct
665 */
666void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
667 struct delayed_work *dwork)
668{
669 while (!cancel_delayed_work(dwork))
670 flush_workqueue(wq);
671}
672EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
673
674/**
675 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
676 * @dwork: the delayed work struct
677 */
678void cancel_rearming_delayed_work(struct delayed_work *dwork)
679{
680 cancel_rearming_delayed_workqueue(keventd_wq, dwork);
681}
682EXPORT_SYMBOL(cancel_rearming_delayed_work);
683
684/**
685 * execute_in_process_context - reliably execute the routine with user context 599 * execute_in_process_context - reliably execute the routine with user context
686 * @fn: the function to execute 600 * @fn: the function to execute
687 * @ew: guaranteed storage for the execute work structure (must 601 * @ew: guaranteed storage for the execute work structure (must
@@ -728,94 +642,209 @@ int current_is_keventd(void)
728 642
729} 643}
730 644
731/* Take the work from this (downed) CPU. */ 645static struct cpu_workqueue_struct *
732static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 646init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
733{ 647{
734 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 648 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
735 struct list_head list;
736 struct work_struct *work;
737 649
738 spin_lock_irq(&cwq->lock); 650 cwq->wq = wq;
739 list_replace_init(&cwq->worklist, &list); 651 spin_lock_init(&cwq->lock);
652 INIT_LIST_HEAD(&cwq->worklist);
653 init_waitqueue_head(&cwq->more_work);
654
655 return cwq;
656}
657
658static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
659{
660 struct workqueue_struct *wq = cwq->wq;
661 const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
662 struct task_struct *p;
663
664 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
665 /*
666 * Nobody can add the work_struct to this cwq,
667 * if (caller is __create_workqueue)
668 * nobody should see this wq
669 * else // caller is CPU_UP_PREPARE
670 * cpu is not on cpu_online_map
671 * so we can abort safely.
672 */
673 if (IS_ERR(p))
674 return PTR_ERR(p);
675
676 cwq->thread = p;
677 cwq->should_stop = 0;
678
679 return 0;
680}
681
682static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
683{
684 struct task_struct *p = cwq->thread;
740 685
741 while (!list_empty(&list)) { 686 if (p != NULL) {
742 printk("Taking work for %s\n", wq->name); 687 if (cpu >= 0)
743 work = list_entry(list.next,struct work_struct,entry); 688 kthread_bind(p, cpu);
744 list_del(&work->entry); 689 wake_up_process(p);
745 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
746 } 690 }
747 spin_unlock_irq(&cwq->lock);
748} 691}
749 692
750/* We're holding the cpucontrol mutex here */ 693struct workqueue_struct *__create_workqueue(const char *name,
751static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 694 int singlethread, int freezeable)
752 unsigned long action,
753 void *hcpu)
754{ 695{
755 unsigned int hotcpu = (unsigned long)hcpu;
756 struct workqueue_struct *wq; 696 struct workqueue_struct *wq;
697 struct cpu_workqueue_struct *cwq;
698 int err = 0, cpu;
757 699
758 switch (action) { 700 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
759 case CPU_UP_PREPARE: 701 if (!wq)
760 mutex_lock(&workqueue_mutex); 702 return NULL;
761 /* Create a new workqueue thread for it. */
762 list_for_each_entry(wq, &workqueues, list) {
763 if (!create_workqueue_thread(wq, hotcpu, 0)) {
764 printk("workqueue for %i failed\n", hotcpu);
765 return NOTIFY_BAD;
766 }
767 }
768 break;
769 703
770 case CPU_ONLINE: 704 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
771 /* Kick off worker threads. */ 705 if (!wq->cpu_wq) {
772 list_for_each_entry(wq, &workqueues, list) { 706 kfree(wq);
773 struct cpu_workqueue_struct *cwq; 707 return NULL;
708 }
774 709
775 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 710 wq->name = name;
776 kthread_bind(cwq->thread, hotcpu); 711 wq->singlethread = singlethread;
777 wake_up_process(cwq->thread); 712 wq->freezeable = freezeable;
778 } 713 INIT_LIST_HEAD(&wq->list);
779 mutex_unlock(&workqueue_mutex);
780 break;
781 714
782 case CPU_UP_CANCELED: 715 if (singlethread) {
783 list_for_each_entry(wq, &workqueues, list) { 716 cwq = init_cpu_workqueue(wq, singlethread_cpu);
784 if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread) 717 err = create_workqueue_thread(cwq, singlethread_cpu);
718 start_workqueue_thread(cwq, -1);
719 } else {
720 mutex_lock(&workqueue_mutex);
721 list_add(&wq->list, &workqueues);
722
723 for_each_possible_cpu(cpu) {
724 cwq = init_cpu_workqueue(wq, cpu);
725 if (err || !cpu_online(cpu))
785 continue; 726 continue;
786 /* Unbind so it can run. */ 727 err = create_workqueue_thread(cwq, cpu);
787 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 728 start_workqueue_thread(cwq, cpu);
788 any_online_cpu(cpu_online_map));
789 cleanup_workqueue_thread(wq, hotcpu);
790 } 729 }
791 mutex_unlock(&workqueue_mutex); 730 mutex_unlock(&workqueue_mutex);
792 break; 731 }
732
733 if (err) {
734 destroy_workqueue(wq);
735 wq = NULL;
736 }
737 return wq;
738}
739EXPORT_SYMBOL_GPL(__create_workqueue);
740
741static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
742{
743 struct wq_barrier barr;
744 int alive = 0;
745
746 spin_lock_irq(&cwq->lock);
747 if (cwq->thread != NULL) {
748 insert_wq_barrier(cwq, &barr, 1);
749 cwq->should_stop = 1;
750 alive = 1;
751 }
752 spin_unlock_irq(&cwq->lock);
753
754 if (alive) {
755 wait_for_completion(&barr.done);
793 756
794 case CPU_DOWN_PREPARE: 757 while (unlikely(cwq->thread != NULL))
758 cpu_relax();
759 /*
760 * Wait until cwq->thread unlocks cwq->lock,
761 * it won't touch *cwq after that.
762 */
763 smp_rmb();
764 spin_unlock_wait(&cwq->lock);
765 }
766}
767
768/**
769 * destroy_workqueue - safely terminate a workqueue
770 * @wq: target workqueue
771 *
772 * Safely destroy a workqueue. All work currently pending will be done first.
773 */
774void destroy_workqueue(struct workqueue_struct *wq)
775{
776 const cpumask_t *cpu_map = wq_cpu_map(wq);
777 struct cpu_workqueue_struct *cwq;
778 int cpu;
779
780 mutex_lock(&workqueue_mutex);
781 list_del(&wq->list);
782 mutex_unlock(&workqueue_mutex);
783
784 for_each_cpu_mask(cpu, *cpu_map) {
785 cwq = per_cpu_ptr(wq->cpu_wq, cpu);
786 cleanup_workqueue_thread(cwq, cpu);
787 }
788
789 free_percpu(wq->cpu_wq);
790 kfree(wq);
791}
792EXPORT_SYMBOL_GPL(destroy_workqueue);
793
794static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
795 unsigned long action,
796 void *hcpu)
797{
798 unsigned int cpu = (unsigned long)hcpu;
799 struct cpu_workqueue_struct *cwq;
800 struct workqueue_struct *wq;
801
802 action &= ~CPU_TASKS_FROZEN;
803
804 switch (action) {
805 case CPU_LOCK_ACQUIRE:
795 mutex_lock(&workqueue_mutex); 806 mutex_lock(&workqueue_mutex);
796 break; 807 return NOTIFY_OK;
797 808
798 case CPU_DOWN_FAILED: 809 case CPU_LOCK_RELEASE:
799 mutex_unlock(&workqueue_mutex); 810 mutex_unlock(&workqueue_mutex);
800 break; 811 return NOTIFY_OK;
801 812
802 case CPU_DEAD: 813 case CPU_UP_PREPARE:
803 list_for_each_entry(wq, &workqueues, list) 814 cpu_set(cpu, cpu_populated_map);
804 cleanup_workqueue_thread(wq, hotcpu); 815 }
805 list_for_each_entry(wq, &workqueues, list) 816
806 take_over_work(wq, hotcpu); 817 list_for_each_entry(wq, &workqueues, list) {
807 mutex_unlock(&workqueue_mutex); 818 cwq = per_cpu_ptr(wq->cpu_wq, cpu);
808 break; 819
820 switch (action) {
821 case CPU_UP_PREPARE:
822 if (!create_workqueue_thread(cwq, cpu))
823 break;
824 printk(KERN_ERR "workqueue for %i failed\n", cpu);
825 return NOTIFY_BAD;
826
827 case CPU_ONLINE:
828 start_workqueue_thread(cwq, cpu);
829 break;
830
831 case CPU_UP_CANCELED:
832 start_workqueue_thread(cwq, -1);
833 case CPU_DEAD:
834 cleanup_workqueue_thread(cwq, cpu);
835 break;
836 }
809 } 837 }
810 838
811 return NOTIFY_OK; 839 return NOTIFY_OK;
812} 840}
813 841
814void init_workqueues(void) 842void __init init_workqueues(void)
815{ 843{
844 cpu_populated_map = cpu_online_map;
816 singlethread_cpu = first_cpu(cpu_possible_map); 845 singlethread_cpu = first_cpu(cpu_possible_map);
846 cpu_singlethread_map = cpumask_of_cpu(singlethread_cpu);
817 hotcpu_notifier(workqueue_cpu_callback, 0); 847 hotcpu_notifier(workqueue_cpu_callback, 0);
818 keventd_wq = create_workqueue("events"); 848 keventd_wq = create_workqueue("events");
819 BUG_ON(!keventd_wq); 849 BUG_ON(!keventd_wq);
820} 850}
821
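
Illustrative usage sketch, not part of the commit above: a minimal, hypothetical driver module showing how the reworked primitives from this diff (cancel_work_sync(), cancel_rearming_delayed_work(), and the wq_barrier-based flush_workqueue()) are meant to be called. Every identifier prefixed with "example_" is invented for illustration.

/*
 * Sketch only, assuming the post-patch API shown in the diff:
 * cancel_work_sync() takes a bare work_struct and waits for a running
 * callback via a wq_barrier, and cancel_rearming_delayed_work() now
 * works on any workqueue, not just keventd.  All "example_*" names
 * are hypothetical.
 */
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/jiffies.h>

static struct workqueue_struct *example_wq;
static struct work_struct example_work;
static struct delayed_work example_dwork;

static void example_work_fn(struct work_struct *work)
{
	printk(KERN_DEBUG "example: one-shot work ran\n");
}

static void example_dwork_fn(struct work_struct *work)
{
	struct delayed_work *dwork =
		container_of(work, struct delayed_work, work);

	/* Self-rearming delayed work; see cancel_rearming_delayed_work(). */
	queue_delayed_work(example_wq, dwork, HZ);
}

static int __init example_init(void)
{
	example_wq = create_workqueue("example");
	if (!example_wq)
		return -ENOMEM;

	INIT_WORK(&example_work, example_work_fn);
	INIT_DELAYED_WORK(&example_dwork, example_dwork_fn);

	queue_work(example_wq, &example_work);
	queue_delayed_work(example_wq, &example_dwork, HZ);
	return 0;
}

static void __exit example_exit(void)
{
	/*
	 * Grabs WORK_STRUCT_PENDING via try_to_grab_pending(); if the
	 * callback is already running, a wq_barrier is queued right
	 * behind it and we sleep on barr.done until it completes.
	 */
	cancel_work_sync(&example_work);

	/* Safe even though example_dwork_fn() keeps requeueing itself. */
	cancel_rearming_delayed_work(&example_dwork);

	/* Waits for already-queued work without being livelocked. */
	flush_workqueue(example_wq);
	destroy_workqueue(example_wq);
}

module_init(example_init);
module_exit(example_exit);
MODULE_LICENSE("GPL");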