aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2010-06-29 04:07:11 -0400
committerTejun Heo <tj@kernel.org>2010-06-29 04:07:11 -0400
commitc34056a3fdde777c079cc8a70785c2602f2586cb (patch)
tree583cc13af000d1c085b1f951f8975eee78cea54c
parent73f53c4aa732eced5fcb1844d3d452c30905f20f (diff)
workqueue: introduce worker
Separate out worker thread related information to struct worker from struct cpu_workqueue_struct and implement helper functions to deal with the new struct worker. The only change which is visible outside is that now workqueue worker are all named "kworker/CPUID:WORKERID" where WORKERID is allocated from per-cpu ida. This is in preparation of concurrency managed workqueue where shared multiple workers would be available per cpu. Signed-off-by: Tejun Heo <tj@kernel.org>
-rw-r--r--kernel/workqueue.c211
1 files changed, 150 insertions, 61 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 56e47c59d73b..600db10a4dbf 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -33,6 +33,7 @@
33#include <linux/kallsyms.h> 33#include <linux/kallsyms.h>
34#include <linux/debug_locks.h> 34#include <linux/debug_locks.h>
35#include <linux/lockdep.h> 35#include <linux/lockdep.h>
36#include <linux/idr.h>
36 37
37/* 38/*
38 * Structure fields follow one of the following exclusion rules. 39 * Structure fields follow one of the following exclusion rules.
@@ -46,6 +47,15 @@
46 * W: workqueue_lock protected. 47 * W: workqueue_lock protected.
47 */ 48 */
48 49
50struct cpu_workqueue_struct;
51
52struct worker {
53 struct work_struct *current_work; /* L: work being processed */
54 struct task_struct *task; /* I: worker task */
55 struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
56 int id; /* I: worker id */
57};
58
49/* 59/*
50 * The per-CPU workqueue (if single thread, we always use the first 60 * The per-CPU workqueue (if single thread, we always use the first
51 * possible cpu). The lower WORK_STRUCT_FLAG_BITS of 61 * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
@@ -58,15 +68,14 @@ struct cpu_workqueue_struct {
58 68
59 struct list_head worklist; 69 struct list_head worklist;
60 wait_queue_head_t more_work; 70 wait_queue_head_t more_work;
61 struct work_struct *current_work;
62 unsigned int cpu; 71 unsigned int cpu;
72 struct worker *worker;
63 73
64 struct workqueue_struct *wq; /* I: the owning workqueue */ 74 struct workqueue_struct *wq; /* I: the owning workqueue */
65 int work_color; /* L: current color */ 75 int work_color; /* L: current color */
66 int flush_color; /* L: flushing color */ 76 int flush_color; /* L: flushing color */
67 int nr_in_flight[WORK_NR_COLORS]; 77 int nr_in_flight[WORK_NR_COLORS];
68 /* L: nr of in_flight works */ 78 /* L: nr of in_flight works */
69 struct task_struct *thread;
70}; 79};
71 80
72/* 81/*
@@ -214,6 +223,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
214/* Serializes the accesses to the list of workqueues. */ 223/* Serializes the accesses to the list of workqueues. */
215static DEFINE_SPINLOCK(workqueue_lock); 224static DEFINE_SPINLOCK(workqueue_lock);
216static LIST_HEAD(workqueues); 225static LIST_HEAD(workqueues);
226static DEFINE_PER_CPU(struct ida, worker_ida);
227
228static int worker_thread(void *__worker);
217 229
218static int singlethread_cpu __read_mostly; 230static int singlethread_cpu __read_mostly;
219 231
@@ -428,6 +440,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
428} 440}
429EXPORT_SYMBOL_GPL(queue_delayed_work_on); 441EXPORT_SYMBOL_GPL(queue_delayed_work_on);
430 442
443static struct worker *alloc_worker(void)
444{
445 struct worker *worker;
446
447 worker = kzalloc(sizeof(*worker), GFP_KERNEL);
448 return worker;
449}
450
451/**
452 * create_worker - create a new workqueue worker
453 * @cwq: cwq the new worker will belong to
454 * @bind: whether to set affinity to @cpu or not
455 *
456 * Create a new worker which is bound to @cwq. The returned worker
457 * can be started by calling start_worker() or destroyed using
458 * destroy_worker().
459 *
460 * CONTEXT:
461 * Might sleep. Does GFP_KERNEL allocations.
462 *
463 * RETURNS:
464 * Pointer to the newly created worker.
465 */
466static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
467{
468 int id = -1;
469 struct worker *worker = NULL;
470
471 spin_lock(&workqueue_lock);
472 while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
473 spin_unlock(&workqueue_lock);
474 if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
475 goto fail;
476 spin_lock(&workqueue_lock);
477 }
478 spin_unlock(&workqueue_lock);
479
480 worker = alloc_worker();
481 if (!worker)
482 goto fail;
483
484 worker->cwq = cwq;
485 worker->id = id;
486
487 worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
488 cwq->cpu, id);
489 if (IS_ERR(worker->task))
490 goto fail;
491
492 if (bind)
493 kthread_bind(worker->task, cwq->cpu);
494
495 return worker;
496fail:
497 if (id >= 0) {
498 spin_lock(&workqueue_lock);
499 ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
500 spin_unlock(&workqueue_lock);
501 }
502 kfree(worker);
503 return NULL;
504}
505
506/**
507 * start_worker - start a newly created worker
508 * @worker: worker to start
509 *
510 * Start @worker.
511 *
512 * CONTEXT:
513 * spin_lock_irq(cwq->lock).
514 */
515static void start_worker(struct worker *worker)
516{
517 wake_up_process(worker->task);
518}
519
520/**
521 * destroy_worker - destroy a workqueue worker
522 * @worker: worker to be destroyed
523 *
524 * Destroy @worker.
525 */
526static void destroy_worker(struct worker *worker)
527{
528 int cpu = worker->cwq->cpu;
529 int id = worker->id;
530
531 /* sanity check frenzy */
532 BUG_ON(worker->current_work);
533
534 kthread_stop(worker->task);
535 kfree(worker);
536
537 spin_lock(&workqueue_lock);
538 ida_remove(&per_cpu(worker_ida, cpu), id);
539 spin_unlock(&workqueue_lock);
540}
541
431/** 542/**
432 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight 543 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
433 * @cwq: cwq of interest 544 * @cwq: cwq of interest
@@ -468,7 +579,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
468 579
469/** 580/**
470 * process_one_work - process single work 581 * process_one_work - process single work
471 * @cwq: cwq to process work for 582 * @worker: self
472 * @work: work to process 583 * @work: work to process
473 * 584 *
474 * Process @work. This function contains all the logics necessary to 585 * Process @work. This function contains all the logics necessary to
@@ -480,9 +591,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
480 * CONTEXT: 591 * CONTEXT:
481 * spin_lock_irq(cwq->lock) which is released and regrabbed. 592 * spin_lock_irq(cwq->lock) which is released and regrabbed.
482 */ 593 */
483static void process_one_work(struct cpu_workqueue_struct *cwq, 594static void process_one_work(struct worker *worker, struct work_struct *work)
484 struct work_struct *work)
485{ 595{
596 struct cpu_workqueue_struct *cwq = worker->cwq;
486 work_func_t f = work->func; 597 work_func_t f = work->func;
487 int work_color; 598 int work_color;
488#ifdef CONFIG_LOCKDEP 599#ifdef CONFIG_LOCKDEP
@@ -497,7 +608,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
497#endif 608#endif
498 /* claim and process */ 609 /* claim and process */
499 debug_work_deactivate(work); 610 debug_work_deactivate(work);
500 cwq->current_work = work; 611 worker->current_work = work;
501 work_color = get_work_color(work); 612 work_color = get_work_color(work);
502 list_del_init(&work->entry); 613 list_del_init(&work->entry);
503 614
@@ -524,30 +635,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
524 spin_lock_irq(&cwq->lock); 635 spin_lock_irq(&cwq->lock);
525 636
526 /* we're done with it, release */ 637 /* we're done with it, release */
527 cwq->current_work = NULL; 638 worker->current_work = NULL;
528 cwq_dec_nr_in_flight(cwq, work_color); 639 cwq_dec_nr_in_flight(cwq, work_color);
529} 640}
530 641
531static void run_workqueue(struct cpu_workqueue_struct *cwq) 642static void run_workqueue(struct worker *worker)
532{ 643{
644 struct cpu_workqueue_struct *cwq = worker->cwq;
645
533 spin_lock_irq(&cwq->lock); 646 spin_lock_irq(&cwq->lock);
534 while (!list_empty(&cwq->worklist)) { 647 while (!list_empty(&cwq->worklist)) {
535 struct work_struct *work = list_entry(cwq->worklist.next, 648 struct work_struct *work = list_entry(cwq->worklist.next,
536 struct work_struct, entry); 649 struct work_struct, entry);
537 process_one_work(cwq, work); 650 process_one_work(worker, work);
538 } 651 }
539 spin_unlock_irq(&cwq->lock); 652 spin_unlock_irq(&cwq->lock);
540} 653}
541 654
542/** 655/**
543 * worker_thread - the worker thread function 656 * worker_thread - the worker thread function
544 * @__cwq: cwq to serve 657 * @__worker: self
545 * 658 *
546 * The cwq worker thread function. 659 * The cwq worker thread function.
547 */ 660 */
548static int worker_thread(void *__cwq) 661static int worker_thread(void *__worker)
549{ 662{
550 struct cpu_workqueue_struct *cwq = __cwq; 663 struct worker *worker = __worker;
664 struct cpu_workqueue_struct *cwq = worker->cwq;
551 DEFINE_WAIT(wait); 665 DEFINE_WAIT(wait);
552 666
553 if (cwq->wq->flags & WQ_FREEZEABLE) 667 if (cwq->wq->flags & WQ_FREEZEABLE)
@@ -566,11 +680,11 @@ static int worker_thread(void *__cwq)
566 if (kthread_should_stop()) 680 if (kthread_should_stop())
567 break; 681 break;
568 682
569 if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed, 683 if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
570 get_cpu_mask(cwq->cpu)))) 684 get_cpu_mask(cwq->cpu))))
571 set_cpus_allowed_ptr(cwq->thread, 685 set_cpus_allowed_ptr(worker->task,
572 get_cpu_mask(cwq->cpu)); 686 get_cpu_mask(cwq->cpu));
573 run_workqueue(cwq); 687 run_workqueue(worker);
574 } 688 }
575 689
576 return 0; 690 return 0;
@@ -873,7 +987,7 @@ int flush_work(struct work_struct *work)
873 goto already_gone; 987 goto already_gone;
874 prev = &work->entry; 988 prev = &work->entry;
875 } else { 989 } else {
876 if (cwq->current_work != work) 990 if (!cwq->worker || cwq->worker->current_work != work)
877 goto already_gone; 991 goto already_gone;
878 prev = &cwq->worklist; 992 prev = &cwq->worklist;
879 } 993 }
@@ -937,7 +1051,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
937 int running = 0; 1051 int running = 0;
938 1052
939 spin_lock_irq(&cwq->lock); 1053 spin_lock_irq(&cwq->lock);
940 if (unlikely(cwq->current_work == work)) { 1054 if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
941 insert_wq_barrier(cwq, &barr, cwq->worklist.next); 1055 insert_wq_barrier(cwq, &barr, cwq->worklist.next);
942 running = 1; 1056 running = 1;
943 } 1057 }
@@ -1225,7 +1339,7 @@ int current_is_keventd(void)
1225 BUG_ON(!keventd_wq); 1339 BUG_ON(!keventd_wq);
1226 1340
1227 cwq = get_cwq(cpu, keventd_wq); 1341 cwq = get_cwq(cpu, keventd_wq);
1228 if (current == cwq->thread) 1342 if (current == cwq->worker->task)
1229 ret = 1; 1343 ret = 1;
1230 1344
1231 return ret; 1345 return ret;
@@ -1279,38 +1393,6 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs)
1279#endif 1393#endif
1280} 1394}
1281 1395
1282static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
1283{
1284 struct workqueue_struct *wq = cwq->wq;
1285 struct task_struct *p;
1286
1287 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
1288 /*
1289 * Nobody can add the work_struct to this cwq,
1290 * if (caller is __create_workqueue)
1291 * nobody should see this wq
1292 * else // caller is CPU_UP_PREPARE
1293 * cpu is not on cpu_online_map
1294 * so we can abort safely.
1295 */
1296 if (IS_ERR(p))
1297 return PTR_ERR(p);
1298 cwq->thread = p;
1299
1300 return 0;
1301}
1302
1303static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
1304{
1305 struct task_struct *p = cwq->thread;
1306
1307 if (p != NULL) {
1308 if (cpu >= 0)
1309 kthread_bind(p, cpu);
1310 wake_up_process(p);
1311 }
1312}
1313
1314struct workqueue_struct *__create_workqueue_key(const char *name, 1396struct workqueue_struct *__create_workqueue_key(const char *name,
1315 unsigned int flags, 1397 unsigned int flags,
1316 struct lock_class_key *key, 1398 struct lock_class_key *key,
@@ -1318,7 +1400,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1318{ 1400{
1319 bool singlethread = flags & WQ_SINGLE_THREAD; 1401 bool singlethread = flags & WQ_SINGLE_THREAD;
1320 struct workqueue_struct *wq; 1402 struct workqueue_struct *wq;
1321 int err = 0, cpu; 1403 bool failed = false;
1404 unsigned int cpu;
1322 1405
1323 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 1406 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
1324 if (!wq) 1407 if (!wq)
@@ -1348,20 +1431,21 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1348 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 1431 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1349 1432
1350 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); 1433 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
1351 cwq->wq = wq;
1352 cwq->cpu = cpu; 1434 cwq->cpu = cpu;
1435 cwq->wq = wq;
1353 cwq->flush_color = -1; 1436 cwq->flush_color = -1;
1354 spin_lock_init(&cwq->lock); 1437 spin_lock_init(&cwq->lock);
1355 INIT_LIST_HEAD(&cwq->worklist); 1438 INIT_LIST_HEAD(&cwq->worklist);
1356 init_waitqueue_head(&cwq->more_work); 1439 init_waitqueue_head(&cwq->more_work);
1357 1440
1358 if (err) 1441 if (failed)
1359 continue; 1442 continue;
1360 err = create_workqueue_thread(cwq, cpu); 1443 cwq->worker = create_worker(cwq,
1361 if (cpu_online(cpu) && !singlethread) 1444 cpu_online(cpu) && !singlethread);
1362 start_workqueue_thread(cwq, cpu); 1445 if (cwq->worker)
1446 start_worker(cwq->worker);
1363 else 1447 else
1364 start_workqueue_thread(cwq, -1); 1448 failed = true;
1365 } 1449 }
1366 1450
1367 spin_lock(&workqueue_lock); 1451 spin_lock(&workqueue_lock);
@@ -1370,7 +1454,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
1370 1454
1371 cpu_maps_update_done(); 1455 cpu_maps_update_done();
1372 1456
1373 if (err) { 1457 if (failed) {
1374 destroy_workqueue(wq); 1458 destroy_workqueue(wq);
1375 wq = NULL; 1459 wq = NULL;
1376 } 1460 }
@@ -1406,9 +1490,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
1406 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 1490 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
1407 int i; 1491 int i;
1408 1492
1409 if (cwq->thread) { 1493 if (cwq->worker) {
1410 kthread_stop(cwq->thread); 1494 destroy_worker(cwq->worker);
1411 cwq->thread = NULL; 1495 cwq->worker = NULL;
1412 } 1496 }
1413 1497
1414 for (i = 0; i < WORK_NR_COLORS; i++) 1498 for (i = 0; i < WORK_NR_COLORS; i++)
@@ -1495,6 +1579,11 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
1495 1579
1496void __init init_workqueues(void) 1580void __init init_workqueues(void)
1497{ 1581{
1582 unsigned int cpu;
1583
1584 for_each_possible_cpu(cpu)
1585 ida_init(&per_cpu(worker_ida, cpu));
1586
1498 singlethread_cpu = cpumask_first(cpu_possible_mask); 1587 singlethread_cpu = cpumask_first(cpu_possible_mask);
1499 hotcpu_notifier(workqueue_cpu_callback, 0); 1588 hotcpu_notifier(workqueue_cpu_callback, 0);
1500 keventd_wq = create_workqueue("events"); 1589 keventd_wq = create_workqueue("events");