author    Tejun Heo <tj@kernel.org>    2010-06-29 04:07:13 -0400
committer Tejun Heo <tj@kernel.org>    2010-06-29 04:07:13 -0400
commit    7e11629d0efec829cbf62366143ba1081944a70e
tree      946f6383f420fafc89550ad2f7efbdfee441ccbb
parent    18aa9effad4adb2c1efe123af4eb24fec9f59b30
workqueue: use shared worklist and pool all workers per cpu
Use gcwq->worklist instead of cwq->worklist and break the strict association between a cwq and its worker. All works queued on a cpu are queued on gcwq->worklist and processed by any available worker on the gcwq.

As there no longer is strict association between a cwq and its worker, whether a work is executing can now only be determined by calling [__]find_worker_executing_work().

After this change, the only association between a cwq and its worker is that a cwq puts a worker into shared worker pool on creation and kills it on destruction. As all workqueues are still limited to max_active of one, this means that there are always at least as many workers as active works and thus there's no danger for deadlock.

The break of strong association between cwqs and workers requires somewhat clumsy changes to current_is_keventd() and destroy_workqueue(). Dynamic worker pool management will remove both clumsy changes. current_is_keventd() won't be necessary at all as the only reason it exists is to avoid queueing a work from a work which will be allowed just fine. The clumsy part of destroy_workqueue() is added because a worker can only be destroyed while idle and there's no guarantee a worker is idle when its wq is going down. With dynamic pool management, workers are not associated with workqueues at all and only idle ones will be submitted to destroy_workqueue() so the code won't be necessary anymore.

Signed-off-by: Tejun Heo <tj@kernel.org>
 kernel/workqueue.c | 131
 1 file changed, 99 insertions(+), 32 deletions(-)
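Before the patch itself, a minimal userspace sketch of the queueing model the commit message describes may help: one shared worklist per pool, any idle worker may pick up any queued work, and queueing a work wakes one idle worker (the role wake_up_worker() plays below). This is an illustration only, not kernel code; it assumes a plain pthread model, and every name in it (struct pool, struct work_item, pool_queue, worker_main) is invented for the example.

/*
 * Illustration only -- NOT kernel code.  A shared worklist served by a
 * small pool of workers; queueing appends to the list and wakes one
 * idle worker.  All names here are invented for the example.
 */
#include <pthread.h>
#include <stdio.h>

struct work_item {
	void (*func)(struct work_item *work);
	struct work_item *next;
};

struct pool {
	pthread_mutex_t lock;		/* plays the role of gcwq->lock */
	pthread_cond_t idle;		/* idle workers sleep here */
	struct work_item *head;		/* shared worklist, cf. gcwq->worklist */
	struct work_item **tail;
	int shutdown;
};

/* append @work to the shared list and wake one idle worker */
static void pool_queue(struct pool *p, struct work_item *work)
{
	pthread_mutex_lock(&p->lock);
	work->next = NULL;
	*p->tail = work;
	p->tail = &work->next;
	pthread_cond_signal(&p->idle);	/* cf. wake_up_worker() */
	pthread_mutex_unlock(&p->lock);
}

/* every worker runs this loop; any worker may execute any queued work */
static void *worker_main(void *arg)
{
	struct pool *p = arg;

	pthread_mutex_lock(&p->lock);
	for (;;) {
		while (!p->head && !p->shutdown)
			pthread_cond_wait(&p->idle, &p->lock);
		if (!p->head)		/* shutdown requested and list drained */
			break;

		struct work_item *work = p->head;
		p->head = work->next;
		if (!p->head)
			p->tail = &p->head;

		pthread_mutex_unlock(&p->lock);
		work->func(work);	/* run the work outside the lock */
		pthread_mutex_lock(&p->lock);
	}
	pthread_mutex_unlock(&p->lock);
	return NULL;
}

static void hello(struct work_item *work)
{
	(void)work;
	printf("work executed by some pool worker\n");
}

int main(void)
{
	struct pool p;
	pthread_t workers[2];
	struct work_item w = { .func = hello };

	pthread_mutex_init(&p.lock, NULL);
	pthread_cond_init(&p.idle, NULL);
	p.head = NULL;
	p.tail = &p.head;
	p.shutdown = 0;

	for (int i = 0; i < 2; i++)
		pthread_create(&workers[i], NULL, worker_main, &p);

	pool_queue(&p, &w);

	/* workers exit only after the list is drained */
	pthread_mutex_lock(&p.lock);
	p.shutdown = 1;
	pthread_cond_broadcast(&p.idle);
	pthread_mutex_unlock(&p.lock);

	for (int i = 0; i < 2; i++)
		pthread_join(workers[i], NULL);
	return 0;
}

Build with gcc -pthread. The kernel side differs in every detail (per-cpu gcwqs, the busy hash, work colors, max_active throttling), but the append-then-wake shape of pool_queue() is the shape insert_work() takes after this patch.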
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index bce1074bdec1..e697d6c72daa 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -34,6 +34,7 @@
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
 #include <linux/idr.h>
+#include <linux/delay.h>
 
 enum {
 	/* global_cwq flags */
@@ -72,7 +73,6 @@ enum {
  */
 
 struct global_cwq;
-struct cpu_workqueue_struct;
 
 struct worker {
 	/* on idle list while idle, on busy hash table while busy */
@@ -86,7 +86,6 @@ struct worker {
 	struct list_head	scheduled;	/* L: scheduled works */
 	struct task_struct	*task;		/* I: worker task */
 	struct global_cwq	*gcwq;		/* I: the associated gcwq */
-	struct cpu_workqueue_struct *cwq;	/* I: the associated cwq */
 	unsigned int		flags;		/* L: flags */
 	int			id;		/* I: worker id */
 };
@@ -96,6 +95,7 @@ struct worker {
  */
 struct global_cwq {
 	spinlock_t		lock;		/* the gcwq lock */
+	struct list_head	worklist;	/* L: list of pending works */
 	unsigned int		cpu;		/* I: the associated cpu */
 	unsigned int		flags;		/* L: GCWQ_* flags */
 
@@ -121,7 +121,6 @@ struct global_cwq {
  */
 struct cpu_workqueue_struct {
 	struct global_cwq	*gcwq;		/* I: the associated gcwq */
-	struct list_head	worklist;
 	struct worker		*worker;
 	struct workqueue_struct *wq;		/* I: the owning workqueue */
 	int			work_color;	/* L: current color */
@@ -386,6 +385,32 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
 	return get_gcwq(cpu);
 }
 
+/* Return the first worker.  Safe with preemption disabled */
+static struct worker *first_worker(struct global_cwq *gcwq)
+{
+	if (unlikely(list_empty(&gcwq->idle_list)))
+		return NULL;
+
+	return list_first_entry(&gcwq->idle_list, struct worker, entry);
+}
+
+/**
+ * wake_up_worker - wake up an idle worker
+ * @gcwq: gcwq to wake worker for
+ *
+ * Wake up the first idle worker of @gcwq.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void wake_up_worker(struct global_cwq *gcwq)
+{
+	struct worker *worker = first_worker(gcwq);
+
+	if (likely(worker))
+		wake_up_process(worker->task);
+}
+
 /**
  * busy_worker_head - return the busy hash head for a work
  * @gcwq: gcwq of interest
@@ -467,13 +492,14 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
 }
 
 /**
- * insert_work - insert a work into cwq
+ * insert_work - insert a work into gcwq
  * @cwq: cwq @work belongs to
  * @work: work to insert
  * @head: insertion point
  * @extra_flags: extra WORK_STRUCT_* flags to set
  *
- * Insert @work into @cwq after @head.
+ * Insert @work which belongs to @cwq into @gcwq after @head.
+ * @extra_flags is or'd to work_struct flags.
  *
  * CONTEXT:
  * spin_lock_irq(gcwq->lock).
@@ -492,7 +518,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 	smp_wmb();
 
 	list_add_tail(&work->entry, head);
-	wake_up_process(cwq->worker->task);
+	wake_up_worker(cwq->gcwq);
 }
 
 /**
@@ -608,7 +634,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 
 	if (likely(cwq->nr_active < cwq->max_active)) {
 		cwq->nr_active++;
-		worklist = &cwq->worklist;
+		worklist = &gcwq->worklist;
 	} else
 		worklist = &cwq->delayed_works;
 
@@ -793,10 +819,10 @@ static struct worker *alloc_worker(void)
 
 /**
  * create_worker - create a new workqueue worker
- * @cwq: cwq the new worker will belong to
+ * @gcwq: gcwq the new worker will belong to
  * @bind: whether to set affinity to @cpu or not
  *
- * Create a new worker which is bound to @cwq.  The returned worker
+ * Create a new worker which is bound to @gcwq.  The returned worker
  * can be started by calling start_worker() or destroyed using
  * destroy_worker().
  *
@@ -806,9 +832,8 @@ static struct worker *alloc_worker(void)
  * RETURNS:
  * Pointer to the newly created worker.
  */
-static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
 {
-	struct global_cwq *gcwq = cwq->gcwq;
 	int id = -1;
 	struct worker *worker = NULL;
 
@@ -826,7 +851,6 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
 		goto fail;
 
 	worker->gcwq = gcwq;
-	worker->cwq = cwq;
 	worker->id = id;
 
 	worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
@@ -953,7 +977,7 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
 	struct work_struct *work = list_first_entry(&cwq->delayed_works,
 						    struct work_struct, entry);
 
-	move_linked_works(work, &cwq->worklist, NULL);
+	move_linked_works(work, &cwq->gcwq->worklist, NULL);
 	cwq->nr_active++;
 }
 
@@ -1021,11 +1045,12 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
  */
 static void process_one_work(struct worker *worker, struct work_struct *work)
 {
-	struct cpu_workqueue_struct *cwq = worker->cwq;
+	struct cpu_workqueue_struct *cwq = get_work_cwq(work);
 	struct global_cwq *gcwq = cwq->gcwq;
 	struct hlist_head *bwh = busy_worker_head(gcwq, work);
 	work_func_t f = work->func;
 	int work_color;
+	struct worker *collision;
 #ifdef CONFIG_LOCKDEP
 	/*
 	 * It is permissible to free the struct work_struct from
@@ -1036,6 +1061,18 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
 	 */
 	struct lockdep_map lockdep_map = work->lockdep_map;
 #endif
+	/*
+	 * A single work shouldn't be executed concurrently by
+	 * multiple workers on a single cpu.  Check whether anyone is
+	 * already processing the work.  If so, defer the work to the
+	 * currently executing one.
+	 */
+	collision = __find_worker_executing_work(gcwq, bwh, work);
+	if (unlikely(collision)) {
+		move_linked_works(work, &collision->scheduled, NULL);
+		return;
+	}
+
 	/* claim and process */
 	debug_work_deactivate(work);
 	hlist_add_head(&worker->hentry, bwh);
@@ -1043,7 +1080,6 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
 	worker->current_cwq = cwq;
 	work_color = get_work_color(work);
 
-	BUG_ON(get_work_cwq(work) != cwq);
 	/* record the current cpu number in the work data and dequeue */
 	set_work_cpu(work, gcwq->cpu);
 	list_del_init(&work->entry);
@@ -1107,7 +1143,6 @@ static int worker_thread(void *__worker)
 {
 	struct worker *worker = __worker;
 	struct global_cwq *gcwq = worker->gcwq;
-	struct cpu_workqueue_struct *cwq = worker->cwq;
 
 woke_up:
 	spin_lock_irq(&gcwq->lock);
@@ -1127,9 +1162,9 @@ recheck:
 	 */
 	BUG_ON(!list_empty(&worker->scheduled));
 
-	while (!list_empty(&cwq->worklist)) {
+	while (!list_empty(&gcwq->worklist)) {
 		struct work_struct *work =
-			list_first_entry(&cwq->worklist,
+			list_first_entry(&gcwq->worklist,
 					 struct work_struct, entry);
 
 		/*
@@ -1844,18 +1879,37 @@ int keventd_up(void)
 
 int current_is_keventd(void)
 {
-	struct cpu_workqueue_struct *cwq;
-	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
-	int ret = 0;
+	bool found = false;
+	unsigned int cpu;
 
-	BUG_ON(!keventd_wq);
+	/*
+	 * There no longer is one-to-one relation between worker and
+	 * work queue and a worker task might be unbound from its cpu
+	 * if the cpu was offlined.  Match all busy workers.  This
+	 * function will go away once dynamic pool is implemented.
+	 */
+	for_each_possible_cpu(cpu) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct worker *worker;
+		struct hlist_node *pos;
+		unsigned long flags;
+		int i;
 
-	cwq = get_cwq(cpu, keventd_wq);
-	if (current == cwq->worker->task)
-		ret = 1;
+		spin_lock_irqsave(&gcwq->lock, flags);
 
-	return ret;
+		for_each_busy_worker(worker, i, pos, gcwq) {
+			if (worker->task == current) {
+				found = true;
+				break;
+			}
+		}
+
+		spin_unlock_irqrestore(&gcwq->lock, flags);
+		if (found)
+			break;
+	}
 
+	return found;
 }
 
 static struct cpu_workqueue_struct *alloc_cwqs(void)
@@ -1953,12 +2007,11 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 		cwq->wq = wq;
 		cwq->flush_color = -1;
 		cwq->max_active = max_active;
-		INIT_LIST_HEAD(&cwq->worklist);
 		INIT_LIST_HEAD(&cwq->delayed_works);
 
 		if (failed)
 			continue;
-		cwq->worker = create_worker(cwq, cpu_online(cpu));
+		cwq->worker = create_worker(gcwq, cpu_online(cpu));
 		if (cwq->worker)
 			start_worker(cwq->worker);
 		else
@@ -2020,13 +2073,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
 
 	for_each_possible_cpu(cpu) {
 		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+		struct global_cwq *gcwq = cwq->gcwq;
 		int i;
 
 		if (cwq->worker) {
-			spin_lock_irq(&cwq->gcwq->lock);
+		retry:
+			spin_lock_irq(&gcwq->lock);
+			/*
+			 * Worker can only be destroyed while idle.
+			 * Wait till it becomes idle.  This is ugly
+			 * and prone to starvation.  It will go away
+			 * once dynamic worker pool is implemented.
+			 */
+			if (!(cwq->worker->flags & WORKER_IDLE)) {
+				spin_unlock_irq(&gcwq->lock);
+				msleep(100);
+				goto retry;
+			}
 			destroy_worker(cwq->worker);
 			cwq->worker = NULL;
-			spin_unlock_irq(&cwq->gcwq->lock);
+			spin_unlock_irq(&gcwq->lock);
 		}
 
 		for (i = 0; i < WORK_NR_COLORS; i++)
@@ -2324,7 +2390,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
  *
 * Start freezing workqueues.  After this function returns, all
 * freezeable workqueues will queue new works to their frozen_works
- * list instead of the cwq ones.
+ * list instead of gcwq->worklist.
  *
  * CONTEXT:
  * Grabs and releases workqueue_lock and gcwq->lock's.
@@ -2410,7 +2476,7 @@ out_unlock:
 * thaw_workqueues - thaw workqueues
  *
 * Thaw workqueues.  Normal queueing is restored and all collected
- * frozen works are transferred to their respective cwq worklists.
+ * frozen works are transferred to their respective gcwq worklists.
 *
 * CONTEXT:
 * Grabs and releases workqueue_lock and gcwq->lock's.
@@ -2483,6 +2549,7 @@ void __init init_workqueues(void)
 		struct global_cwq *gcwq = get_gcwq(cpu);
 
 		spin_lock_init(&gcwq->lock);
+		INIT_LIST_HEAD(&gcwq->worklist);
 		gcwq->cpu = cpu;
 
 		INIT_LIST_HEAD(&gcwq->idle_list);
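A second illustration, tied to the process_one_work() hunk above: the collision check that keeps one work item from running concurrently on two workers of the same cpu now that workers share a worklist. Again this is not kernel code; the busy hash is reduced to a hypothetical busy[] array, locking is omitted, and the names (worker_slot, collision_defer) are invented. The shape, however, matches the added block: look up whether some worker is already executing the work and, on a hit, move the work onto that worker's scheduled list instead of running it.

/*
 * Illustration only -- NOT kernel code.  The collision/deferral pattern
 * added to process_one_work(), with invented names and no locking.
 */
#include <stdio.h>

struct work_item {
	const char *name;
	struct work_item *next;
};

struct worker_slot {
	struct work_item *current_work;	/* NULL when idle; cf. busy hash entry */
	struct work_item *scheduled;	/* works deferred to this worker */
};

#define NR_WORKERS 4
static struct worker_slot busy[NR_WORKERS];

/* cf. __find_worker_executing_work(): which slot, if any, is running @work */
static struct worker_slot *find_worker_executing(struct work_item *work)
{
	for (int i = 0; i < NR_WORKERS; i++)
		if (busy[i].current_work == work)
			return &busy[i];
	return NULL;
}

/* returns 1 if @work was deferred to the worker already executing it */
static int collision_defer(struct work_item *work)
{
	struct worker_slot *collision = find_worker_executing(work);

	if (!collision)
		return 0;
	/* cf. move_linked_works(work, &collision->scheduled, NULL) */
	work->next = collision->scheduled;
	collision->scheduled = work;
	return 1;
}

int main(void)
{
	struct work_item w = { .name = "requeued-while-running" };

	busy[1].current_work = &w;	/* pretend worker 1 is already running w */
	if (collision_defer(&w))
		printf("deferred '%s' to the worker already executing it\n",
		       w.name);
	return 0;
}

This deferral is why, as the commit message notes, [__]find_worker_executing_work() becomes the only way to tell whether a given work is currently executing.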