author     Oleg Nesterov <oleg@tv-sign.ru>                        2007-05-09 05:34:09 -0400
committer  Linus Torvalds <torvalds@woody.linux-foundation.org>  2007-05-09 15:30:52 -0400
commit     3af24433efac62f451bfdb1cf1edb7181fb73645
tree       330353b50a88615ef6e99440e8412667ae0a855e  /kernel/workqueue.c
parent     36aa9dfc39bf473780439f5629c30f59d677e793
workqueue: don't migrate pending works from the dead CPU
Currently CPU_DEAD uses kthread_stop() to stop cwq->thread and then
transfers cwq->worklist to another CPU. However, it is very unlikely that
worker_thread() will notice kthread_should_stop() before flushing
cwq->worklist. That can only happen if worker_thread() was preempted after
run_workqueue(cwq), a new work_struct was added, and CPU_DEAD happened
before cwq->thread had a chance to run.
This means that take_over_work() mostly adds unneeded complication. Note
also that kthread_stop() is not a good fit here per se: its wake_up_process()
may confuse work->func() if that function sleeps waiting for some event.
Remove the take_over_work() and migrate_sequence complications. CPU_DEAD now
sets the cwq->should_stop flag (introduced by this patch) and waits for
cwq->thread to flush cwq->worklist and exit. Because the dead CPU is no
longer on cpu_online_map, no further work can be added to that cwq.
cpu_populated_map is introduced to optimize for_each_possible_cpu(); it is
not strictly needed and serves mostly as documentation.
Saves 418 bytes.
Signed-off-by: Oleg Nesterov <oleg@tv-sign.ru>
Cc: Srivatsa Vaddagiri <vatsa@in.ibm.com>
Cc: "Pallipadi, Venkatesh" <venkatesh.pallipadi@intel.com>
Cc: Gautham Shenoy <ego@in.ibm.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
Diffstat (limited to 'kernel/workqueue.c')
 -rw-r--r--  kernel/workqueue.c | 430
 1 file changed, 211 insertions, 219 deletions
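For orientation before reading the hunks, here is the heart of the new stop protocol, condensed from the patch itself (the freezer handling and signal setup in worker_thread() are omitted; see the full diff below for the surrounding context):

	/*
	 * Condensed from the hunks below: how a cwq worker now stops.
	 * The stopper (CPU_DEAD / destroy_workqueue) queues a barrier,
	 * sets ->should_stop and waits; the worker drains ->worklist and
	 * exits only once it observes the flag with an empty list.
	 */
	static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
	{
		int should_stop = cwq->should_stop;

		if (unlikely(should_stop)) {
			spin_lock_irq(&cwq->lock);
			should_stop = cwq->should_stop && list_empty(&cwq->worklist);
			if (should_stop)
				cwq->thread = NULL;	/* tells the stopper we are gone */
			spin_unlock_irq(&cwq->lock);
		}

		return should_stop;
	}

		/* worker_thread() main loop, as added by the patch */
		for (;;) {
			prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
			if (!cwq->should_stop && list_empty(&cwq->worklist))
				schedule();
			finish_wait(&cwq->more_work, &wait);

			if (cwq_should_stop(cwq))
				break;

			run_workqueue(cwq);
		}

Because cwq_should_stop() re-checks the flag under cwq->lock and only then clears cwq->thread, the stopper can spin on cwq->thread == NULL to learn that the worker has finished with *cwq.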
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 398c34ff6a54..a981add58fb9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -43,10 +43,11 @@ struct cpu_workqueue_struct {
 
 	struct list_head worklist;
 	wait_queue_head_t more_work;
+	struct work_struct *current_work;
 
 	struct workqueue_struct *wq;
 	struct task_struct *thread;
-	struct work_struct *current_work;
+	int should_stop;
 
 	int run_depth;		/* Detect run_workqueue() recursion depth */
 } ____cacheline_aligned;
@@ -64,11 +65,12 @@ struct workqueue_struct {
 
 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
    threads to each one as cpus come/go. */
-static long migrate_sequence __read_mostly;
 static DEFINE_MUTEX(workqueue_mutex);
 static LIST_HEAD(workqueues);
 
-static int singlethread_cpu;
+static int singlethread_cpu __read_mostly;
+/* optimization, we could use cpu_possible_map */
+static cpumask_t cpu_populated_map __read_mostly;
 
 /* If it's single threaded, it isn't in the list of workqueues. */
 static inline int is_single_threaded(struct workqueue_struct *wq)
@@ -344,10 +346,28 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 	spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
+/*
+ * NOTE: the caller must not touch *cwq if this func returns true
+ */
+static int cwq_should_stop(struct cpu_workqueue_struct *cwq)
+{
+	int should_stop = cwq->should_stop;
+
+	if (unlikely(should_stop)) {
+		spin_lock_irq(&cwq->lock);
+		should_stop = cwq->should_stop && list_empty(&cwq->worklist);
+		if (should_stop)
+			cwq->thread = NULL;
+		spin_unlock_irq(&cwq->lock);
+	}
+
+	return should_stop;
+}
+
 static int worker_thread(void *__cwq)
 {
 	struct cpu_workqueue_struct *cwq = __cwq;
-	DECLARE_WAITQUEUE(wait, current);
+	DEFINE_WAIT(wait);
 	struct k_sigaction sa;
 	sigset_t blocked;
 
@@ -373,23 +393,21 @@ static int worker_thread(void *__cwq)
 	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
 	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
 
-	set_current_state(TASK_INTERRUPTIBLE);
-	while (!kthread_should_stop()) {
+	for (;;) {
 		if (cwq->wq->freezeable)
 			try_to_freeze();
 
-		add_wait_queue(&cwq->more_work, &wait);
-		if (list_empty(&cwq->worklist))
+		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
+		if (!cwq->should_stop && list_empty(&cwq->worklist))
 			schedule();
-		else
-			__set_current_state(TASK_RUNNING);
-		remove_wait_queue(&cwq->more_work, &wait);
+		finish_wait(&cwq->more_work, &wait);
+
+		if (cwq_should_stop(cwq))
+			break;
 
-		if (!list_empty(&cwq->worklist))
-			run_workqueue(cwq);
-		set_current_state(TASK_INTERRUPTIBLE);
+		run_workqueue(cwq);
 	}
-	__set_current_state(TASK_RUNNING);
+
 	return 0;
 }
 
@@ -454,20 +472,13 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
  */
 void fastcall flush_workqueue(struct workqueue_struct *wq)
 {
-	if (is_single_threaded(wq)) {
-		/* Always use first cpu's area. */
+	if (is_single_threaded(wq))
 		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
-	} else {
-		long sequence;
+	else {
 		int cpu;
-again:
-		sequence = migrate_sequence;
 
-		for_each_possible_cpu(cpu)
+		for_each_cpu_mask(cpu, cpu_populated_map)
 			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
-
-		if (unlikely(sequence != migrate_sequence))
-			goto again;
 	}
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
@@ -485,11 +496,8 @@ static void wait_on_work(struct cpu_workqueue_struct *cwq,
 	}
 	spin_unlock_irq(&cwq->lock);
 
-	if (unlikely(running)) {
-		mutex_unlock(&workqueue_mutex);
+	if (unlikely(running))
 		wait_for_completion(&barr.done);
-		mutex_lock(&workqueue_mutex);
-	}
 }
 
 /**
@@ -510,155 +518,31 @@ void flush_work(struct workqueue_struct *wq, struct work_struct *work)
 {
 	struct cpu_workqueue_struct *cwq;
 
-	mutex_lock(&workqueue_mutex);
 	cwq = get_wq_data(work);
 	/* Was it ever queued ? */
 	if (!cwq)
-		goto out;
+		return;
 
 	/*
-	 * This work can't be re-queued, and the lock above protects us
-	 * from take_over_work(), no need to re-check that get_wq_data()
-	 * is still the same when we take cwq->lock.
+	 * This work can't be re-queued, no need to re-check that
+	 * get_wq_data() is still the same when we take cwq->lock.
 	 */
 	spin_lock_irq(&cwq->lock);
 	list_del_init(&work->entry);
 	work_release(work);
 	spin_unlock_irq(&cwq->lock);
 
-	if (is_single_threaded(wq)) {
-		/* Always use first cpu's area. */
+	if (is_single_threaded(wq))
 		wait_on_work(per_cpu_ptr(wq->cpu_wq, singlethread_cpu), work);
-	} else {
+	else {
 		int cpu;
 
-		for_each_online_cpu(cpu)
+		for_each_cpu_mask(cpu, cpu_populated_map)
 			wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
 	}
-out:
-	mutex_unlock(&workqueue_mutex);
 }
 EXPORT_SYMBOL_GPL(flush_work);
 
-static void init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-	cwq->wq = wq;
-	spin_lock_init(&cwq->lock);
-	INIT_LIST_HEAD(&cwq->worklist);
-	init_waitqueue_head(&cwq->more_work);
-}
-
-static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
-						   int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-	struct task_struct *p;
-
-	if (is_single_threaded(wq))
-		p = kthread_create(worker_thread, cwq, "%s", wq->name);
-	else
-		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
-	if (IS_ERR(p))
-		return NULL;
-	cwq->thread = p;
-	return p;
-}
-
-struct workqueue_struct *__create_workqueue(const char *name,
-					    int singlethread, int freezeable)
-{
-	int cpu, destroy = 0;
-	struct workqueue_struct *wq;
-	struct task_struct *p;
-
-	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
-	if (!wq)
-		return NULL;
-
-	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
-	if (!wq->cpu_wq) {
-		kfree(wq);
-		return NULL;
-	}
-
-	wq->name = name;
-	wq->freezeable = freezeable;
-
-	mutex_lock(&workqueue_mutex);
-	if (singlethread) {
-		INIT_LIST_HEAD(&wq->list);
-		init_cpu_workqueue(wq, singlethread_cpu);
-		p = create_workqueue_thread(wq, singlethread_cpu);
-		if (!p)
-			destroy = 1;
-		else
-			wake_up_process(p);
-	} else {
-		list_add(&wq->list, &workqueues);
-		for_each_possible_cpu(cpu) {
-			init_cpu_workqueue(wq, cpu);
-			if (!cpu_online(cpu))
-				continue;
-
-			p = create_workqueue_thread(wq, cpu);
-			if (p) {
-				kthread_bind(p, cpu);
-				wake_up_process(p);
-			} else
-				destroy = 1;
-		}
-	}
-	mutex_unlock(&workqueue_mutex);
-
-	/*
-	 * Was there any error during startup? If yes then clean up:
-	 */
-	if (destroy) {
-		destroy_workqueue(wq);
-		wq = NULL;
-	}
-	return wq;
-}
-EXPORT_SYMBOL_GPL(__create_workqueue);
-
-static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-	if (cwq->thread) {
-		kthread_stop(cwq->thread);
-		cwq->thread = NULL;
-	}
-}
-
-/**
- * destroy_workqueue - safely terminate a workqueue
- * @wq: target workqueue
- *
- * Safely destroy a workqueue. All work currently pending will be done first.
- */
-void destroy_workqueue(struct workqueue_struct *wq)
-{
-	int cpu;
-
-	flush_workqueue(wq);
-
-	/* We don't need the distraction of CPUs appearing and vanishing. */
-	mutex_lock(&workqueue_mutex);
-	if (is_single_threaded(wq))
-		cleanup_workqueue_thread(wq, singlethread_cpu);
-	else {
-		for_each_online_cpu(cpu)
-			cleanup_workqueue_thread(wq, cpu);
-		list_del(&wq->list);
-	}
-	mutex_unlock(&workqueue_mutex);
-	free_percpu(wq->cpu_wq);
-	kfree(wq);
-}
-EXPORT_SYMBOL_GPL(destroy_workqueue);
 
 static struct workqueue_struct *keventd_wq;
 
@@ -822,85 +706,193 @@ int current_is_keventd(void)
 
 }
 
-/* Take the work from this (downed) CPU. */
-static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
+static struct cpu_workqueue_struct *
+init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
 {
 	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-	struct list_head list;
-	struct work_struct *work;
 
-	spin_lock_irq(&cwq->lock);
-	list_replace_init(&cwq->worklist, &list);
-	migrate_sequence++;
-
-	while (!list_empty(&list)) {
-		printk("Taking work for %s\n", wq->name);
-		work = list_entry(list.next,struct work_struct,entry);
-		list_del(&work->entry);
-		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
-	}
-	spin_unlock_irq(&cwq->lock);
+	cwq->wq = wq;
+	spin_lock_init(&cwq->lock);
+	INIT_LIST_HEAD(&cwq->worklist);
+	init_waitqueue_head(&cwq->more_work);
+
+	return cwq;
 }
 
-/* We're holding the cpucontrol mutex here */
-static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
-					    unsigned long action,
-					    void *hcpu)
+static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+{
+	struct workqueue_struct *wq = cwq->wq;
+	const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
+	struct task_struct *p;
+
+	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
+	/*
+	 * Nobody can add the work_struct to this cwq,
+	 *	if (caller is __create_workqueue)
+	 *		nobody should see this wq
+	 *	else // caller is CPU_UP_PREPARE
+	 *		cpu is not on cpu_online_map
+	 * so we can abort safely.
+	 */
+	if (IS_ERR(p))
+		return PTR_ERR(p);
+
+	cwq->thread = p;
+	cwq->should_stop = 0;
+	if (!is_single_threaded(wq))
+		kthread_bind(p, cpu);
+
+	if (is_single_threaded(wq) || cpu_online(cpu))
+		wake_up_process(p);
+
+	return 0;
+}
+
+struct workqueue_struct *__create_workqueue(const char *name,
+					    int singlethread, int freezeable)
 {
-	unsigned int hotcpu = (unsigned long)hcpu;
 	struct workqueue_struct *wq;
+	struct cpu_workqueue_struct *cwq;
+	int err = 0, cpu;
 
-	switch (action) {
-	case CPU_UP_PREPARE:
+	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+	if (!wq)
+		return NULL;
+
+	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
+	if (!wq->cpu_wq) {
+		kfree(wq);
+		return NULL;
+	}
+
+	wq->name = name;
+	wq->freezeable = freezeable;
+
+	if (singlethread) {
+		INIT_LIST_HEAD(&wq->list);
+		cwq = init_cpu_workqueue(wq, singlethread_cpu);
+		err = create_workqueue_thread(cwq, singlethread_cpu);
+	} else {
 		mutex_lock(&workqueue_mutex);
-		/* Create a new workqueue thread for it. */
-		list_for_each_entry(wq, &workqueues, list) {
-			if (!create_workqueue_thread(wq, hotcpu)) {
-				printk("workqueue for %i failed\n", hotcpu);
-				return NOTIFY_BAD;
-			}
+		list_add(&wq->list, &workqueues);
+
+		for_each_possible_cpu(cpu) {
+			cwq = init_cpu_workqueue(wq, cpu);
+			if (err || !cpu_online(cpu))
+				continue;
+			err = create_workqueue_thread(cwq, cpu);
 		}
-		break;
+		mutex_unlock(&workqueue_mutex);
+	}
+
+	if (err) {
+		destroy_workqueue(wq);
+		wq = NULL;
+	}
+	return wq;
+}
+EXPORT_SYMBOL_GPL(__create_workqueue);
 
-	case CPU_ONLINE:
-		/* Kick off worker threads. */
-		list_for_each_entry(wq, &workqueues, list) {
-			struct cpu_workqueue_struct *cwq;
+static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+{
+	struct wq_barrier barr;
+	int alive = 0;
 
-			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
-			kthread_bind(cwq->thread, hotcpu);
-			wake_up_process(cwq->thread);
-		}
+	spin_lock_irq(&cwq->lock);
+	if (cwq->thread != NULL) {
+		insert_wq_barrier(cwq, &barr, 1);
+		cwq->should_stop = 1;
+		alive = 1;
+	}
+	spin_unlock_irq(&cwq->lock);
+
+	if (alive) {
+		wait_for_completion(&barr.done);
+
+		while (unlikely(cwq->thread != NULL))
+			cpu_relax();
+		/*
+		 * Wait until cwq->thread unlocks cwq->lock,
+		 * it won't touch *cwq after that.
+		 */
+		smp_rmb();
+		spin_unlock_wait(&cwq->lock);
+	}
+}
+
+/**
+ * destroy_workqueue - safely terminate a workqueue
+ * @wq: target workqueue
+ *
+ * Safely destroy a workqueue. All work currently pending will be done first.
+ */
+void destroy_workqueue(struct workqueue_struct *wq)
+{
+	struct cpu_workqueue_struct *cwq;
+
+	if (is_single_threaded(wq)) {
+		cwq = per_cpu_ptr(wq->cpu_wq, singlethread_cpu);
+		cleanup_workqueue_thread(cwq, singlethread_cpu);
+	} else {
+		int cpu;
+
+		mutex_lock(&workqueue_mutex);
+		list_del(&wq->list);
 		mutex_unlock(&workqueue_mutex);
-		break;
 
-	case CPU_UP_CANCELED:
-		list_for_each_entry(wq, &workqueues, list) {
-			if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
-				continue;
-			/* Unbind so it can run. */
-			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
-				     any_online_cpu(cpu_online_map));
-			cleanup_workqueue_thread(wq, hotcpu);
+		for_each_cpu_mask(cpu, cpu_populated_map) {
+			cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+			cleanup_workqueue_thread(cwq, cpu);
 		}
-		mutex_unlock(&workqueue_mutex);
-		break;
+	}
 
-	case CPU_DOWN_PREPARE:
+	free_percpu(wq->cpu_wq);
+	kfree(wq);
+}
+EXPORT_SYMBOL_GPL(destroy_workqueue);
+
+static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
+						unsigned long action,
+						void *hcpu)
+{
+	unsigned int cpu = (unsigned long)hcpu;
+	struct cpu_workqueue_struct *cwq;
+	struct workqueue_struct *wq;
+
+	switch (action) {
+	case CPU_LOCK_ACQUIRE:
 		mutex_lock(&workqueue_mutex);
-		break;
+		return NOTIFY_OK;
 
-	case CPU_DOWN_FAILED:
+	case CPU_LOCK_RELEASE:
 		mutex_unlock(&workqueue_mutex);
-		break;
+		return NOTIFY_OK;
 
-	case CPU_DEAD:
-		list_for_each_entry(wq, &workqueues, list)
-			cleanup_workqueue_thread(wq, hotcpu);
-		list_for_each_entry(wq, &workqueues, list)
-			take_over_work(wq, hotcpu);
-		mutex_unlock(&workqueue_mutex);
-		break;
+	case CPU_UP_PREPARE:
+		cpu_set(cpu, cpu_populated_map);
+	}
+
+	list_for_each_entry(wq, &workqueues, list) {
+		cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+
+		switch (action) {
+		case CPU_UP_PREPARE:
+			if (!create_workqueue_thread(cwq, cpu))
+				break;
+			printk(KERN_ERR "workqueue for %i failed\n", cpu);
+			return NOTIFY_BAD;
+
+		case CPU_ONLINE:
+			wake_up_process(cwq->thread);
+			break;
+
+		case CPU_UP_CANCELED:
+			if (cwq->thread)
+				wake_up_process(cwq->thread);
+		case CPU_DEAD:
+			cleanup_workqueue_thread(cwq, cpu);
+			break;
+		}
 	}
 
 	return NOTIFY_OK;
@@ -908,9 +900,9 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
 void init_workqueues(void)
 {
+	cpu_populated_map = cpu_online_map;
 	singlethread_cpu = first_cpu(cpu_possible_map);
 	hotcpu_notifier(workqueue_cpu_callback, 0);
 	keventd_wq = create_workqueue("events");
 	BUG_ON(!keventd_wq);
 }
-
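
For completeness, here is a minimal, hypothetical user of the API paths this patch touches; the module and the my_wq / my_work_fn names are illustrative only and are not part of the patch. The comment in my_exit() paraphrases the behavioral change described in the commit message.

	/* Hypothetical example module; not part of the patch. */
	#include <linux/init.h>
	#include <linux/module.h>
	#include <linux/workqueue.h>

	static struct workqueue_struct *my_wq;

	static void my_work_fn(struct work_struct *work)
	{
		pr_debug("my_work_fn ran\n");
	}

	static DECLARE_WORK(my_work, my_work_fn);

	static int __init my_init(void)
	{
		/* multithreaded workqueue: one worker thread per CPU */
		my_wq = create_workqueue("my_wq");
		if (!my_wq)
			return -ENOMEM;

		queue_work(my_wq, &my_work);
		return 0;
	}

	static void __exit my_exit(void)
	{
		/*
		 * With this patch, destroy_workqueue() no longer uses
		 * kthread_stop() + take_over_work(): each per-CPU thread
		 * drains its worklist and exits via the ->should_stop
		 * protocol. Pending work is still completed before return.
		 */
		destroy_workqueue(my_wq);
	}

	module_init(my_init);
	module_exit(my_exit);
	MODULE_LICENSE("GPL");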