Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--  kernel/workqueue.c | 135
1 file changed, 100 insertions, 35 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f57855f718d7..cfb8aa567e17 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -114,8 +114,7 @@ struct global_cwq {
 } ____cacheline_aligned_in_smp;
 
 /*
- * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
+ * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of
  * work_struct->data are used for flags and thus cwqs need to be
  * aligned at two's power of the number of flag bits.
  */
@@ -159,6 +158,8 @@ struct workqueue_struct {
 	struct list_head	flusher_queue;	/* F: flush waiters */
 	struct list_head	flusher_overflow; /* F: flush overflow list */
 
+	unsigned long		single_cpu;	/* cpu for single cpu wq */
+
 	int			saved_max_active; /* I: saved cwq max_active */
 	const char		*name;		/* I: workqueue name */
 #ifdef CONFIG_LOCKDEP
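
For context, the new single_cpu field acts as an ownership slot: it names the CPU currently serving a WQ_SINGLE_CPU workqueue, with NR_CPUS as the "nobody owns it" sentinel. Here is a minimal userspace sketch of that slot idea using C11 atomics; every name below is invented for illustration and none of it is kernel API:

#include <stdatomic.h>
#include <stdio.h>

#define OWNER_NONE (-1)			/* analogue of NR_CPUS: no cpu bound */

struct single_cpu_slot {
	atomic_int owner;		/* bound cpu, or OWNER_NONE */
};

/* Try to claim the slot for @cpu; return whoever owns it afterwards. */
static int slot_claim(struct single_cpu_slot *s, int cpu)
{
	int expected = OWNER_NONE;

	/* succeeds only if the slot is currently unowned */
	atomic_compare_exchange_strong(&s->owner, &expected, cpu);
	return atomic_load(&s->owner);
}

/* Drop the binding so a later claim can pick a different cpu. */
static void slot_release(struct single_cpu_slot *s)
{
	atomic_store(&s->owner, OWNER_NONE);
}

int main(void)
{
	struct single_cpu_slot s = { .owner = OWNER_NONE };

	printf("claim from cpu 2 -> owner %d\n", slot_claim(&s, 2));
	printf("claim from cpu 5 -> owner %d\n", slot_claim(&s, 5));
	slot_release(&s);
	printf("after release    -> owner %d\n", atomic_load(&s.owner));
	return 0;
}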
@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq);
 
 static int worker_thread(void *__worker);
 
-static int singlethread_cpu __read_mostly;
-
 static struct global_cwq *get_gcwq(unsigned int cpu)
 {
 	return &per_cpu(global_cwq, cpu);
@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
 	return per_cpu_ptr(wq->cpu_wq, cpu);
 }
 
-static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
-					       struct workqueue_struct *wq)
-{
-	if (unlikely(wq->flags & WQ_SINGLE_THREAD))
-		cpu = singlethread_cpu;
-	return get_cwq(cpu, wq);
-}
-
 static unsigned int work_color_to_flags(int color)
 {
 	return color << WORK_STRUCT_COLOR_SHIFT;
@@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 	wake_up_process(cwq->worker->task);
 }
 
+/**
+ * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
+ * @cwq: cwq to unbind
+ *
+ * Try to unbind @cwq from single cpu workqueue processing. If
+ * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
+{
+	struct workqueue_struct *wq = cwq->wq;
+	struct global_cwq *gcwq = cwq->gcwq;
+
+	BUG_ON(wq->single_cpu != gcwq->cpu);
+	/*
+	 * Unbind from workqueue if @cwq is not frozen. If frozen,
+	 * thaw_workqueues() will either restart processing on this
+	 * cpu or unbind if empty. This keeps works queued while
+	 * frozen fully ordered and flushable.
+	 */
+	if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
+		smp_wmb();	/* paired with cmpxchg() in __queue_work() */
+		wq->single_cpu = NR_CPUS;
+	}
+}
+
 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 			 struct work_struct *work)
 {
-	struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
-	struct global_cwq *gcwq = cwq->gcwq;
+	struct global_cwq *gcwq;
+	struct cpu_workqueue_struct *cwq;
 	struct list_head *worklist;
 	unsigned long flags;
+	bool arbitrate;
 
 	debug_work_activate(work);
 
-	spin_lock_irqsave(&gcwq->lock, flags);
+	/* determine gcwq to use */
+	if (!(wq->flags & WQ_SINGLE_CPU)) {
+		/* just use the requested cpu for multicpu workqueues */
+		gcwq = get_gcwq(cpu);
+		spin_lock_irqsave(&gcwq->lock, flags);
+	} else {
+		unsigned int req_cpu = cpu;
+
+		/*
+		 * It's a bit more complex for single cpu workqueues.
+		 * We first need to determine which cpu is going to be
+		 * used. If no cpu is currently serving this
+		 * workqueue, arbitrate using atomic accesses to
+		 * wq->single_cpu; otherwise, use the current one.
+		 */
+	retry:
+		cpu = wq->single_cpu;
+		arbitrate = cpu == NR_CPUS;
+		if (arbitrate)
+			cpu = req_cpu;
+
+		gcwq = get_gcwq(cpu);
+		spin_lock_irqsave(&gcwq->lock, flags);
+
+		/*
+		 * The following cmpxchg() is a full barrier paired
+		 * with smp_wmb() in cwq_unbind_single_cpu() and
+		 * guarantees that all changes to wq->st_* fields are
+		 * visible on the new cpu after this point.
+		 */
+		if (arbitrate)
+			cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
+
+		if (unlikely(wq->single_cpu != cpu)) {
+			spin_unlock_irqrestore(&gcwq->lock, flags);
+			goto retry;
+		}
+	}
+
+	/* gcwq determined, get cwq and queue */
+	cwq = get_cwq(gcwq->cpu, wq);
+
 	BUG_ON(!list_empty(&work->entry));
 
 	cwq->nr_in_flight[cwq->work_color]++;
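
The WQ_SINGLE_CPU branch above boils down to: read the owner slot, tentatively pick the requested CPU if the slot is empty, take that CPU's gcwq lock, try to claim the slot with cmpxchg() (a full barrier, paired with the smp_wmb() in cwq_unbind_single_cpu()), and if another CPU won the race, drop the lock and retry against the winner. Below is a compilable userspace analogue of that retry loop, with pthread mutexes standing in for the gcwq locks; all names are invented for the sketch and nothing here is kernel API:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NCPUS		4
#define CPU_NONE	NCPUS		/* analogue of NR_CPUS: no owner */

static pthread_mutex_t cpu_lock[NCPUS] = {
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
};
static atomic_uint single_cpu = CPU_NONE;

/* Returns the cpu to queue on; that cpu's lock is held on return. */
static unsigned int claim_cpu(unsigned int req_cpu)
{
	unsigned int cpu, expected;
	int arbitrate;

	for (;;) {
		cpu = atomic_load(&single_cpu);
		arbitrate = (cpu == CPU_NONE);
		if (arbitrate)
			cpu = req_cpu;		/* nobody bound yet, try ours */

		pthread_mutex_lock(&cpu_lock[cpu]);

		if (arbitrate) {
			expected = CPU_NONE;
			/* full barrier, like the cmpxchg() in the patch */
			atomic_compare_exchange_strong(&single_cpu,
						       &expected, cpu);
		}

		if (atomic_load(&single_cpu) == cpu)
			return cpu;

		/* another thread won the arbitration; retry against it */
		pthread_mutex_unlock(&cpu_lock[cpu]);
	}
}

int main(void)
{
	unsigned int cpu = claim_cpu(2);

	printf("queueing on cpu %u\n", cpu);	/* prints 2 on first claim */
	pthread_mutex_unlock(&cpu_lock[cpu]);
	return 0;
}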
@@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 		timer_stats_timer_set_start_info(&dwork->timer);
 
 		/* This stores cwq for the moment, for the timer_fn */
-		set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
+		set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
 		timer->expires = jiffies + delay;
 		timer->data = (unsigned long)dwork;
 		timer->function = delayed_work_timer_fn;
@@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
 	cwq->nr_in_flight[color]--;
 	cwq->nr_active--;
 
-	/* one down, submit a delayed one */
-	if (!list_empty(&cwq->delayed_works) &&
-	    cwq->nr_active < cwq->max_active)
-		cwq_activate_first_delayed(cwq);
+	if (!list_empty(&cwq->delayed_works)) {
+		/* one down, submit a delayed one */
+		if (cwq->nr_active < cwq->max_active)
+			cwq_activate_first_delayed(cwq);
+	} else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
+		/* this was the last work, unbind from single cpu */
+		cwq_unbind_single_cpu(cwq);
+	}
 
 	/* is flush in progress and are we at the flushing tip? */
 	if (likely(cwq->flush_color != color))
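
The completion path above now does two jobs: while delayed works remain, it promotes one whenever nr_active drops back under max_active; once the cwq drains completely on a WQ_SINGLE_CPU workqueue, it drops the CPU binding so the next queueing can re-arbitrate. A small standalone C model of those state transitions follows; the types and counters are made up for illustration and this is not kernel code:

#include <stdbool.h>
#include <stdio.h>

struct fake_cwq {
	int nr_active;		/* works currently active */
	int max_active;		/* concurrency limit */
	int nr_delayed;		/* stand-in for the delayed_works list */
	bool single_cpu_wq;	/* stand-in for WQ_SINGLE_CPU */
	bool bound;		/* stand-in for wq->single_cpu == gcwq->cpu */
};

static void activate_first_delayed(struct fake_cwq *cwq)
{
	cwq->nr_delayed--;
	cwq->nr_active++;
}

static void unbind_single_cpu(struct fake_cwq *cwq)
{
	cwq->bound = false;	/* next queueing may pick another cpu */
}

static void work_finished(struct fake_cwq *cwq)
{
	cwq->nr_active--;

	if (cwq->nr_delayed > 0) {
		/* one down, submit a delayed one */
		if (cwq->nr_active < cwq->max_active)
			activate_first_delayed(cwq);
	} else if (!cwq->nr_active && cwq->single_cpu_wq) {
		/* that was the last work: release the cpu binding */
		unbind_single_cpu(cwq);
	}
}

int main(void)
{
	struct fake_cwq cwq = {
		.nr_active = 2, .max_active = 2, .nr_delayed = 1,
		.single_cpu_wq = true, .bound = true,
	};

	work_finished(&cwq);	/* promotes the delayed work */
	work_finished(&cwq);	/* one left, nothing delayed */
	work_finished(&cwq);	/* empty -> binding dropped */

	printf("active=%d delayed=%d bound=%d\n",
	       cwq.nr_active, cwq.nr_delayed, cwq.bound);
	return 0;
}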
@@ -1727,7 +1792,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 					       struct lock_class_key *key,
 					       const char *lock_name)
 {
-	bool singlethread = flags & WQ_SINGLE_THREAD;
 	struct workqueue_struct *wq;
 	bool failed = false;
 	unsigned int cpu;
@@ -1748,6 +1812,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 	atomic_set(&wq->nr_cwqs_to_flush, 0);
 	INIT_LIST_HEAD(&wq->flusher_queue);
 	INIT_LIST_HEAD(&wq->flusher_overflow);
+	wq->single_cpu = NR_CPUS;
+
 	wq->name = name;
 	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
 	INIT_LIST_HEAD(&wq->list);
@@ -1773,8 +1839,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 
 		if (failed)
 			continue;
-		cwq->worker = create_worker(cwq,
-					    cpu_online(cpu) && !singlethread);
+		cwq->worker = create_worker(cwq, cpu_online(cpu));
 		if (cwq->worker)
 			start_worker(cwq->worker);
 		else
@@ -1958,18 +2023,16 @@ static int __cpuinit trustee_thread(void *__gcwq)
 
 	spin_lock_irq(&gcwq->lock);
 	/*
-	 * Make all multithread workers rogue. Trustee must be bound
-	 * to the target cpu and can't be cancelled.
+	 * Make all workers rogue. Trustee must be bound to the
+	 * target cpu and can't be cancelled.
 	 */
 	BUG_ON(gcwq->cpu != smp_processor_id());
 
 	list_for_each_entry(worker, &gcwq->idle_list, entry)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags |= WORKER_ROGUE;
+		worker->flags |= WORKER_ROGUE;
 
 	for_each_busy_worker(worker, i, pos, gcwq)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags |= WORKER_ROGUE;
+		worker->flags |= WORKER_ROGUE;
 
 	/*
 	 * We're now in charge. Notify and proceed to drain. We need
@@ -2074,14 +2137,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 			wait_trustee_state(gcwq, TRUSTEE_DONE);
 		}
 
-		/* clear ROGUE from all multithread workers */
+		/* clear ROGUE from all workers */
 		list_for_each_entry(worker, &gcwq->idle_list, entry)
-			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-				worker->flags &= ~WORKER_ROGUE;
+			worker->flags &= ~WORKER_ROGUE;
 
 		for_each_busy_worker(worker, i, pos, gcwq)
-			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-				worker->flags &= ~WORKER_ROGUE;
+			worker->flags &= ~WORKER_ROGUE;
 		break;
 	}
 
@@ -2266,6 +2327,11 @@ void thaw_workqueues(void)
 				    cwq->nr_active < cwq->max_active)
 					cwq_activate_first_delayed(cwq);
 
+			/* perform delayed unbind from single cpu if empty */
+			if (wq->single_cpu == gcwq->cpu &&
+			    !cwq->nr_active && list_empty(&cwq->delayed_works))
+				cwq_unbind_single_cpu(cwq);
+
 			wake_up_process(cwq->worker->task);
 		}
 
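
The thaw-time check above completes the picture: cwq_unbind_single_cpu() skips the unbind while the gcwq is freezing, so works queued under the freeze stay fully ordered and flushable, and thaw_workqueues() performs the deferred unbind if the cwq turns out to be empty. A tiny illustrative model of that deferred release, again with invented types rather than kernel ones:

#include <stdbool.h>
#include <stdio.h>

struct fake_wq {
	bool frozen;		/* stand-in for GCWQ_FREEZING */
	bool bound;		/* single-cpu binding currently held */
	int  nr_pending;	/* active works + delayed works */
};

static void try_unbind(struct fake_wq *wq)
{
	/* while frozen, keep the binding so frozen works stay ordered */
	if (!wq->frozen)
		wq->bound = false;
}

static void thaw(struct fake_wq *wq)
{
	wq->frozen = false;
	/* perform the unbind that was skipped during the freeze, if empty */
	if (wq->bound && wq->nr_pending == 0)
		try_unbind(wq);
}

int main(void)
{
	struct fake_wq wq = { .frozen = true, .bound = true, .nr_pending = 0 };

	try_unbind(&wq);			/* skipped: still frozen */
	printf("bound while frozen: %d\n", wq.bound);
	thaw(&wq);
	printf("bound after thaw:   %d\n", wq.bound);
	return 0;
}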
@@ -2283,7 +2349,6 @@ void __init init_workqueues(void)
 	unsigned int cpu;
 	int i;
 
-	singlethread_cpu = cpumask_first(cpu_possible_mask);
 	hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
 
 	/* initialize gcwqs */