aboutsummaryrefslogtreecommitdiffstats
path: root/kernel/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--kernel/workqueue.c173
1 files changed, 59 insertions, 114 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f7ab703285a6..dc78956ccf03 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -55,6 +55,7 @@ struct cpu_workqueue_struct {
55 struct list_head worklist; 55 struct list_head worklist;
56 wait_queue_head_t more_work; 56 wait_queue_head_t more_work;
57 struct work_struct *current_work; 57 struct work_struct *current_work;
58 unsigned int cpu;
58 59
59 struct workqueue_struct *wq; /* I: the owning workqueue */ 60 struct workqueue_struct *wq; /* I: the owning workqueue */
60 struct task_struct *thread; 61 struct task_struct *thread;
@@ -189,34 +190,19 @@ static DEFINE_SPINLOCK(workqueue_lock);
189static LIST_HEAD(workqueues); 190static LIST_HEAD(workqueues);
190 191
191static int singlethread_cpu __read_mostly; 192static int singlethread_cpu __read_mostly;
192static const struct cpumask *cpu_singlethread_map __read_mostly;
193/*
194 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
195 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
196 * which comes in between can't use for_each_online_cpu(). We could
197 * use cpu_possible_map, the cpumask below is more a documentation
198 * than optimization.
199 */
200static cpumask_var_t cpu_populated_map __read_mostly;
201
202/* If it's single threaded, it isn't in the list of workqueues. */
203static inline bool is_wq_single_threaded(struct workqueue_struct *wq)
204{
205 return wq->flags & WQ_SINGLE_THREAD;
206}
207 193
208static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq) 194static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
195 struct workqueue_struct *wq)
209{ 196{
210 return is_wq_single_threaded(wq) 197 return per_cpu_ptr(wq->cpu_wq, cpu);
211 ? cpu_singlethread_map : cpu_populated_map;
212} 198}
213 199
214static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, 200static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
215 struct workqueue_struct *wq) 201 struct workqueue_struct *wq)
216{ 202{
217 if (unlikely(is_wq_single_threaded(wq))) 203 if (unlikely(wq->flags & WQ_SINGLE_THREAD))
218 cpu = singlethread_cpu; 204 cpu = singlethread_cpu;
219 return per_cpu_ptr(wq->cpu_wq, cpu); 205 return get_cwq(cpu, wq);
220} 206}
221 207
222/* 208/*
@@ -279,7 +265,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
279static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, 265static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
280 struct work_struct *work) 266 struct work_struct *work)
281{ 267{
282 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 268 struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
283 unsigned long flags; 269 unsigned long flags;
284 270
285 debug_work_activate(work); 271 debug_work_activate(work);
@@ -383,7 +369,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
383 timer_stats_timer_set_start_info(&dwork->timer); 369 timer_stats_timer_set_start_info(&dwork->timer);
384 370
385 /* This stores cwq for the moment, for the timer_fn */ 371 /* This stores cwq for the moment, for the timer_fn */
386 set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0); 372 set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
387 timer->expires = jiffies + delay; 373 timer->expires = jiffies + delay;
388 timer->data = (unsigned long)dwork; 374 timer->data = (unsigned long)dwork;
389 timer->function = delayed_work_timer_fn; 375 timer->function = delayed_work_timer_fn;
@@ -495,6 +481,10 @@ static int worker_thread(void *__cwq)
495 if (kthread_should_stop()) 481 if (kthread_should_stop())
496 break; 482 break;
497 483
484 if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
485 get_cpu_mask(cwq->cpu))))
486 set_cpus_allowed_ptr(cwq->thread,
487 get_cpu_mask(cwq->cpu));
498 run_workqueue(cwq); 488 run_workqueue(cwq);
499 } 489 }
500 490
@@ -574,14 +564,13 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
574 */ 564 */
575void flush_workqueue(struct workqueue_struct *wq) 565void flush_workqueue(struct workqueue_struct *wq)
576{ 566{
577 const struct cpumask *cpu_map = wq_cpu_map(wq);
578 int cpu; 567 int cpu;
579 568
580 might_sleep(); 569 might_sleep();
581 lock_map_acquire(&wq->lockdep_map); 570 lock_map_acquire(&wq->lockdep_map);
582 lock_map_release(&wq->lockdep_map); 571 lock_map_release(&wq->lockdep_map);
583 for_each_cpu(cpu, cpu_map) 572 for_each_possible_cpu(cpu)
584 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 573 flush_cpu_workqueue(get_cwq(cpu, wq));
585} 574}
586EXPORT_SYMBOL_GPL(flush_workqueue); 575EXPORT_SYMBOL_GPL(flush_workqueue);
587 576
@@ -699,7 +688,6 @@ static void wait_on_work(struct work_struct *work)
699{ 688{
700 struct cpu_workqueue_struct *cwq; 689 struct cpu_workqueue_struct *cwq;
701 struct workqueue_struct *wq; 690 struct workqueue_struct *wq;
702 const struct cpumask *cpu_map;
703 int cpu; 691 int cpu;
704 692
705 might_sleep(); 693 might_sleep();
@@ -712,9 +700,8 @@ static void wait_on_work(struct work_struct *work)
712 return; 700 return;
713 701
714 wq = cwq->wq; 702 wq = cwq->wq;
715 cpu_map = wq_cpu_map(wq);
716 703
717 for_each_cpu(cpu, cpu_map) 704 for_each_possible_cpu(cpu)
718 wait_on_cpu_work(get_cwq(cpu, wq), work); 705 wait_on_cpu_work(get_cwq(cpu, wq), work);
719} 706}
720 707
@@ -972,7 +959,7 @@ int current_is_keventd(void)
972 959
973 BUG_ON(!keventd_wq); 960 BUG_ON(!keventd_wq);
974 961
975 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 962 cwq = get_cwq(cpu, keventd_wq);
976 if (current == cwq->thread) 963 if (current == cwq->thread)
977 ret = 1; 964 ret = 1;
978 965
@@ -980,26 +967,12 @@ int current_is_keventd(void)
980 967
981} 968}
982 969
983static struct cpu_workqueue_struct *
984init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
985{
986 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
987
988 cwq->wq = wq;
989 spin_lock_init(&cwq->lock);
990 INIT_LIST_HEAD(&cwq->worklist);
991 init_waitqueue_head(&cwq->more_work);
992
993 return cwq;
994}
995
996static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 970static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
997{ 971{
998 struct workqueue_struct *wq = cwq->wq; 972 struct workqueue_struct *wq = cwq->wq;
999 const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
1000 struct task_struct *p; 973 struct task_struct *p;
1001 974
1002 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); 975 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
1003 /* 976 /*
1004 * Nobody can add the work_struct to this cwq, 977 * Nobody can add the work_struct to this cwq,
1005 * if (caller is __create_workqueue) 978 * if (caller is __create_workqueue)
@@ -1031,8 +1004,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1031 struct lock_class_key *key, 1004 struct lock_class_key *key,
1032 const char *lock_name) 1005 const char *lock_name)
1033{ 1006{
1007 bool singlethread = flags & WQ_SINGLE_THREAD;
1034 struct workqueue_struct *wq; 1008 struct workqueue_struct *wq;
1035 struct cpu_workqueue_struct *cwq;
1036 int err = 0, cpu; 1009 int err = 0, cpu;
1037 1010
1038 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 1011 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
@@ -1048,37 +1021,37 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1048 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 1021 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
1049 INIT_LIST_HEAD(&wq->list); 1022 INIT_LIST_HEAD(&wq->list);
1050 1023
1051 if (flags & WQ_SINGLE_THREAD) { 1024 cpu_maps_update_begin();
1052 cwq = init_cpu_workqueue(wq, singlethread_cpu); 1025 /*
1053 err = create_workqueue_thread(cwq, singlethread_cpu); 1026 * We must initialize cwqs for each possible cpu even if we
1054 start_workqueue_thread(cwq, -1); 1027 * are going to call destroy_workqueue() finally. Otherwise
1055 } else { 1028 * cpu_up() can hit the uninitialized cwq once we drop the
1056 cpu_maps_update_begin(); 1029 * lock.
1057 /* 1030 */
1058 * We must place this wq on list even if the code below fails. 1031 for_each_possible_cpu(cpu) {
1059 * cpu_down(cpu) can remove cpu from cpu_populated_map before 1032 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1060 * destroy_workqueue() takes the lock, in that case we leak 1033
1061 * cwq[cpu]->thread. 1034 cwq->wq = wq;
1062 */ 1035 cwq->cpu = cpu;
1063 spin_lock(&workqueue_lock); 1036 spin_lock_init(&cwq->lock);
1064 list_add(&wq->list, &workqueues); 1037 INIT_LIST_HEAD(&cwq->worklist);
1065 spin_unlock(&workqueue_lock); 1038 init_waitqueue_head(&cwq->more_work);
1066 /* 1039
1067 * We must initialize cwqs for each possible cpu even if we 1040 if (err)
1068 * are going to call destroy_workqueue() finally. Otherwise 1041 continue;
1069 * cpu_up() can hit the uninitialized cwq once we drop the 1042 err = create_workqueue_thread(cwq, cpu);
1070 * lock. 1043 if (cpu_online(cpu) && !singlethread)
1071 */
1072 for_each_possible_cpu(cpu) {
1073 cwq = init_cpu_workqueue(wq, cpu);
1074 if (err || !cpu_online(cpu))
1075 continue;
1076 err = create_workqueue_thread(cwq, cpu);
1077 start_workqueue_thread(cwq, cpu); 1044 start_workqueue_thread(cwq, cpu);
1078 } 1045 else
1079 cpu_maps_update_done(); 1046 start_workqueue_thread(cwq, -1);
1080 } 1047 }
1081 1048
1049 spin_lock(&workqueue_lock);
1050 list_add(&wq->list, &workqueues);
1051 spin_unlock(&workqueue_lock);
1052
1053 cpu_maps_update_done();
1054
1082 if (err) { 1055 if (err) {
1083 destroy_workqueue(wq); 1056 destroy_workqueue(wq);
1084 wq = NULL; 1057 wq = NULL;
@@ -1128,17 +1101,16 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
1128 */ 1101 */
1129void destroy_workqueue(struct workqueue_struct *wq) 1102void destroy_workqueue(struct workqueue_struct *wq)
1130{ 1103{
1131 const struct cpumask *cpu_map = wq_cpu_map(wq);
1132 int cpu; 1104 int cpu;
1133 1105
1134 cpu_maps_update_begin(); 1106 cpu_maps_update_begin();
1135 spin_lock(&workqueue_lock); 1107 spin_lock(&workqueue_lock);
1136 list_del(&wq->list); 1108 list_del(&wq->list);
1137 spin_unlock(&workqueue_lock); 1109 spin_unlock(&workqueue_lock);
1110 cpu_maps_update_done();
1138 1111
1139 for_each_cpu(cpu, cpu_map) 1112 for_each_possible_cpu(cpu)
1140 cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); 1113 cleanup_workqueue_thread(get_cwq(cpu, wq));
1141 cpu_maps_update_done();
1142 1114
1143 free_percpu(wq->cpu_wq); 1115 free_percpu(wq->cpu_wq);
1144 kfree(wq); 1116 kfree(wq);
@@ -1152,48 +1124,25 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
1152 unsigned int cpu = (unsigned long)hcpu; 1124 unsigned int cpu = (unsigned long)hcpu;
1153 struct cpu_workqueue_struct *cwq; 1125 struct cpu_workqueue_struct *cwq;
1154 struct workqueue_struct *wq; 1126 struct workqueue_struct *wq;
1155 int err = 0;
1156 1127
1157 action &= ~CPU_TASKS_FROZEN; 1128 action &= ~CPU_TASKS_FROZEN;
1158 1129
1159 switch (action) {
1160 case CPU_UP_PREPARE:
1161 cpumask_set_cpu(cpu, cpu_populated_map);
1162 }
1163undo:
1164 list_for_each_entry(wq, &workqueues, list) { 1130 list_for_each_entry(wq, &workqueues, list) {
1165 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 1131 if (wq->flags & WQ_SINGLE_THREAD)
1132 continue;
1166 1133
1167 switch (action) { 1134 cwq = get_cwq(cpu, wq);
1168 case CPU_UP_PREPARE:
1169 err = create_workqueue_thread(cwq, cpu);
1170 if (!err)
1171 break;
1172 printk(KERN_ERR "workqueue [%s] for %i failed\n",
1173 wq->name, cpu);
1174 action = CPU_UP_CANCELED;
1175 err = -ENOMEM;
1176 goto undo;
1177
1178 case CPU_ONLINE:
1179 start_workqueue_thread(cwq, cpu);
1180 break;
1181 1135
1182 case CPU_UP_CANCELED: 1136 switch (action) {
1183 start_workqueue_thread(cwq, -1);
1184 case CPU_POST_DEAD: 1137 case CPU_POST_DEAD:
1185 cleanup_workqueue_thread(cwq); 1138 lock_map_acquire(&cwq->wq->lockdep_map);
1139 lock_map_release(&cwq->wq->lockdep_map);
1140 flush_cpu_workqueue(cwq);
1186 break; 1141 break;
1187 } 1142 }
1188 } 1143 }
1189 1144
1190 switch (action) { 1145 return notifier_from_errno(0);
1191 case CPU_UP_CANCELED:
1192 case CPU_POST_DEAD:
1193 cpumask_clear_cpu(cpu, cpu_populated_map);
1194 }
1195
1196 return notifier_from_errno(err);
1197} 1146}
1198 1147
1199#ifdef CONFIG_SMP 1148#ifdef CONFIG_SMP
@@ -1245,11 +1194,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
1245 1194
1246void __init init_workqueues(void) 1195void __init init_workqueues(void)
1247{ 1196{
1248 alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
1249
1250 cpumask_copy(cpu_populated_map, cpu_online_mask);
1251 singlethread_cpu = cpumask_first(cpu_possible_mask); 1197 singlethread_cpu = cpumask_first(cpu_possible_mask);
1252 cpu_singlethread_map = cpumask_of(singlethread_cpu);
1253 hotcpu_notifier(workqueue_cpu_callback, 0); 1198 hotcpu_notifier(workqueue_cpu_callback, 0);
1254 keventd_wq = create_workqueue("events"); 1199 keventd_wq = create_workqueue("events");
1255 BUG_ON(!keventd_wq); 1200 BUG_ON(!keventd_wq);