author	Oleg Nesterov <oleg@tv-sign.ru>	2007-05-09 05:34:09 -0400
committer	Linus Torvalds <torvalds@woody.linux-foundation.org>	2007-05-09 15:30:52 -0400
commit	3af24433efac62f451bfdb1cf1edb7181fb73645 (patch)
tree	330353b50a88615ef6e99440e8412667ae0a855e /kernel/workqueue.c
parent	36aa9dfc39bf473780439f5629c30f59d677e793 (diff)
workqueue: don't migrate pending works from the dead CPU
Currently CPU_DEAD uses kthread_stop() to stop cwq->thread and then transfers cwq->worklist to another CPU. However, it is very unlikely that worker_thread() will notice kthread_should_stop() before flushing cwq->worklist. It is only possible if worker_thread() was preempted after run_workqueue(cwq), a new work_struct was added, and CPU_DEAD happened before cwq->thread has a chance to run.

This means that take_over_work() mostly adds unneeded complications. Note also that kthread_stop() is not good per se, wake_up_process() may confuse work->func() if it sleeps waiting for some event.

Remove take_over_work() and migrate_sequence complications. CPU_DEAD sets the cwq->should_stop flag (introduced by this patch) and waits for cwq->thread to flush cwq->worklist and exit. Because the dead CPU is not on cpu_online_map, no more works can be added to that cwq.

cpu_populated_map was introduced to optimize for_each_possible_cpu(), it is not strictly needed, and it is more a documentation in fact.

Saves 418 bytes.

Signed-off-by: Oleg Nesterov <oleg@tv-sign.ru>
Cc: Srivatsa Vaddagiri <vatsa@in.ibm.com>
Cc: "Pallipadi, Venkatesh" <venkatesh.pallipadi@intel.com>
Cc: Gautham shenoy <ego@in.ibm.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--	kernel/workqueue.c	430
1 files changed, 211 insertions, 219 deletions
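Before the diff, a minimal user-space sketch of the worker-side protocol the commit message describes may help: sleep only while there is no work and no stop request, and exit only once the worklist has been flushed. This is a pthreads model with invented toy_* names, not the kernel code (that follows in the patch below); a matching stop-side sketch appears after the diff.

/*
 * Minimal user-space model of the cwq->should_stop protocol, using
 * pthreads.  All toy_* names are illustrative, not kernel APIs.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_work {
	struct toy_work *next;
	void (*func)(struct toy_work *);
};

struct toy_cwq {
	pthread_mutex_t lock;		/* plays the role of cwq->lock */
	pthread_cond_t more_work;	/* plays the role of cwq->more_work */
	struct toy_work *head;		/* plays the role of cwq->worklist */
	bool should_stop;		/* set by the stop side ("CPU_DEAD") */
	bool thread_gone;		/* models "cwq->thread = NULL" */
};

static void *toy_worker(void *arg)
{
	struct toy_cwq *cwq = arg;

	pthread_mutex_lock(&cwq->lock);
	for (;;) {
		/* Sleep only while there is no work and no stop request,
		 * like the new worker_thread() loop. */
		while (!cwq->should_stop && cwq->head == NULL)
			pthread_cond_wait(&cwq->more_work, &cwq->lock);

		/* Like cwq_should_stop(): exit only once the list is empty,
		 * so every already-queued item is flushed first. */
		if (cwq->should_stop && cwq->head == NULL) {
			cwq->thread_gone = true;
			break;
		}

		/* Like run_workqueue(): pop one item and run it with the
		 * lock dropped around the callback. */
		struct toy_work *w = cwq->head;
		cwq->head = w->next;
		pthread_mutex_unlock(&cwq->lock);
		w->func(w);
		pthread_mutex_lock(&cwq->lock);
	}
	pthread_mutex_unlock(&cwq->lock);
	return NULL;
}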
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 398c34ff6a54..a981add58fb9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -43,10 +43,11 @@ struct cpu_workqueue_struct {
 
 	struct list_head worklist;
 	wait_queue_head_t more_work;
+	struct work_struct *current_work;
 
 	struct workqueue_struct *wq;
 	struct task_struct *thread;
-	struct work_struct *current_work;
+	int should_stop;
 
 	int run_depth;		/* Detect run_workqueue() recursion depth */
 } ____cacheline_aligned;
@@ -64,11 +65,12 @@ struct workqueue_struct {
 
 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
    threads to each one as cpus come/go. */
-static long migrate_sequence __read_mostly;
 static DEFINE_MUTEX(workqueue_mutex);
 static LIST_HEAD(workqueues);
 
-static int singlethread_cpu;
+static int singlethread_cpu __read_mostly;
+/* optimization, we could use cpu_possible_map */
+static cpumask_t cpu_populated_map __read_mostly;
 
 /* If it's single threaded, it isn't in the list of workqueues. */
 static inline int is_single_threaded(struct workqueue_struct *wq)
@@ -344,10 +346,28 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 	spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
+/*
+ * NOTE: the caller must not touch *cwq if this func returns true
+ */
+static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
+{
+	int should_stop = cwq->should_stop;
+
+	if (unlikely(should_stop)) {
+		spin_lock_irq(&cwq->lock);
+		should_stop = cwq->should_stop && list_empty(&cwq->worklist);
+		if (should_stop)
+			cwq->thread = NULL;
+		spin_unlock_irq(&cwq->lock);
+	}
+
+	return should_stop;
+}
+
 static int worker_thread(void *__cwq)
 {
 	struct cpu_workqueue_struct *cwq = __cwq;
-	DECLARE_WAITQUEUE(wait, current);
+	DEFINE_WAIT(wait);
 	struct k_sigaction sa;
 	sigset_t blocked;
 
@@ -373,23 +393,21 @@ static int worker_thread(void *__cwq)
 	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
 	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
 
-	set_current_state(TASK_INTERRUPTIBLE);
-	while (!kthread_should_stop()) {
+	for (;;) {
 		if (cwq->wq->freezeable)
 			try_to_freeze();
 
-		add_wait_queue(&cwq->more_work, &wait);
-		if (list_empty(&cwq->worklist))
+		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
+		if (!cwq->should_stop && list_empty(&cwq->worklist))
 			schedule();
-		else
-			__set_current_state(TASK_RUNNING);
-		remove_wait_queue(&cwq->more_work, &wait);
+		finish_wait(&cwq->more_work, &wait);
+
+		if (cwq_should_stop(cwq))
+			break;
 
-		if (!list_empty(&cwq->worklist))
-			run_workqueue(cwq);
-		set_current_state(TASK_INTERRUPTIBLE);
+		run_workqueue(cwq);
 	}
-	__set_current_state(TASK_RUNNING);
+
 	return 0;
 }
 
@@ -454,20 +472,13 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
  */
 void fastcall flush_workqueue(struct workqueue_struct *wq)
 {
-	if (is_single_threaded(wq)) {
-		/* Always use first cpu's area. */
+	if (is_single_threaded(wq))
 		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
-	} else {
-		long sequence;
+	else {
 		int cpu;
-again:
-		sequence = migrate_sequence;
 
-		for_each_possible_cpu(cpu)
+		for_each_cpu_mask(cpu, cpu_populated_map)
 			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
-
-		if (unlikely(sequence != migrate_sequence))
-			goto again;
 	}
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
@@ -485,11 +496,8 @@ static void wait_on_work(struct cpu_workqueue_struct *cwq,
 	}
 	spin_unlock_irq(&cwq->lock);
 
-	if (unlikely(running)) {
-		mutex_unlock(&workqueue_mutex);
+	if (unlikely(running))
 		wait_for_completion(&barr.done);
-		mutex_lock(&workqueue_mutex);
-	}
 }
 
 /**
@@ -510,155 +518,31 @@ void flush_work(struct workqueue_struct *wq, struct work_struct *work)
 {
 	struct cpu_workqueue_struct *cwq;
 
-	mutex_lock(&workqueue_mutex);
 	cwq = get_wq_data(work);
 	/* Was it ever queued ? */
 	if (!cwq)
-		goto out;
+		return;
 
 	/*
-	 * This work can't be re-queued, and the lock above protects us
-	 * from take_over_work(), no need to re-check that get_wq_data()
-	 * is still the same when we take cwq->lock.
+	 * This work can't be re-queued, no need to re-check that
+	 * get_wq_data() is still the same when we take cwq->lock.
 	 */
 	spin_lock_irq(&cwq->lock);
 	list_del_init(&work->entry);
 	work_release(work);
 	spin_unlock_irq(&cwq->lock);
 
-	if (is_single_threaded(wq)) {
-		/* Always use first cpu's area. */
+	if (is_single_threaded(wq))
 		wait_on_work(per_cpu_ptr(wq->cpu_wq, singlethread_cpu), work);
-	} else {
+	else {
 		int cpu;
 
-		for_each_online_cpu(cpu)
+		for_each_cpu_mask(cpu, cpu_populated_map)
 			wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
 	}
-out:
-	mutex_unlock(&workqueue_mutex);
 }
 EXPORT_SYMBOL_GPL(flush_work);
 
-static void init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-	cwq->wq = wq;
-	spin_lock_init(&cwq->lock);
-	INIT_LIST_HEAD(&cwq->worklist);
-	init_waitqueue_head(&cwq->more_work);
-}
-
-static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
-						   int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-	struct task_struct *p;
-
-	if (is_single_threaded(wq))
-		p = kthread_create(worker_thread, cwq, "%s", wq->name);
-	else
-		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
-	if (IS_ERR(p))
-		return NULL;
-	cwq->thread = p;
-	return p;
-}
-
-struct workqueue_struct *__create_workqueue(const char *name,
-					    int singlethread, int freezeable)
-{
-	int cpu, destroy = 0;
-	struct workqueue_struct *wq;
-	struct task_struct *p;
-
-	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
-	if (!wq)
-		return NULL;
-
-	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
-	if (!wq->cpu_wq) {
-		kfree(wq);
-		return NULL;
-	}
-
-	wq->name = name;
-	wq->freezeable = freezeable;
-
-	mutex_lock(&workqueue_mutex);
-	if (singlethread) {
-		INIT_LIST_HEAD(&wq->list);
-		init_cpu_workqueue(wq, singlethread_cpu);
-		p = create_workqueue_thread(wq, singlethread_cpu);
-		if (!p)
-			destroy = 1;
-		else
-			wake_up_process(p);
-	} else {
-		list_add(&wq->list, &workqueues);
-		for_each_possible_cpu(cpu) {
-			init_cpu_workqueue(wq, cpu);
-			if (!cpu_online(cpu))
-				continue;
-
-			p = create_workqueue_thread(wq, cpu);
-			if (p) {
-				kthread_bind(p, cpu);
-				wake_up_process(p);
-			} else
-				destroy = 1;
-		}
-	}
-	mutex_unlock(&workqueue_mutex);
-
-	/*
-	 * Was there any error during startup? If yes then clean up:
-	 */
-	if (destroy) {
-		destroy_workqueue(wq);
-		wq = NULL;
-	}
-	return wq;
-}
-EXPORT_SYMBOL_GPL(__create_workqueue);
-
-static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-	if (cwq->thread) {
-		kthread_stop(cwq->thread);
-		cwq->thread = NULL;
-	}
-}
-
-/**
- * destroy_workqueue - safely terminate a workqueue
- * @wq: target workqueue
- *
- * Safely destroy a workqueue. All work currently pending will be done first.
- */
-void destroy_workqueue(struct workqueue_struct *wq)
-{
-	int cpu;
-
-	flush_workqueue(wq);
-
-	/* We don't need the distraction of CPUs appearing and vanishing. */
-	mutex_lock(&workqueue_mutex);
-	if (is_single_threaded(wq))
-		cleanup_workqueue_thread(wq, singlethread_cpu);
-	else {
-		for_each_online_cpu(cpu)
-			cleanup_workqueue_thread(wq, cpu);
-		list_del(&wq->list);
-	}
-	mutex_unlock(&workqueue_mutex);
-	free_percpu(wq->cpu_wq);
-	kfree(wq);
-}
-EXPORT_SYMBOL_GPL(destroy_workqueue);
 
 static struct workqueue_struct *keventd_wq;
 
@@ -822,85 +706,193 @@ int current_is_keventd(void)
 
 }
 
-/* Take the work from this (downed) CPU. */
-static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
+static struct cpu_workqueue_struct *
+init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
 {
 	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-	struct list_head list;
-	struct work_struct *work;
 
-	spin_lock_irq(&cwq->lock);
-	list_replace_init(&cwq->worklist, &list);
-	migrate_sequence++;
-
-	while (!list_empty(&list)) {
-		printk("Taking work for %s\n", wq->name);
-		work = list_entry(list.next,struct work_struct,entry);
-		list_del(&work->entry);
-		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
-	}
-	spin_unlock_irq(&cwq->lock);
+	cwq->wq = wq;
+	spin_lock_init(&cwq->lock);
+	INIT_LIST_HEAD(&cwq->worklist);
+	init_waitqueue_head(&cwq->more_work);
+
+	return cwq;
 }
 
-/* We're holding the cpucontrol mutex here */
-static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
-					    unsigned long action,
-					    void *hcpu)
+static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+{
+	struct workqueue_struct *wq = cwq->wq;
+	const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
+	struct task_struct *p;
+
+	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
+	/*
+	 * Nobody can add the work_struct to this cwq,
+	 *	if (caller is __create_workqueue)
+	 *		nobody should see this wq
+	 *	else // caller is CPU_UP_PREPARE
+	 *		cpu is not on cpu_online_map
+	 * so we can abort safely.
+	 */
+	if (IS_ERR(p))
+		return PTR_ERR(p);
+
+	cwq->thread = p;
+	cwq->should_stop = 0;
+	if (!is_single_threaded(wq))
+		kthread_bind(p, cpu);
+
+	if (is_single_threaded(wq) || cpu_online(cpu))
+		wake_up_process(p);
+
+	return 0;
+}
+
+struct workqueue_struct *__create_workqueue(const char *name,
+					    int singlethread, int freezeable)
 {
-	unsigned int hotcpu = (unsigned long)hcpu;
 	struct workqueue_struct *wq;
+	struct cpu_workqueue_struct *cwq;
+	int err = 0, cpu;
 
-	switch (action) {
-	case CPU_UP_PREPARE:
+	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+	if (!wq)
+		return NULL;
+
+	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
+	if (!wq->cpu_wq) {
+		kfree(wq);
+		return NULL;
+	}
+
+	wq->name = name;
+	wq->freezeable = freezeable;
+
+	if (singlethread) {
+		INIT_LIST_HEAD(&wq->list);
+		cwq = init_cpu_workqueue(wq, singlethread_cpu);
+		err = create_workqueue_thread(cwq, singlethread_cpu);
+	} else {
 		mutex_lock(&workqueue_mutex);
-		/* Create a new workqueue thread for it. */
-		list_for_each_entry(wq, &workqueues, list) {
-			if (!create_workqueue_thread(wq, hotcpu)) {
-				printk("workqueue for %i failed\n", hotcpu);
-				return NOTIFY_BAD;
-			}
+		list_add(&wq->list, &workqueues);
+
+		for_each_possible_cpu(cpu) {
+			cwq = init_cpu_workqueue(wq, cpu);
+			if (err || !cpu_online(cpu))
+				continue;
+			err = create_workqueue_thread(cwq, cpu);
 		}
-		break;
+		mutex_unlock(&workqueue_mutex);
+	}
+
+	if (err) {
+		destroy_workqueue(wq);
+		wq = NULL;
+	}
+	return wq;
+}
+EXPORT_SYMBOL_GPL(__create_workqueue);
 
-	case CPU_ONLINE:
-		/* Kick off worker threads. */
-		list_for_each_entry(wq, &workqueues, list) {
-			struct cpu_workqueue_struct *cwq;
+static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+{
+	struct wq_barrier barr;
+	int alive = 0;
 
-			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
-			kthread_bind(cwq->thread, hotcpu);
-			wake_up_process(cwq->thread);
-		}
+	spin_lock_irq(&cwq->lock);
+	if (cwq->thread != NULL) {
+		insert_wq_barrier(cwq, &barr, 1);
+		cwq->should_stop = 1;
+		alive = 1;
+	}
+	spin_unlock_irq(&cwq->lock);
+
+	if (alive) {
+		wait_for_completion(&barr.done);
+
+		while (unlikely(cwq->thread != NULL))
+			cpu_relax();
+		/*
+		 * Wait until cwq->thread unlocks cwq->lock,
+		 * it won't touch *cwq after that.
+		 */
+		smp_rmb();
+		spin_unlock_wait(&cwq->lock);
+	}
+}
+
+/**
+ * destroy_workqueue - safely terminate a workqueue
+ * @wq: target workqueue
+ *
+ * Safely destroy a workqueue. All work currently pending will be done first.
+ */
+void destroy_workqueue(struct workqueue_struct *wq)
+{
+	struct cpu_workqueue_struct *cwq;
+
+	if (is_single_threaded(wq)) {
+		cwq = per_cpu_ptr(wq->cpu_wq, singlethread_cpu);
+		cleanup_workqueue_thread(cwq, singlethread_cpu);
+	} else {
+		int cpu;
+
+		mutex_lock(&workqueue_mutex);
+		list_del(&wq->list);
 		mutex_unlock(&workqueue_mutex);
-		break;
 
-	case CPU_UP_CANCELED:
-		list_for_each_entry(wq, &workqueues, list) {
-			if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
-				continue;
-			/* Unbind so it can run. */
-			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
-				     any_online_cpu(cpu_online_map));
-			cleanup_workqueue_thread(wq, hotcpu);
+		for_each_cpu_mask(cpu, cpu_populated_map) {
+			cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+			cleanup_workqueue_thread(cwq, cpu);
 		}
-		mutex_unlock(&workqueue_mutex);
-		break;
+	}
 
-	case CPU_DOWN_PREPARE:
+	free_percpu(wq->cpu_wq);
+	kfree(wq);
+}
+EXPORT_SYMBOL_GPL(destroy_workqueue);
+
+static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
+						unsigned long action,
+						void *hcpu)
+{
+	unsigned int cpu = (unsigned long)hcpu;
+	struct cpu_workqueue_struct *cwq;
+	struct workqueue_struct *wq;
+
+	switch (action) {
+	case CPU_LOCK_ACQUIRE:
 		mutex_lock(&workqueue_mutex);
-		break;
+		return NOTIFY_OK;
 
-	case CPU_DOWN_FAILED:
+	case CPU_LOCK_RELEASE:
 		mutex_unlock(&workqueue_mutex);
-		break;
+		return NOTIFY_OK;
 
-	case CPU_DEAD:
-		list_for_each_entry(wq, &workqueues, list)
-			cleanup_workqueue_thread(wq, hotcpu);
-		list_for_each_entry(wq, &workqueues, list)
-			take_over_work(wq, hotcpu);
-		mutex_unlock(&workqueue_mutex);
-		break;
+	case CPU_UP_PREPARE:
+		cpu_set(cpu, cpu_populated_map);
+	}
+
+	list_for_each_entry(wq, &workqueues, list) {
+		cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+
+		switch (action) {
+		case CPU_UP_PREPARE:
+			if (!create_workqueue_thread(cwq, cpu))
+				break;
+			printk(KERN_ERR "workqueue for %i failed\n", cpu);
+			return NOTIFY_BAD;
+
+		case CPU_ONLINE:
+			wake_up_process(cwq->thread);
+			break;
+
+		case CPU_UP_CANCELED:
+			if (cwq->thread)
+				wake_up_process(cwq->thread);
+		case CPU_DEAD:
+			cleanup_workqueue_thread(cwq, cpu);
+			break;
+		}
 	}
 
 	return NOTIFY_OK;
@@ -908,9 +900,9 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
 void init_workqueues(void)
 {
+	cpu_populated_map = cpu_online_map;
 	singlethread_cpu = first_cpu(cpu_possible_map);
 	hotcpu_notifier(workqueue_cpu_callback, 0);
 	keventd_wq = create_workqueue("events");
 	BUG_ON(!keventd_wq);
 }
-
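To round out the user-space model sketched before the diff, here is the stop side: roughly what CPU_DEAD now does through cleanup_workqueue_thread(), i.e. set the stop flag under the lock, wake the worker, and wait for it to drain the queue and exit. It reuses the toy_cwq and toy_worker definitions from the earlier sketch (compile both blocks as one file with -pthread); every name here is illustrative, none of it is a kernel API.

/*
 * Stop side of the toy model above.  The kernel waits for a wq_barrier
 * completion and for cwq->thread to become NULL; pthread_join() is the
 * simple user-space stand-in for that handshake.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static void toy_cleanup(struct toy_cwq *cwq, pthread_t tid)
{
	pthread_mutex_lock(&cwq->lock);
	cwq->should_stop = true;		/* cwq->should_stop = 1 */
	pthread_cond_signal(&cwq->more_work);	/* wake_up(&cwq->more_work) */
	pthread_mutex_unlock(&cwq->lock);

	pthread_join(tid, NULL);		/* worker drains, then exits */
}

static void say_hello(struct toy_work *w)
{
	(void)w;
	puts("work item ran");
}

int main(void)
{
	struct toy_cwq cwq = { .head = NULL };
	struct toy_work w1 = { .func = say_hello };
	struct toy_work w2 = { .func = say_hello };
	pthread_t tid;

	pthread_mutex_init(&cwq.lock, NULL);
	pthread_cond_init(&cwq.more_work, NULL);
	pthread_create(&tid, NULL, toy_worker, &cwq);

	/* Queue two items, the moral equivalent of queue_work(). */
	pthread_mutex_lock(&cwq.lock);
	w1.next = &w2;
	cwq.head = &w1;
	pthread_cond_signal(&cwq.more_work);
	pthread_mutex_unlock(&cwq.lock);

	/* Both items run before the worker announces its exit. */
	toy_cleanup(&cwq, tid);
	printf("worker exited with empty queue: %s\n",
	       cwq.thread_gone && cwq.head == NULL ? "yes" : "no");
	return 0;
}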