aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2010-06-29 04:07:11 -0400
committerTejun Heo <tj@kernel.org>2010-06-29 04:07:11 -0400
commit1537663f5763892cacf1409ac0efef1b4f332d1e (patch)
treeb2fe110d52315438c71b16f14d8a1b043b91deb4
parent64166699752006f1a23a9cf7c96ae36654ccfc2c (diff)
workqueue: kill cpu_populated_map
Worker management is about to be overhauled. Simplify things by removing cpu_populated_map, creating workers for all possible cpus and making single threaded workqueues behave more like multi threaded ones. After this patch, all cwqs are always initialized, all workqueues are linked on the workqueues list and workers for all possibles cpus always exist. This also makes CPU hotplug support simpler - checking ->cpus_allowed before processing works in worker_thread() and flushing cwqs on CPU_POST_DEAD are enough. While at it, make get_cwq() always return the cwq for the specified cpu, add target_cwq() for cases where single thread distinction is necessary and drop all direct usage of per_cpu_ptr() on wq->cpu_wq. Signed-off-by: Tejun Heo <tj@kernel.org>
-rw-r--r--kernel/workqueue.c173
1 files changed, 59 insertions, 114 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f7ab703285a6..dc78956ccf03 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -55,6 +55,7 @@ struct cpu_workqueue_struct {
55 struct list_head worklist; 55 struct list_head worklist;
56 wait_queue_head_t more_work; 56 wait_queue_head_t more_work;
57 struct work_struct *current_work; 57 struct work_struct *current_work;
58 unsigned int cpu;
58 59
59 struct workqueue_struct *wq; /* I: the owning workqueue */ 60 struct workqueue_struct *wq; /* I: the owning workqueue */
60 struct task_struct *thread; 61 struct task_struct *thread;
@@ -189,34 +190,19 @@ static DEFINE_SPINLOCK(workqueue_lock);
189static LIST_HEAD(workqueues); 190static LIST_HEAD(workqueues);
190 191
191static int singlethread_cpu __read_mostly; 192static int singlethread_cpu __read_mostly;
192static const struct cpumask *cpu_singlethread_map __read_mostly;
193/*
194 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
195 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
196 * which comes in between can't use for_each_online_cpu(). We could
197 * use cpu_possible_map, the cpumask below is more a documentation
198 * than optimization.
199 */
200static cpumask_var_t cpu_populated_map __read_mostly;
201
202/* If it's single threaded, it isn't in the list of workqueues. */
203static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
204{
205 return wq->flags & WQ_SINGLE_THREAD;
206}
207 193
208static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq) 194static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
195 struct workqueue_struct *wq)
209{ 196{
210 return is_wq_single_threaded(wq) 197 return per_cpu_ptr(wq->cpu_wq, cpu);
211 ? cpu_singlethread_map : cpu_populated_map;
212} 198}
213 199
214static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, 200static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
215 struct workqueue_struct *wq) 201 struct workqueue_struct *wq)
216{ 202{
217 if (unlikely(is_wq_single_threaded(wq))) 203 if (unlikely(wq->flags & WQ_SINGLE_THREAD))
218 cpu = singlethread_cpu; 204 cpu = singlethread_cpu;
219 return per_cpu_ptr(wq->cpu_wq, cpu); 205 return get_cwq(cpu, wq);
220} 206}
221 207
222/* 208/*
@@ -279,7 +265,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
279static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, 265static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
280 struct work_struct *work) 266 struct work_struct *work)
281{ 267{
282 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 268 struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
283 unsigned long flags; 269 unsigned long flags;
284 270
285 debug_work_activate(work); 271 debug_work_activate(work);
@@ -383,7 +369,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
383 timer_stats_timer_set_start_info(&dwork->timer); 369 timer_stats_timer_set_start_info(&dwork->timer);
384 370
385 /* This stores cwq for the moment, for the timer_fn */ 371 /* This stores cwq for the moment, for the timer_fn */
386 set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0); 372 set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
387 timer->expires = jiffies + delay; 373 timer->expires = jiffies + delay;
388 timer->data = (unsigned long)dwork; 374 timer->data = (unsigned long)dwork;
389 timer->function = delayed_work_timer_fn; 375 timer->function = delayed_work_timer_fn;
@@ -495,6 +481,10 @@ static int worker_thread(void *__cwq)
495 if (kthread_should_stop()) 481 if (kthread_should_stop())
496 break; 482 break;
497 483
484 if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
485 get_cpu_mask(cwq->cpu))))
486 set_cpus_allowed_ptr(cwq->thread,
487 get_cpu_mask(cwq->cpu));
498 run_workqueue(cwq); 488 run_workqueue(cwq);
499 } 489 }
500 490
@@ -574,14 +564,13 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
574 */ 564 */
575void flush_workqueue(struct workqueue_struct *wq) 565void flush_workqueue(struct workqueue_struct *wq)
576{ 566{
577 const struct cpumask *cpu_map = wq_cpu_map(wq);
578 int cpu; 567 int cpu;
579 568
580 might_sleep(); 569 might_sleep();
581 lock_map_acquire(&wq->lockdep_map); 570 lock_map_acquire(&wq->lockdep_map);
582 lock_map_release(&wq->lockdep_map); 571 lock_map_release(&wq->lockdep_map);
583 for_each_cpu(cpu, cpu_map) 572 for_each_possible_cpu(cpu)
584 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 573 flush_cpu_workqueue(get_cwq(cpu, wq));
585} 574}
586EXPORT_SYMBOL_GPL(flush_workqueue); 575EXPORT_SYMBOL_GPL(flush_workqueue);
587 576
@@ -699,7 +688,6 @@ static void wait_on_work(struct work_struct *work)
699{ 688{
700 struct cpu_workqueue_struct *cwq; 689 struct cpu_workqueue_struct *cwq;
701 struct workqueue_struct *wq; 690 struct workqueue_struct *wq;
702 const struct cpumask *cpu_map;
703 int cpu; 691 int cpu;
704 692
705 might_sleep(); 693 might_sleep();
@@ -712,9 +700,8 @@ static void wait_on_work(struct work_struct *work)
712 return; 700 return;
713 701
714 wq = cwq->wq; 702 wq = cwq->wq;
715 cpu_map = wq_cpu_map(wq);
716 703
717 for_each_cpu(cpu, cpu_map) 704 for_each_possible_cpu(cpu)
718 wait_on_cpu_work(get_cwq(cpu, wq), work); 705 wait_on_cpu_work(get_cwq(cpu, wq), work);
719} 706}
720 707
@@ -972,7 +959,7 @@ int current_is_keventd(void)
972 959
973 BUG_ON(!keventd_wq); 960 BUG_ON(!keventd_wq);
974 961
975 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 962 cwq = get_cwq(cpu, keventd_wq);
976 if (current == cwq->thread) 963 if (current == cwq->thread)
977 ret = 1; 964 ret = 1;
978 965
@@ -980,26 +967,12 @@ int current_is_keventd(void)
980 967
981} 968}
982 969
983static struct cpu_workqueue_struct *
984init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
985{
986 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
987
988 cwq->wq = wq;
989 spin_lock_init(&cwq->lock);
990 INIT_LIST_HEAD(&cwq->worklist);
991 init_waitqueue_head(&cwq->more_work);
992
993 return cwq;
994}
995
996static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 970static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
997{ 971{
998 struct workqueue_struct *wq = cwq->wq; 972 struct workqueue_struct *wq = cwq->wq;
999 const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
1000 struct task_struct *p; 973 struct task_struct *p;
1001 974
1002 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); 975 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
1003 /* 976 /*
1004 * Nobody can add the work_struct to this cwq, 977 * Nobody can add the work_struct to this cwq,
1005 * if (caller is __create_workqueue) 978 * if (caller is __create_workqueue)
@@ -1031,8 +1004,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1031 struct lock_class_key *key, 1004 struct lock_class_key *key,
1032 const char *lock_name) 1005 const char *lock_name)
1033{ 1006{
1007 bool singlethread = flags & WQ_SINGLE_THREAD;
1034 struct workqueue_struct *wq; 1008 struct workqueue_struct *wq;
1035 struct cpu_workqueue_struct *cwq;
1036 int err = 0, cpu; 1009 int err = 0, cpu;
1037 1010
1038 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 1011 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
@@ -1048,37 +1021,37 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1048 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 1021 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1049 INIT_LIST_HEAD(&wq->list); 1022 INIT_LIST_HEAD(&wq->list);
1050 1023
1051 if (flags & WQ_SINGLE_THREAD) { 1024 cpu_maps_update_begin();
1052 cwq = init_cpu_workqueue(wq, singlethread_cpu); 1025 /*
1053 err = create_workqueue_thread(cwq, singlethread_cpu); 1026 * We must initialize cwqs for each possible cpu even if we
1054 start_workqueue_thread(cwq, -1); 1027 * are going to call destroy_workqueue() finally. Otherwise
1055 } else { 1028 * cpu_up() can hit the uninitialized cwq once we drop the
1056 cpu_maps_update_begin(); 1029 * lock.
1057 /* 1030 */
1058 * We must place this wq on list even if the code below fails. 1031 for_each_possible_cpu(cpu) {
1059 * cpu_down(cpu) can remove cpu from cpu_populated_map before 1032 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1060 * destroy_workqueue() takes the lock, in that case we leak 1033
1061 * cwq[cpu]->thread. 1034 cwq->wq = wq;
1062 */ 1035 cwq->cpu = cpu;
1063 spin_lock(&workqueue_lock); 1036 spin_lock_init(&cwq->lock);
1064 list_add(&wq->list, &workqueues); 1037 INIT_LIST_HEAD(&cwq->worklist);
1065 spin_unlock(&workqueue_lock); 1038 init_waitqueue_head(&cwq->more_work);
1066 /* 1039
1067 * We must initialize cwqs for each possible cpu even if we 1040 if (err)
1068 * are going to call destroy_workqueue() finally. Otherwise 1041 continue;
1069 * cpu_up() can hit the uninitialized cwq once we drop the 1042 err = create_workqueue_thread(cwq, cpu);
1070 * lock. 1043 if (cpu_online(cpu) && !singlethread)
1071 */
1072 for_each_possible_cpu(cpu) {
1073 cwq = init_cpu_workqueue(wq, cpu);
1074 if (err || !cpu_online(cpu))
1075 continue;
1076 err = create_workqueue_thread(cwq, cpu);
1077 start_workqueue_thread(cwq, cpu); 1044 start_workqueue_thread(cwq, cpu);
1078 } 1045 else
1079 cpu_maps_update_done(); 1046 start_workqueue_thread(cwq, -1);
1080 } 1047 }
1081 1048
1049 spin_lock(&workqueue_lock);
1050 list_add(&wq->list, &workqueues);
1051 spin_unlock(&workqueue_lock);
1052
1053 cpu_maps_update_done();
1054
1082 if (err) { 1055 if (err) {
1083 destroy_workqueue(wq); 1056 destroy_workqueue(wq);
1084 wq = NULL; 1057 wq = NULL;
@@ -1128,17 +1101,16 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
1128 */ 1101 */
1129void destroy_workqueue(struct workqueue_struct *wq) 1102void destroy_workqueue(struct workqueue_struct *wq)
1130{ 1103{
1131 const struct cpumask *cpu_map = wq_cpu_map(wq);
1132 int cpu; 1104 int cpu;
1133 1105
1134 cpu_maps_update_begin(); 1106 cpu_maps_update_begin();
1135 spin_lock(&workqueue_lock); 1107 spin_lock(&workqueue_lock);
1136 list_del(&wq->list); 1108 list_del(&wq->list);
1137 spin_unlock(&workqueue_lock); 1109 spin_unlock(&workqueue_lock);
1110 cpu_maps_update_done();
1138 1111
1139 for_each_cpu(cpu, cpu_map) 1112 for_each_possible_cpu(cpu)
1140 cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); 1113 cleanup_workqueue_thread(get_cwq(cpu, wq));
1141 cpu_maps_update_done();
1142 1114
1143 free_percpu(wq->cpu_wq); 1115 free_percpu(wq->cpu_wq);
1144 kfree(wq); 1116 kfree(wq);
@@ -1152,48 +1124,25 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
1152 unsigned int cpu = (unsigned long)hcpu; 1124 unsigned int cpu = (unsigned long)hcpu;
1153 struct cpu_workqueue_struct *cwq; 1125 struct cpu_workqueue_struct *cwq;
1154 struct workqueue_struct *wq; 1126 struct workqueue_struct *wq;
1155 int err = 0;
1156 1127
1157 action &= ~CPU_TASKS_FROZEN; 1128 action &= ~CPU_TASKS_FROZEN;
1158 1129
1159 switch (action) {
1160 case CPU_UP_PREPARE:
1161 cpumask_set_cpu(cpu, cpu_populated_map);
1162 }
1163undo:
1164 list_for_each_entry(wq, &workqueues, list) { 1130 list_for_each_entry(wq, &workqueues, list) {
1165 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 1131 if (wq->flags & WQ_SINGLE_THREAD)
1132 continue;
1166 1133
1167 switch (action) { 1134 cwq = get_cwq(cpu, wq);
1168 case CPU_UP_PREPARE:
1169 err = create_workqueue_thread(cwq, cpu);
1170 if (!err)
1171 break;
1172 printk(KERN_ERR "workqueue [%s] for %i failed\n",
1173 wq->name, cpu);
1174 action = CPU_UP_CANCELED;
1175 err = -ENOMEM;
1176 goto undo;
1177
1178 case CPU_ONLINE:
1179 start_workqueue_thread(cwq, cpu);
1180 break;
1181 1135
1182 case CPU_UP_CANCELED: 1136 switch (action) {
1183 start_workqueue_thread(cwq, -1);
1184 case CPU_POST_DEAD: 1137 case CPU_POST_DEAD:
1185 cleanup_workqueue_thread(cwq); 1138 lock_map_acquire(&cwq->wq->lockdep_map);
1139 lock_map_release(&cwq->wq->lockdep_map);
1140 flush_cpu_workqueue(cwq);
1186 break; 1141 break;
1187 } 1142 }
1188 } 1143 }
1189 1144
1190 switch (action) { 1145 return notifier_from_errno(0);
1191 case CPU_UP_CANCELED:
1192 case CPU_POST_DEAD:
1193 cpumask_clear_cpu(cpu, cpu_populated_map);
1194 }
1195
1196 return notifier_from_errno(err);
1197} 1146}
1198 1147
1199#ifdef CONFIG_SMP 1148#ifdef CONFIG_SMP
@@ -1245,11 +1194,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
1245 1194
1246void __init init_workqueues(void) 1195void __init init_workqueues(void)
1247{ 1196{
1248 alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
1249
1250 cpumask_copy(cpu_populated_map, cpu_online_mask);
1251 singlethread_cpu = cpumask_first(cpu_possible_mask); 1197 singlethread_cpu = cpumask_first(cpu_possible_mask);
1252 cpu_singlethread_map = cpumask_of(singlethread_cpu);
1253 hotcpu_notifier(workqueue_cpu_callback, 0); 1198 hotcpu_notifier(workqueue_cpu_callback, 0);
1254 keventd_wq = create_workqueue("events"); 1199 keventd_wq = create_workqueue("events");
1255 BUG_ON(!keventd_wq); 1200 BUG_ON(!keventd_wq);