-rw-r--r--  Documentation/workqueue.txt |   2
-rw-r--r--  include/linux/workqueue.h   |  31
-rw-r--r--  kernel/workqueue.c          | 484
3 files changed, 353 insertions, 164 deletions
diff --git a/Documentation/workqueue.txt b/Documentation/workqueue.txt
index f81a65b54c29..5e0e05c5183e 100644
--- a/Documentation/workqueue.txt
+++ b/Documentation/workqueue.txt
@@ -365,7 +365,7 @@ root 5674 0.0 0.0 0 0 ? S 12:13 0:00 [kworker/1:0]
365If kworkers are going crazy (using too much cpu), there are two types 365If kworkers are going crazy (using too much cpu), there are two types
366of possible problems: 366of possible problems:
367 367
368 1. Something beeing scheduled in rapid succession 368 1. Something being scheduled in rapid succession
369 2. A single work item that consumes lots of cpu cycles 369 2. A single work item that consumes lots of cpu cycles
370 370
371The first one can be tracked using tracing: 371The first one can be tracked using tracing:
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index deee212af8e0..738b30b39b68 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -424,6 +424,7 @@ struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask);
424void free_workqueue_attrs(struct workqueue_attrs *attrs); 424void free_workqueue_attrs(struct workqueue_attrs *attrs);
425int apply_workqueue_attrs(struct workqueue_struct *wq, 425int apply_workqueue_attrs(struct workqueue_struct *wq,
426 const struct workqueue_attrs *attrs); 426 const struct workqueue_attrs *attrs);
427int workqueue_set_unbound_cpumask(cpumask_var_t cpumask);
427 428
428extern bool queue_work_on(int cpu, struct workqueue_struct *wq, 429extern bool queue_work_on(int cpu, struct workqueue_struct *wq,
429 struct work_struct *work); 430 struct work_struct *work);
@@ -434,7 +435,6 @@ extern bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
434 435
435extern void flush_workqueue(struct workqueue_struct *wq); 436extern void flush_workqueue(struct workqueue_struct *wq);
436extern void drain_workqueue(struct workqueue_struct *wq); 437extern void drain_workqueue(struct workqueue_struct *wq);
437extern void flush_scheduled_work(void);
438 438
439extern int schedule_on_each_cpu(work_func_t func); 439extern int schedule_on_each_cpu(work_func_t func);
440 440
@@ -531,6 +531,35 @@ static inline bool schedule_work(struct work_struct *work)
531} 531}
532 532
533/** 533/**
534 * flush_scheduled_work - ensure that any scheduled work has run to completion.
535 *
536 * Forces execution of the kernel-global workqueue and blocks until its
537 * completion.
538 *
539 * Think twice before calling this function! It's very easy to get into
540 * trouble if you don't take great care. Either of the following situations
541 * will lead to deadlock:
542 *
543 * One of the work items currently on the workqueue needs to acquire
544 * a lock held by your code or its caller.
545 *
546 * Your code is running in the context of a work routine.
547 *
548 * They will be detected by lockdep when they occur, but the first might not
549 * occur very often. It depends on what work items are on the workqueue and
550 * what locks they need, which you have no control over.
551 *
552 * In most situations flushing the entire workqueue is overkill; you merely
553 * need to know that a particular work item isn't queued and isn't running.
554 * In such cases you should use cancel_delayed_work_sync() or
555 * cancel_work_sync() instead.
556 */
557static inline void flush_scheduled_work(void)
558{
559 flush_workqueue(system_wq);
560}
561
562/**
534 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 563 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
535 * @cpu: cpu to use 564 * @cpu: cpu to use
536 * @dwork: job to be done 565 * @dwork: job to be done
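A note on the flush_scheduled_work() kerneldoc above: it steers callers toward cancelling their own work item rather than flushing all of system_wq. As a rough sketch (hypothetical foo_* names, not part of this patch), a driver teardown following that advice looks like:

	static struct work_struct foo_work;

	static void foo_teardown(void)
	{
		/*
		 * Waits only for foo_work, so unlike flush_scheduled_work()
		 * it cannot deadlock on unrelated work items that other
		 * subsystems have queued on system_wq.
		 */
		cancel_work_sync(&foo_work);
	}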
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 586ad91300b0..5243d4b03087 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -127,6 +127,11 @@ enum {
127 * 127 *
128 * PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads. 128 * PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads.
129 * 129 *
130 * PW: wq_pool_mutex and wq->mutex protected for writes. Either for reads.
131 *
132 * PWR: wq_pool_mutex and wq->mutex protected for writes. Either or
133 * sched-RCU for reads.
134 *
130 * WQ: wq->mutex protected. 135 * WQ: wq->mutex protected.
131 * 136 *
132 * WR: wq->mutex protected for writes. Sched-RCU protected for reads. 137 * WR: wq->mutex protected for writes. Sched-RCU protected for reads.
@@ -247,8 +252,8 @@ struct workqueue_struct {
247 int nr_drainers; /* WQ: drain in progress */ 252 int nr_drainers; /* WQ: drain in progress */
248 int saved_max_active; /* WQ: saved pwq max_active */ 253 int saved_max_active; /* WQ: saved pwq max_active */
249 254
250 struct workqueue_attrs *unbound_attrs; /* WQ: only for unbound wqs */ 255 struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs */
251 struct pool_workqueue *dfl_pwq; /* WQ: only for unbound wqs */ 256 struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */
252 257
253#ifdef CONFIG_SYSFS 258#ifdef CONFIG_SYSFS
254 struct wq_device *wq_dev; /* I: for sysfs interface */ 259 struct wq_device *wq_dev; /* I: for sysfs interface */
@@ -268,7 +273,7 @@ struct workqueue_struct {
268 /* hot fields used during command issue, aligned to cacheline */ 273 /* hot fields used during command issue, aligned to cacheline */
269 unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */ 274 unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
270 struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */ 275 struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
271 struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */ 276 struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
272}; 277};
273 278
274static struct kmem_cache *pwq_cache; 279static struct kmem_cache *pwq_cache;
@@ -299,6 +304,8 @@ static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
299static LIST_HEAD(workqueues); /* PR: list of all workqueues */ 304static LIST_HEAD(workqueues); /* PR: list of all workqueues */
300static bool workqueue_freezing; /* PL: have wqs started freezing? */ 305static bool workqueue_freezing; /* PL: have wqs started freezing? */
301 306
307static cpumask_var_t wq_unbound_cpumask; /* PL: low level cpumask for all unbound wqs */
308
302/* the per-cpu worker pools */ 309/* the per-cpu worker pools */
303static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], 310static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
304 cpu_worker_pools); 311 cpu_worker_pools);
@@ -330,8 +337,6 @@ struct workqueue_struct *system_freezable_power_efficient_wq __read_mostly;
330EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq); 337EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
331 338
332static int worker_thread(void *__worker); 339static int worker_thread(void *__worker);
333static void copy_workqueue_attrs(struct workqueue_attrs *to,
334 const struct workqueue_attrs *from);
335static void workqueue_sysfs_unregister(struct workqueue_struct *wq); 340static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
336 341
337#define CREATE_TRACE_POINTS 342#define CREATE_TRACE_POINTS
@@ -347,6 +352,12 @@ static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
347 lockdep_is_held(&wq->mutex), \ 352 lockdep_is_held(&wq->mutex), \
348 "sched RCU or wq->mutex should be held") 353 "sched RCU or wq->mutex should be held")
349 354
355#define assert_rcu_or_wq_mutex_or_pool_mutex(wq) \
356 rcu_lockdep_assert(rcu_read_lock_sched_held() || \
357 lockdep_is_held(&wq->mutex) || \
358 lockdep_is_held(&wq_pool_mutex), \
359 "sched RCU, wq->mutex or wq_pool_mutex should be held")
360
350#define for_each_cpu_worker_pool(pool, cpu) \ 361#define for_each_cpu_worker_pool(pool, cpu) \
351 for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \ 362 for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
352 (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \ 363 (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -551,7 +562,8 @@ static int worker_pool_assign_id(struct worker_pool *pool)
551 * @wq: the target workqueue 562 * @wq: the target workqueue
552 * @node: the node ID 563 * @node: the node ID
553 * 564 *
554 * This must be called either with pwq_lock held or sched RCU read locked. 565 * This must be called with any of wq_pool_mutex, wq->mutex or sched RCU
566 * read locked.
555 * If the pwq needs to be used beyond the locking in effect, the caller is 567 * If the pwq needs to be used beyond the locking in effect, the caller is
556 * responsible for guaranteeing that the pwq stays online. 568 * responsible for guaranteeing that the pwq stays online.
557 * 569 *
@@ -560,7 +572,7 @@ static int worker_pool_assign_id(struct worker_pool *pool)
560static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq, 572static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
561 int node) 573 int node)
562{ 574{
563 assert_rcu_or_wq_mutex(wq); 575 assert_rcu_or_wq_mutex_or_pool_mutex(wq);
564 return rcu_dereference_raw(wq->numa_pwq_tbl[node]); 576 return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
565} 577}
566 578
@@ -976,7 +988,7 @@ static struct worker *find_worker_executing_work(struct worker_pool *pool,
976 * move_linked_works - move linked works to a list 988 * move_linked_works - move linked works to a list
977 * @work: start of series of works to be scheduled 989 * @work: start of series of works to be scheduled
978 * @head: target list to append @work to 990 * @head: target list to append @work to
979 * @nextp: out paramter for nested worklist walking 991 * @nextp: out parameter for nested worklist walking
980 * 992 *
981 * Schedule linked works starting from @work to @head. Work series to 993 * Schedule linked works starting from @work to @head. Work series to
982 * be scheduled starts at @work and includes any consecutive work with 994 * be scheduled starts at @work and includes any consecutive work with
@@ -2616,7 +2628,7 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
2616 * Wait until the workqueue becomes empty. While draining is in progress, 2628 * Wait until the workqueue becomes empty. While draining is in progress,
2617 * only chain queueing is allowed. IOW, only currently pending or running 2629 * only chain queueing is allowed. IOW, only currently pending or running
2618 * work items on @wq can queue further work items on it. @wq is flushed 2630 * work items on @wq can queue further work items on it. @wq is flushed
2619 * repeatedly until it becomes empty. The number of flushing is detemined 2631 * repeatedly until it becomes empty. The number of flushing is determined
2620 * by the depth of chaining and should be relatively short. Whine if it 2632 * by the depth of chaining and should be relatively short. Whine if it
2621 * takes too long. 2633 * takes too long.
2622 */ 2634 */
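To make the "chain queueing" above concrete: drain_workqueue() tolerates a work function that requeues itself while the drain is in progress, along these lines (illustrative only; foo_wq and foo_more_work() are made-up names):

	static void foo_work_fn(struct work_struct *work)
	{
		/* ... process one chunk of work ... */
		if (foo_more_work())
			queue_work(foo_wq, work);	/* chain queueing */
	}

Each requeue simply triggers another flush pass, which is why the comment expects the number of flushes to track the chaining depth.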
@@ -2947,36 +2959,6 @@ int schedule_on_each_cpu(work_func_t func)
2947} 2959}
2948 2960
2949/** 2961/**
2950 * flush_scheduled_work - ensure that any scheduled work has run to completion.
2951 *
2952 * Forces execution of the kernel-global workqueue and blocks until its
2953 * completion.
2954 *
2955 * Think twice before calling this function! It's very easy to get into
2956 * trouble if you don't take great care. Either of the following situations
2957 * will lead to deadlock:
2958 *
2959 * One of the work items currently on the workqueue needs to acquire
2960 * a lock held by your code or its caller.
2961 *
2962 * Your code is running in the context of a work routine.
2963 *
2964 * They will be detected by lockdep when they occur, but the first might not
2965 * occur very often. It depends on what work items are on the workqueue and
2966 * what locks they need, which you have no control over.
2967 *
2968 * In most situations flushing the entire workqueue is overkill; you merely
2969 * need to know that a particular work item isn't queued and isn't running.
2970 * In such cases you should use cancel_delayed_work_sync() or
2971 * cancel_work_sync() instead.
2972 */
2973void flush_scheduled_work(void)
2974{
2975 flush_workqueue(system_wq);
2976}
2977EXPORT_SYMBOL(flush_scheduled_work);
2978
2979/**
2980 * execute_in_process_context - reliably execute the routine with user context 2962 * execute_in_process_context - reliably execute the routine with user context
2981 * @fn: the function to execute 2963 * @fn: the function to execute
2982 * @ew: guaranteed storage for the execute work structure (must 2964 * @ew: guaranteed storage for the execute work structure (must
@@ -3081,7 +3063,7 @@ static bool wqattrs_equal(const struct workqueue_attrs *a,
3081 * init_worker_pool - initialize a newly zalloc'd worker_pool 3063 * init_worker_pool - initialize a newly zalloc'd worker_pool
3082 * @pool: worker_pool to initialize 3064 * @pool: worker_pool to initialize
3083 * 3065 *
3084 * Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs. 3066 * Initialize a newly zalloc'd @pool. It also allocates @pool->attrs.
3085 * 3067 *
3086 * Return: 0 on success, -errno on failure. Even on failure, all fields 3068 * Return: 0 on success, -errno on failure. Even on failure, all fields
3087 * inside @pool proper are initialized and put_unbound_pool() can be called 3069 * inside @pool proper are initialized and put_unbound_pool() can be called
@@ -3425,20 +3407,9 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
3425 return pwq; 3407 return pwq;
3426} 3408}
3427 3409
3428/* undo alloc_unbound_pwq(), used only in the error path */
3429static void free_unbound_pwq(struct pool_workqueue *pwq)
3430{
3431 lockdep_assert_held(&wq_pool_mutex);
3432
3433 if (pwq) {
3434 put_unbound_pool(pwq->pool);
3435 kmem_cache_free(pwq_cache, pwq);
3436 }
3437}
3438
3439/** 3410/**
3440 * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node 3411 * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
3441 * @attrs: the wq_attrs of interest 3412 * @attrs: the wq_attrs of the default pwq of the target workqueue
3442 * @node: the target NUMA node 3413 * @node: the target NUMA node
3443 * @cpu_going_down: if >= 0, the CPU to consider as offline 3414 * @cpu_going_down: if >= 0, the CPU to consider as offline
3444 * @cpumask: outarg, the resulting cpumask 3415 * @cpumask: outarg, the resulting cpumask
@@ -3488,6 +3459,7 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
3488{ 3459{
3489 struct pool_workqueue *old_pwq; 3460 struct pool_workqueue *old_pwq;
3490 3461
3462 lockdep_assert_held(&wq_pool_mutex);
3491 lockdep_assert_held(&wq->mutex); 3463 lockdep_assert_held(&wq->mutex);
3492 3464
3493 /* link_pwq() can handle duplicate calls */ 3465 /* link_pwq() can handle duplicate calls */
@@ -3498,46 +3470,59 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
3498 return old_pwq; 3470 return old_pwq;
3499} 3471}
3500 3472
3501/** 3473/* context to store the prepared attrs & pwqs before applying */
3502 * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue 3474struct apply_wqattrs_ctx {
3503 * @wq: the target workqueue 3475 struct workqueue_struct *wq; /* target workqueue */
3504 * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs() 3476 struct workqueue_attrs *attrs; /* attrs to apply */
3505 * 3477 struct list_head list; /* queued for batching commit */
3506 * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA 3478 struct pool_workqueue *dfl_pwq;
3507 * machines, this function maps a separate pwq to each NUMA node with 3479 struct pool_workqueue *pwq_tbl[];
3508 * possibles CPUs in @attrs->cpumask so that work items are affine to the 3480};
3509 * NUMA node it was issued on. Older pwqs are released as in-flight work 3481
3510 * items finish. Note that a work item which repeatedly requeues itself 3482/* free the resources after success or abort */
3511 * back-to-back will stay on its current pwq. 3483static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
3512 * 3484{
3513 * Performs GFP_KERNEL allocations. 3485 if (ctx) {
3514 * 3486 int node;
3515 * Return: 0 on success and -errno on failure. 3487
3516 */ 3488 for_each_node(node)
3517int apply_workqueue_attrs(struct workqueue_struct *wq, 3489 put_pwq_unlocked(ctx->pwq_tbl[node]);
3518 const struct workqueue_attrs *attrs) 3490 put_pwq_unlocked(ctx->dfl_pwq);
3491
3492 free_workqueue_attrs(ctx->attrs);
3493
3494 kfree(ctx);
3495 }
3496}
3497
3498/* allocate the attrs and pwqs for later installation */
3499static struct apply_wqattrs_ctx *
3500apply_wqattrs_prepare(struct workqueue_struct *wq,
3501 const struct workqueue_attrs *attrs)
3519{ 3502{
3503 struct apply_wqattrs_ctx *ctx;
3520 struct workqueue_attrs *new_attrs, *tmp_attrs; 3504 struct workqueue_attrs *new_attrs, *tmp_attrs;
3521 struct pool_workqueue **pwq_tbl, *dfl_pwq; 3505 int node;
3522 int node, ret;
3523 3506
3524 /* only unbound workqueues can change attributes */ 3507 lockdep_assert_held(&wq_pool_mutex);
3525 if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
3526 return -EINVAL;
3527 3508
3528 /* creating multiple pwqs breaks ordering guarantee */ 3509 ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
3529 if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs))) 3510 GFP_KERNEL);
3530 return -EINVAL;
3531 3511
3532 pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
3533 new_attrs = alloc_workqueue_attrs(GFP_KERNEL); 3512 new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
3534 tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL); 3513 tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
3535 if (!pwq_tbl || !new_attrs || !tmp_attrs) 3514 if (!ctx || !new_attrs || !tmp_attrs)
3536 goto enomem; 3515 goto out_free;
3537 3516
3538 /* make a copy of @attrs and sanitize it */ 3517 /*
3518 * Calculate the attrs of the default pwq.
3519 * If the user configured cpumask doesn't overlap with the
3520 * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
3521 */
3539 copy_workqueue_attrs(new_attrs, attrs); 3522 copy_workqueue_attrs(new_attrs, attrs);
3540 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask); 3523 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
3524 if (unlikely(cpumask_empty(new_attrs->cpumask)))
3525 cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
3541 3526
3542 /* 3527 /*
3543 * We may create multiple pwqs with differing cpumasks. Make a 3528 * We may create multiple pwqs with differing cpumasks. Make a
@@ -3547,75 +3532,129 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
3547 copy_workqueue_attrs(tmp_attrs, new_attrs); 3532 copy_workqueue_attrs(tmp_attrs, new_attrs);
3548 3533
3549 /* 3534 /*
3550 * CPUs should stay stable across pwq creations and installations.
3551 * Pin CPUs, determine the target cpumask for each node and create
3552 * pwqs accordingly.
3553 */
3554 get_online_cpus();
3555
3556 mutex_lock(&wq_pool_mutex);
3557
3558 /*
3559 * If something goes wrong during CPU up/down, we'll fall back to 3535 * If something goes wrong during CPU up/down, we'll fall back to
3560 * the default pwq covering whole @attrs->cpumask. Always create 3536 * the default pwq covering whole @attrs->cpumask. Always create
3561 * it even if we don't use it immediately. 3537 * it even if we don't use it immediately.
3562 */ 3538 */
3563 dfl_pwq = alloc_unbound_pwq(wq, new_attrs); 3539 ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
3564 if (!dfl_pwq) 3540 if (!ctx->dfl_pwq)
3565 goto enomem_pwq; 3541 goto out_free;
3566 3542
3567 for_each_node(node) { 3543 for_each_node(node) {
3568 if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) { 3544 if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
3569 pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); 3545 ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
3570 if (!pwq_tbl[node]) 3546 if (!ctx->pwq_tbl[node])
3571 goto enomem_pwq; 3547 goto out_free;
3572 } else { 3548 } else {
3573 dfl_pwq->refcnt++; 3549 ctx->dfl_pwq->refcnt++;
3574 pwq_tbl[node] = dfl_pwq; 3550 ctx->pwq_tbl[node] = ctx->dfl_pwq;
3575 } 3551 }
3576 } 3552 }
3577 3553
3578 mutex_unlock(&wq_pool_mutex); 3554 /* save the user configured attrs and sanitize it. */
3555 copy_workqueue_attrs(new_attrs, attrs);
3556 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
3557 ctx->attrs = new_attrs;
3558
3559 ctx->wq = wq;
3560 free_workqueue_attrs(tmp_attrs);
3561 return ctx;
3562
3563out_free:
3564 free_workqueue_attrs(tmp_attrs);
3565 free_workqueue_attrs(new_attrs);
3566 apply_wqattrs_cleanup(ctx);
3567 return NULL;
3568}
3569
3570/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
3571static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
3572{
3573 int node;
3579 3574
3580 /* all pwqs have been created successfully, let's install'em */ 3575 /* all pwqs have been created successfully, let's install'em */
3581 mutex_lock(&wq->mutex); 3576 mutex_lock(&ctx->wq->mutex);
3582 3577
3583 copy_workqueue_attrs(wq->unbound_attrs, new_attrs); 3578 copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
3584 3579
3585 /* save the previous pwq and install the new one */ 3580 /* save the previous pwq and install the new one */
3586 for_each_node(node) 3581 for_each_node(node)
3587 pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]); 3582 ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
3583 ctx->pwq_tbl[node]);
3588 3584
3589 /* @dfl_pwq might not have been used, ensure it's linked */ 3585 /* @dfl_pwq might not have been used, ensure it's linked */
3590 link_pwq(dfl_pwq); 3586 link_pwq(ctx->dfl_pwq);
3591 swap(wq->dfl_pwq, dfl_pwq); 3587 swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
3592 3588
3593 mutex_unlock(&wq->mutex); 3589 mutex_unlock(&ctx->wq->mutex);
3590}
3594 3591
3595 /* put the old pwqs */ 3592static void apply_wqattrs_lock(void)
3596 for_each_node(node) 3593{
3597 put_pwq_unlocked(pwq_tbl[node]); 3594 /* CPUs should stay stable across pwq creations and installations */
3598 put_pwq_unlocked(dfl_pwq); 3595 get_online_cpus();
3596 mutex_lock(&wq_pool_mutex);
3597}
3599 3598
3599static void apply_wqattrs_unlock(void)
3600{
3601 mutex_unlock(&wq_pool_mutex);
3600 put_online_cpus(); 3602 put_online_cpus();
3601 ret = 0; 3603}
3602 /* fall through */ 3604
3603out_free: 3605static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
3604 free_workqueue_attrs(tmp_attrs); 3606 const struct workqueue_attrs *attrs)
3605 free_workqueue_attrs(new_attrs); 3607{
3606 kfree(pwq_tbl); 3608 struct apply_wqattrs_ctx *ctx;
3609 int ret = -ENOMEM;
3610
3611 /* only unbound workqueues can change attributes */
3612 if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
3613 return -EINVAL;
3614
3615 /* creating multiple pwqs breaks ordering guarantee */
3616 if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
3617 return -EINVAL;
3618
3619 ctx = apply_wqattrs_prepare(wq, attrs);
3620
3621 /* the ctx has been prepared successfully, let's commit it */
3622 if (ctx) {
3623 apply_wqattrs_commit(ctx);
3624 ret = 0;
3625 }
3626
3627 apply_wqattrs_cleanup(ctx);
3628
3607 return ret; 3629 return ret;
3630}
3608 3631
3609enomem_pwq: 3632/**
3610 free_unbound_pwq(dfl_pwq); 3633 * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
3611 for_each_node(node) 3634 * @wq: the target workqueue
3612 if (pwq_tbl && pwq_tbl[node] != dfl_pwq) 3635 * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
3613 free_unbound_pwq(pwq_tbl[node]); 3636 *
3614 mutex_unlock(&wq_pool_mutex); 3637 * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
3615 put_online_cpus(); 3638 * machines, this function maps a separate pwq to each NUMA node with
3616enomem: 3639 * possibles CPUs in @attrs->cpumask so that work items are affine to the
3617 ret = -ENOMEM; 3640 * NUMA node it was issued on. Older pwqs are released as in-flight work
3618 goto out_free; 3641 * items finish. Note that a work item which repeatedly requeues itself
3642 * back-to-back will stay on its current pwq.
3643 *
3644 * Performs GFP_KERNEL allocations.
3645 *
3646 * Return: 0 on success and -errno on failure.
3647 */
3648int apply_workqueue_attrs(struct workqueue_struct *wq,
3649 const struct workqueue_attrs *attrs)
3650{
3651 int ret;
3652
3653 apply_wqattrs_lock();
3654 ret = apply_workqueue_attrs_locked(wq, attrs);
3655 apply_wqattrs_unlock();
3656
3657 return ret;
3619} 3658}
3620 3659
3621/** 3660/**
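The point of splitting apply_workqueue_attrs() into prepare/commit/cleanup is that several workqueues can now be prepared under a single wq_pool_mutex critical section and committed as a batch, which workqueue_apply_unbound_cpumask() relies on later in this patch. The resulting calling convention, as used by apply_workqueue_attrs_locked() above, boils down to:

	apply_wqattrs_lock();
	ctx = apply_wqattrs_prepare(wq, attrs);
	if (ctx)
		apply_wqattrs_commit(ctx);
	apply_wqattrs_cleanup(ctx);	/* safe on NULL or a partially built ctx */
	apply_wqattrs_unlock();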
@@ -3651,7 +3690,8 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
3651 3690
3652 lockdep_assert_held(&wq_pool_mutex); 3691 lockdep_assert_held(&wq_pool_mutex);
3653 3692
3654 if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND)) 3693 if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND) ||
3694 wq->unbound_attrs->no_numa)
3655 return; 3695 return;
3656 3696
3657 /* 3697 /*
@@ -3662,48 +3702,37 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
3662 target_attrs = wq_update_unbound_numa_attrs_buf; 3702 target_attrs = wq_update_unbound_numa_attrs_buf;
3663 cpumask = target_attrs->cpumask; 3703 cpumask = target_attrs->cpumask;
3664 3704
3665 mutex_lock(&wq->mutex);
3666 if (wq->unbound_attrs->no_numa)
3667 goto out_unlock;
3668
3669 copy_workqueue_attrs(target_attrs, wq->unbound_attrs); 3705 copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
3670 pwq = unbound_pwq_by_node(wq, node); 3706 pwq = unbound_pwq_by_node(wq, node);
3671 3707
3672 /* 3708 /*
3673 * Let's determine what needs to be done. If the target cpumask is 3709 * Let's determine what needs to be done. If the target cpumask is
3674 * different from wq's, we need to compare it to @pwq's and create 3710 * different from the default pwq's, we need to compare it to @pwq's
3675 * a new one if they don't match. If the target cpumask equals 3711 * and create a new one if they don't match. If the target cpumask
3676 * wq's, the default pwq should be used. 3712 * equals the default pwq's, the default pwq should be used.
3677 */ 3713 */
3678 if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) { 3714 if (wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, cpu_off, cpumask)) {
3679 if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask)) 3715 if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
3680 goto out_unlock; 3716 return;
3681 } else { 3717 } else {
3682 goto use_dfl_pwq; 3718 goto use_dfl_pwq;
3683 } 3719 }
3684 3720
3685 mutex_unlock(&wq->mutex);
3686
3687 /* create a new pwq */ 3721 /* create a new pwq */
3688 pwq = alloc_unbound_pwq(wq, target_attrs); 3722 pwq = alloc_unbound_pwq(wq, target_attrs);
3689 if (!pwq) { 3723 if (!pwq) {
3690 pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n", 3724 pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
3691 wq->name); 3725 wq->name);
3692 mutex_lock(&wq->mutex);
3693 goto use_dfl_pwq; 3726 goto use_dfl_pwq;
3694 } 3727 }
3695 3728
3696 /* 3729 /* Install the new pwq. */
3697 * Install the new pwq. As this function is called only from CPU
3698 * hotplug callbacks and applying a new attrs is wrapped with
3699 * get/put_online_cpus(), @wq->unbound_attrs couldn't have changed
3700 * inbetween.
3701 */
3702 mutex_lock(&wq->mutex); 3730 mutex_lock(&wq->mutex);
3703 old_pwq = numa_pwq_tbl_install(wq, node, pwq); 3731 old_pwq = numa_pwq_tbl_install(wq, node, pwq);
3704 goto out_unlock; 3732 goto out_unlock;
3705 3733
3706use_dfl_pwq: 3734use_dfl_pwq:
3735 mutex_lock(&wq->mutex);
3707 spin_lock_irq(&wq->dfl_pwq->pool->lock); 3736 spin_lock_irq(&wq->dfl_pwq->pool->lock);
3708 get_pwq(wq->dfl_pwq); 3737 get_pwq(wq->dfl_pwq);
3709 spin_unlock_irq(&wq->dfl_pwq->pool->lock); 3738 spin_unlock_irq(&wq->dfl_pwq->pool->lock);
@@ -4385,7 +4414,7 @@ static void rebind_workers(struct worker_pool *pool)
4385 /* 4414 /*
4386 * Restore CPU affinity of all workers. As all idle workers should 4415 * Restore CPU affinity of all workers. As all idle workers should
4387 * be on the run-queue of the associated CPU before any local 4416 * be on the run-queue of the associated CPU before any local
4388 * wake-ups for concurrency management happen, restore CPU affinty 4417 * wake-ups for concurrency management happen, restore CPU affinity
4389 * of all workers first and then clear UNBOUND. As we're called 4418 * of all workers first and then clear UNBOUND. As we're called
4390 * from CPU_ONLINE, the following shouldn't fail. 4419 * from CPU_ONLINE, the following shouldn't fail.
4391 */ 4420 */
@@ -4698,6 +4727,82 @@ out_unlock:
4698} 4727}
4699#endif /* CONFIG_FREEZER */ 4728#endif /* CONFIG_FREEZER */
4700 4729
4730static int workqueue_apply_unbound_cpumask(void)
4731{
4732 LIST_HEAD(ctxs);
4733 int ret = 0;
4734 struct workqueue_struct *wq;
4735 struct apply_wqattrs_ctx *ctx, *n;
4736
4737 lockdep_assert_held(&wq_pool_mutex);
4738
4739 list_for_each_entry(wq, &workqueues, list) {
4740 if (!(wq->flags & WQ_UNBOUND))
4741 continue;
4742 /* creating multiple pwqs breaks ordering guarantee */
4743 if (wq->flags & __WQ_ORDERED)
4744 continue;
4745
4746 ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs);
4747 if (!ctx) {
4748 ret = -ENOMEM;
4749 break;
4750 }
4751
4752 list_add_tail(&ctx->list, &ctxs);
4753 }
4754
4755 list_for_each_entry_safe(ctx, n, &ctxs, list) {
4756 if (!ret)
4757 apply_wqattrs_commit(ctx);
4758 apply_wqattrs_cleanup(ctx);
4759 }
4760
4761 return ret;
4762}
4763
4764/**
4765 * workqueue_set_unbound_cpumask - Set the low-level unbound cpumask
4766 * @cpumask: the cpumask to set
4767 *
4768 * The low-level workqueues cpumask is a global cpumask that limits
4769 * the affinity of all unbound workqueues. This function checks @cpumask
4770 * and applies it to all unbound workqueues, updating all of their pwqs.
4771 *
4772 * Return: 0 - Success
4773 * -EINVAL - Invalid @cpumask
4774 * -ENOMEM - Failed to allocate memory for attrs or pwqs.
4775 */
4776int workqueue_set_unbound_cpumask(cpumask_var_t cpumask)
4777{
4778 int ret = -EINVAL;
4779 cpumask_var_t saved_cpumask;
4780
4781 if (!zalloc_cpumask_var(&saved_cpumask, GFP_KERNEL))
4782 return -ENOMEM;
4783
4784 cpumask_and(cpumask, cpumask, cpu_possible_mask);
4785 if (!cpumask_empty(cpumask)) {
4786 apply_wqattrs_lock();
4787
4788 /* save the old wq_unbound_cpumask. */
4789 cpumask_copy(saved_cpumask, wq_unbound_cpumask);
4790
4791 /* update wq_unbound_cpumask at first and apply it to wqs. */
4792 cpumask_copy(wq_unbound_cpumask, cpumask);
4793 ret = workqueue_apply_unbound_cpumask();
4794
4795 /* restore the wq_unbound_cpumask when failed. */
4796 if (ret < 0)
4797 cpumask_copy(wq_unbound_cpumask, saved_cpumask);
4798
4799 apply_wqattrs_unlock();
4800 }
4801
4802 free_cpumask_var(saved_cpumask);
4803 return ret;
4804}
4805
4701#ifdef CONFIG_SYSFS 4806#ifdef CONFIG_SYSFS
4702/* 4807/*
4703 * Workqueues with WQ_SYSFS flag set is visible to userland via 4808 * Workqueues with WQ_SYSFS flag set is visible to userland via
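For orientation, an in-kernel caller of the new workqueue_set_unbound_cpumask() (hypothetical sketch; within this patch only the sysfs store below uses it) would look roughly like:

	cpumask_var_t mask;
	int ret;

	if (!zalloc_cpumask_var(&mask, GFP_KERNEL))
		return -ENOMEM;
	cpumask_copy(mask, cpumask_of(0));	/* confine unbound work to CPU 0 */
	ret = workqueue_set_unbound_cpumask(mask);
	free_cpumask_var(mask);
	return ret;

Note that the function masks the request with cpu_possible_mask itself and rejects an empty result, so callers do not need to pre-validate the mask.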
@@ -4802,13 +4907,13 @@ static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
4802{ 4907{
4803 struct workqueue_attrs *attrs; 4908 struct workqueue_attrs *attrs;
4804 4909
4910 lockdep_assert_held(&wq_pool_mutex);
4911
4805 attrs = alloc_workqueue_attrs(GFP_KERNEL); 4912 attrs = alloc_workqueue_attrs(GFP_KERNEL);
4806 if (!attrs) 4913 if (!attrs)
4807 return NULL; 4914 return NULL;
4808 4915
4809 mutex_lock(&wq->mutex);
4810 copy_workqueue_attrs(attrs, wq->unbound_attrs); 4916 copy_workqueue_attrs(attrs, wq->unbound_attrs);
4811 mutex_unlock(&wq->mutex);
4812 return attrs; 4917 return attrs;
4813} 4918}
4814 4919
@@ -4817,18 +4922,22 @@ static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
4817{ 4922{
4818 struct workqueue_struct *wq = dev_to_wq(dev); 4923 struct workqueue_struct *wq = dev_to_wq(dev);
4819 struct workqueue_attrs *attrs; 4924 struct workqueue_attrs *attrs;
4820 int ret; 4925 int ret = -ENOMEM;
4926
4927 apply_wqattrs_lock();
4821 4928
4822 attrs = wq_sysfs_prep_attrs(wq); 4929 attrs = wq_sysfs_prep_attrs(wq);
4823 if (!attrs) 4930 if (!attrs)
4824 return -ENOMEM; 4931 goto out_unlock;
4825 4932
4826 if (sscanf(buf, "%d", &attrs->nice) == 1 && 4933 if (sscanf(buf, "%d", &attrs->nice) == 1 &&
4827 attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE) 4934 attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
4828 ret = apply_workqueue_attrs(wq, attrs); 4935 ret = apply_workqueue_attrs_locked(wq, attrs);
4829 else 4936 else
4830 ret = -EINVAL; 4937 ret = -EINVAL;
4831 4938
4939out_unlock:
4940 apply_wqattrs_unlock();
4832 free_workqueue_attrs(attrs); 4941 free_workqueue_attrs(attrs);
4833 return ret ?: count; 4942 return ret ?: count;
4834} 4943}
@@ -4852,16 +4961,20 @@ static ssize_t wq_cpumask_store(struct device *dev,
4852{ 4961{
4853 struct workqueue_struct *wq = dev_to_wq(dev); 4962 struct workqueue_struct *wq = dev_to_wq(dev);
4854 struct workqueue_attrs *attrs; 4963 struct workqueue_attrs *attrs;
4855 int ret; 4964 int ret = -ENOMEM;
4965
4966 apply_wqattrs_lock();
4856 4967
4857 attrs = wq_sysfs_prep_attrs(wq); 4968 attrs = wq_sysfs_prep_attrs(wq);
4858 if (!attrs) 4969 if (!attrs)
4859 return -ENOMEM; 4970 goto out_unlock;
4860 4971
4861 ret = cpumask_parse(buf, attrs->cpumask); 4972 ret = cpumask_parse(buf, attrs->cpumask);
4862 if (!ret) 4973 if (!ret)
4863 ret = apply_workqueue_attrs(wq, attrs); 4974 ret = apply_workqueue_attrs_locked(wq, attrs);
4864 4975
4976out_unlock:
4977 apply_wqattrs_unlock();
4865 free_workqueue_attrs(attrs); 4978 free_workqueue_attrs(attrs);
4866 return ret ?: count; 4979 return ret ?: count;
4867} 4980}
@@ -4885,18 +4998,22 @@ static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
4885{ 4998{
4886 struct workqueue_struct *wq = dev_to_wq(dev); 4999 struct workqueue_struct *wq = dev_to_wq(dev);
4887 struct workqueue_attrs *attrs; 5000 struct workqueue_attrs *attrs;
4888 int v, ret; 5001 int v, ret = -ENOMEM;
5002
5003 apply_wqattrs_lock();
4889 5004
4890 attrs = wq_sysfs_prep_attrs(wq); 5005 attrs = wq_sysfs_prep_attrs(wq);
4891 if (!attrs) 5006 if (!attrs)
4892 return -ENOMEM; 5007 goto out_unlock;
4893 5008
4894 ret = -EINVAL; 5009 ret = -EINVAL;
4895 if (sscanf(buf, "%d", &v) == 1) { 5010 if (sscanf(buf, "%d", &v) == 1) {
4896 attrs->no_numa = !v; 5011 attrs->no_numa = !v;
4897 ret = apply_workqueue_attrs(wq, attrs); 5012 ret = apply_workqueue_attrs_locked(wq, attrs);
4898 } 5013 }
4899 5014
5015out_unlock:
5016 apply_wqattrs_unlock();
4900 free_workqueue_attrs(attrs); 5017 free_workqueue_attrs(attrs);
4901 return ret ?: count; 5018 return ret ?: count;
4902} 5019}
@@ -4914,9 +5031,49 @@ static struct bus_type wq_subsys = {
4914 .dev_groups = wq_sysfs_groups, 5031 .dev_groups = wq_sysfs_groups,
4915}; 5032};
4916 5033
5034static ssize_t wq_unbound_cpumask_show(struct device *dev,
5035 struct device_attribute *attr, char *buf)
5036{
5037 int written;
5038
5039 mutex_lock(&wq_pool_mutex);
5040 written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5041 cpumask_pr_args(wq_unbound_cpumask));
5042 mutex_unlock(&wq_pool_mutex);
5043
5044 return written;
5045}
5046
5047static ssize_t wq_unbound_cpumask_store(struct device *dev,
5048 struct device_attribute *attr, const char *buf, size_t count)
5049{
5050 cpumask_var_t cpumask;
5051 int ret;
5052
5053 if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
5054 return -ENOMEM;
5055
5056 ret = cpumask_parse(buf, cpumask);
5057 if (!ret)
5058 ret = workqueue_set_unbound_cpumask(cpumask);
5059
5060 free_cpumask_var(cpumask);
5061 return ret ? ret : count;
5062}
5063
5064static struct device_attribute wq_sysfs_cpumask_attr =
5065 __ATTR(cpumask, 0644, wq_unbound_cpumask_show,
5066 wq_unbound_cpumask_store);
5067
4917static int __init wq_sysfs_init(void) 5068static int __init wq_sysfs_init(void)
4918{ 5069{
4919 return subsys_virtual_register(&wq_subsys, NULL); 5070 int err;
5071
5072 err = subsys_virtual_register(&wq_subsys, NULL);
5073 if (err)
5074 return err;
5075
5076 return device_create_file(wq_subsys.dev_root, &wq_sysfs_cpumask_attr);
4920} 5077}
4921core_initcall(wq_sysfs_init); 5078core_initcall(wq_sysfs_init);
4922 5079
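With wq_sysfs_init() extended this way, the global mask is exposed to userspace as a "cpumask" attribute of the virtual workqueue subsystem, i.e. /sys/devices/virtual/workqueue/cpumask; writes are parsed by wq_unbound_cpumask_store() above and funnelled into workqueue_set_unbound_cpumask(). A minimal userspace illustration (not part of the patch; the hex format is what cpumask_parse() expects):

	#include <fcntl.h>
	#include <unistd.h>

	int main(void)
	{
		/* restrict all unbound workqueues to CPUs 0-3 ("f" = mask 0xf) */
		int fd = open("/sys/devices/virtual/workqueue/cpumask", O_WRONLY);
		int ok;

		if (fd < 0)
			return 1;
		ok = write(fd, "f", 1) == 1;
		close(fd);
		return ok ? 0 : 1;
	}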
@@ -4948,7 +5105,7 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
4948 int ret; 5105 int ret;
4949 5106
4950 /* 5107 /*
4951 * Adjusting max_active or creating new pwqs by applyting 5108 * Adjusting max_active or creating new pwqs by applying
4952 * attributes breaks ordering guarantee. Disallow exposing ordered 5109 * attributes breaks ordering guarantee. Disallow exposing ordered
4953 * workqueues. 5110 * workqueues.
4954 */ 5111 */
@@ -5064,6 +5221,9 @@ static int __init init_workqueues(void)
5064 5221
5065 WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); 5222 WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
5066 5223
5224 BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
5225 cpumask_copy(wq_unbound_cpumask, cpu_possible_mask);
5226
5067 pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC); 5227 pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
5068 5228
5069 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); 5229 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);