author	Tejun Heo <tj@kernel.org>	2010-06-29 04:07:13 -0400
committer	Tejun Heo <tj@kernel.org>	2010-06-29 04:07:13 -0400
commit	502ca9d819792e7d79b6e002afe9094c641fe410 (patch)
tree	5f06a8845643f1007ce9807636cde4057f8761a9
parent	db7bccf45cb87522096b8f43144e31ca605a9f24 (diff)
workqueue: make single thread workqueue shared worker pool friendly
Reimplement st (single thread) workqueue so that it's friendly to the shared worker pool. It was originally implemented by confining st workqueues to use the cwq of a fixed cpu and always having a worker for that cpu. This implementation isn't very friendly to the shared worker pool and is suboptimal in that it ends up crossing cpu boundaries often.

Reimplement st workqueue using dynamic single cpu binding and cwq->limit. WQ_SINGLE_THREAD is replaced with WQ_SINGLE_CPU. In a single cpu workqueue, at most one cwq is bound to the wq at any given time. Arbitration is done using atomic accesses to wq->single_cpu when queueing a work. Once bound, the binding stays until the workqueue is drained.

Note that the binding is never broken while a workqueue is frozen. This is because idle cwqs may have works waiting in their delayed_works queue while frozen. On thaw, the cwq is restarted if there are any delayed works, or unbound otherwise.

When combined with a max_active limit of 1, a single cpu workqueue has exactly the same execution properties as the original single thread workqueue while allowing sharing of per-cpu workers.

Signed-off-by: Tejun Heo <tj@kernel.org>
-rw-r--r--	include/linux/workqueue.h	6
-rw-r--r--	kernel/workqueue.c	135
2 files changed, 103 insertions, 38 deletions
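The arbitration scheme described in the commit message above can be illustrated with a minimal user-space sketch. This is not kernel code: all names (sim_wq, SIM_NR_CPUS, sim_queue_work, sim_work_done) are made up for illustration, C11 atomics stand in for cmpxchg(), and the gcwq locking that serializes the pending count in the real implementation is omitted. Queueing CPUs race with a compare-and-swap on single_cpu; the winner binds the workqueue to its cpu, later works follow the binding, and the binding is dropped once the last work completes.

#include <stdatomic.h>
#include <stdio.h>

#define SIM_NR_CPUS	4	/* stand-in for "no cpu", i.e. unbound */

struct sim_wq {
	_Atomic unsigned int single_cpu;	/* cpu the wq is bound to */
	int pending;				/* queued but not yet completed works */
};

/* queue a work from @req_cpu; return the cpu that will execute it */
static unsigned int sim_queue_work(struct sim_wq *wq, unsigned int req_cpu)
{
	unsigned int cpu, expected;

	for (;;) {
		cpu = atomic_load(&wq->single_cpu);
		if (cpu != SIM_NR_CPUS)
			break;			/* already bound, follow it */
		/* unbound: race to bind the wq to the requesting cpu */
		expected = SIM_NR_CPUS;
		if (atomic_compare_exchange_strong(&wq->single_cpu,
						   &expected, req_cpu)) {
			cpu = req_cpu;		/* we won the arbitration */
			break;
		}
		/* lost the race; retry and follow the winner's binding */
	}
	wq->pending++;
	return cpu;
}

/* complete one work; drop the binding once the workqueue is drained */
static void sim_work_done(struct sim_wq *wq)
{
	if (--wq->pending == 0)
		atomic_store(&wq->single_cpu, SIM_NR_CPUS);
}

int main(void)
{
	struct sim_wq wq = { .single_cpu = SIM_NR_CPUS, .pending = 0 };

	printf("work 1 runs on cpu %u\n", sim_queue_work(&wq, 2));
	printf("work 2 runs on cpu %u\n", sim_queue_work(&wq, 0)); /* stays on cpu 2 */
	sim_work_done(&wq);
	sim_work_done(&wq);
	printf("work 3 runs on cpu %u\n", sim_queue_work(&wq, 0)); /* rebinds to cpu 0 */
	return 0;
}

Built with any C11 toolchain, the second work reports cpu 2 even though it was queued from cpu 0, mirroring how a WQ_SINGLE_CPU workqueue keeps execution on a single cpu until it drains, after which a new queueing cpu may win the binding.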
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ab0b7fb99bc2..10611f7fc809 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -221,7 +221,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
 
 enum {
 	WQ_FREEZEABLE		= 1 << 0, /* freeze during suspend */
-	WQ_SINGLE_THREAD	= 1 << 1, /* no per-cpu worker */
+	WQ_SINGLE_CPU		= 1 << 1, /* only single cpu at a time */
 };
 
 extern struct workqueue_struct *
@@ -250,9 +250,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active,
 #define create_workqueue(name)				\
 	__create_workqueue((name), 0, 1)
 #define create_freezeable_workqueue(name)		\
-	__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1)
+	__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU, 1)
 #define create_singlethread_workqueue(name)		\
-	__create_workqueue((name), WQ_SINGLE_THREAD, 1)
+	__create_workqueue((name), WQ_SINGLE_CPU, 1)
 
 extern void destroy_workqueue(struct workqueue_struct *wq);
 
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f57855f718d7..cfb8aa567e17 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -114,8 +114,7 @@ struct global_cwq {
 } ____cacheline_aligned_in_smp;
 
 /*
- * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
+ * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
  * work_struct->data are used for flags and thus cwqs need to be
  * aligned at two's power of the number of flag bits.
  */
@@ -159,6 +158,8 @@ struct workqueue_struct {
 	struct list_head	flusher_queue;	/* F: flush waiters */
 	struct list_head	flusher_overflow; /* F: flush overflow list */
 
+	unsigned long		single_cpu;	/* cpu for single cpu wq */
+
 	int			saved_max_active; /* I: saved cwq max_active */
 	const char		*name;		/* I: workqueue name */
 #ifdef CONFIG_LOCKDEP
@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq);
 
 static int worker_thread(void *__worker);
 
-static int singlethread_cpu __read_mostly;
-
 static struct global_cwq *get_gcwq(unsigned int cpu)
 {
 	return &per_cpu(global_cwq, cpu);
@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
 	return per_cpu_ptr(wq->cpu_wq, cpu);
 }
 
-static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
-					       struct workqueue_struct *wq)
-{
-	if (unlikely(wq->flags & WQ_SINGLE_THREAD))
-		cpu = singlethread_cpu;
-	return get_cwq(cpu, wq);
-}
-
 static unsigned int work_color_to_flags(int color)
 {
 	return color << WORK_STRUCT_COLOR_SHIFT;
@@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 	wake_up_process(cwq->worker->task);
 }
 
+/**
+ * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
+ * @cwq: cwq to unbind
+ *
+ * Try to unbind @cwq from single cpu workqueue processing.  If
+ * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
+{
+	struct workqueue_struct *wq = cwq->wq;
+	struct global_cwq *gcwq = cwq->gcwq;
+
+	BUG_ON(wq->single_cpu != gcwq->cpu);
+	/*
+	 * Unbind from workqueue if @cwq is not frozen.  If frozen,
+	 * thaw_workqueues() will either restart processing on this
+	 * cpu or unbind if empty.  This keeps works queued while
+	 * frozen fully ordered and flushable.
+	 */
+	if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
+		smp_wmb();	/* paired with cmpxchg() in __queue_work() */
+		wq->single_cpu = NR_CPUS;
+	}
+}
+
 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 			 struct work_struct *work)
 {
-	struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
-	struct global_cwq *gcwq = cwq->gcwq;
+	struct global_cwq *gcwq;
+	struct cpu_workqueue_struct *cwq;
 	struct list_head *worklist;
 	unsigned long flags;
+	bool arbitrate;
 
 	debug_work_activate(work);
 
-	spin_lock_irqsave(&gcwq->lock, flags);
+	/* determine gcwq to use */
+	if (!(wq->flags & WQ_SINGLE_CPU)) {
+		/* just use the requested cpu for multicpu workqueues */
+		gcwq = get_gcwq(cpu);
+		spin_lock_irqsave(&gcwq->lock, flags);
+	} else {
+		unsigned int req_cpu = cpu;
+
+		/*
+		 * It's a bit more complex for single cpu workqueues.
+		 * We first need to determine which cpu is going to be
+		 * used.  If no cpu is currently serving this
+		 * workqueue, arbitrate using atomic accesses to
+		 * wq->single_cpu; otherwise, use the current one.
+		 */
+	retry:
+		cpu = wq->single_cpu;
+		arbitrate = cpu == NR_CPUS;
+		if (arbitrate)
+			cpu = req_cpu;
+
+		gcwq = get_gcwq(cpu);
+		spin_lock_irqsave(&gcwq->lock, flags);
+
+		/*
+		 * The following cmpxchg() is a full barrier paired
+		 * with smp_wmb() in cwq_unbind_single_cpu() and
+		 * guarantees that all changes to wq->st_* fields are
+		 * visible on the new cpu after this point.
+		 */
+		if (arbitrate)
+			cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
+
+		if (unlikely(wq->single_cpu != cpu)) {
+			spin_unlock_irqrestore(&gcwq->lock, flags);
+			goto retry;
+		}
+	}
+
+	/* gcwq determined, get cwq and queue */
+	cwq = get_cwq(gcwq->cpu, wq);
+
 	BUG_ON(!list_empty(&work->entry));
 
 	cwq->nr_in_flight[cwq->work_color]++;
@@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 	timer_stats_timer_set_start_info(&dwork->timer);
 
 	/* This stores cwq for the moment, for the timer_fn */
-	set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
+	set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
 	timer->expires = jiffies + delay;
 	timer->data = (unsigned long)dwork;
 	timer->function = delayed_work_timer_fn;
@@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
 	cwq->nr_in_flight[color]--;
 	cwq->nr_active--;
 
-	/* one down, submit a delayed one */
-	if (!list_empty(&cwq->delayed_works) &&
-	    cwq->nr_active < cwq->max_active)
-		cwq_activate_first_delayed(cwq);
+	if (!list_empty(&cwq->delayed_works)) {
+		/* one down, submit a delayed one */
+		if (cwq->nr_active < cwq->max_active)
+			cwq_activate_first_delayed(cwq);
+	} else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
+		/* this was the last work, unbind from single cpu */
+		cwq_unbind_single_cpu(cwq);
+	}
 
 	/* is flush in progress and are we at the flushing tip? */
 	if (likely(cwq->flush_color != color))
@@ -1727,7 +1792,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 				       struct lock_class_key *key,
 				       const char *lock_name)
 {
-	bool singlethread = flags & WQ_SINGLE_THREAD;
 	struct workqueue_struct *wq;
 	bool failed = false;
 	unsigned int cpu;
@@ -1748,6 +1812,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 	atomic_set(&wq->nr_cwqs_to_flush, 0);
 	INIT_LIST_HEAD(&wq->flusher_queue);
 	INIT_LIST_HEAD(&wq->flusher_overflow);
+	wq->single_cpu = NR_CPUS;
+
 	wq->name = name;
 	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
 	INIT_LIST_HEAD(&wq->list);
@@ -1773,8 +1839,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 
 		if (failed)
 			continue;
-		cwq->worker = create_worker(cwq,
-					    cpu_online(cpu) && !singlethread);
+		cwq->worker = create_worker(cwq, cpu_online(cpu));
 		if (cwq->worker)
 			start_worker(cwq->worker);
 		else
@@ -1958,18 +2023,16 @@ static int __cpuinit trustee_thread(void *__gcwq)
 
 	spin_lock_irq(&gcwq->lock);
 	/*
-	 * Make all multithread workers rogue.  Trustee must be bound
-	 * to the target cpu and can't be cancelled.
+	 * Make all workers rogue.  Trustee must be bound to the
+	 * target cpu and can't be cancelled.
 	 */
 	BUG_ON(gcwq->cpu != smp_processor_id());
 
 	list_for_each_entry(worker, &gcwq->idle_list, entry)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags |= WORKER_ROGUE;
+		worker->flags |= WORKER_ROGUE;
 
 	for_each_busy_worker(worker, i, pos, gcwq)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags |= WORKER_ROGUE;
+		worker->flags |= WORKER_ROGUE;
 
 	/*
 	 * We're now in charge.  Notify and proceed to drain.  We need
@@ -2074,14 +2137,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 		wait_trustee_state(gcwq, TRUSTEE_DONE);
 	}
 
-	/* clear ROGUE from all multithread workers */
+	/* clear ROGUE from all workers */
 	list_for_each_entry(worker, &gcwq->idle_list, entry)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags &= ~WORKER_ROGUE;
+		worker->flags &= ~WORKER_ROGUE;
 
 	for_each_busy_worker(worker, i, pos, gcwq)
-		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
-			worker->flags &= ~WORKER_ROGUE;
+		worker->flags &= ~WORKER_ROGUE;
 		break;
 	}
 
@@ -2266,6 +2327,11 @@ void thaw_workqueues(void)
 			    cwq->nr_active < cwq->max_active)
 				cwq_activate_first_delayed(cwq);
 
+			/* perform delayed unbind from single cpu if empty */
+			if (wq->single_cpu == gcwq->cpu &&
+			    !cwq->nr_active && list_empty(&cwq->delayed_works))
+				cwq_unbind_single_cpu(cwq);
+
 			wake_up_process(cwq->worker->task);
 		}
 
@@ -2283,7 +2349,6 @@ void __init init_workqueues(void)
 	unsigned int cpu;
 	int i;
 
-	singlethread_cpu = cpumask_first(cpu_possible_mask);
 	hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
 
 	/* initialize gcwqs */