aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTejun Heo <tj@kernel.org>2010-06-29 04:07:13 -0400
committerTejun Heo <tj@kernel.org>2010-06-29 04:07:13 -0400
commit7a22ad757ec75186ad43a5b4670fa7423ee8f480 (patch)
tree698807765421a46dcb5e2daa609336a61d1cdea5
parent8cca0eea3964b72b14e8c3f88e3a40bef7b9113e (diff)
workqueue: carry cpu number in work data once execution starts
To implement non-reentrant workqueue, the last gcwq a work was executed on must be reliably obtainable as long as the work structure is valid even if the previous workqueue has been destroyed. To achieve this, work->data will be overloaded to carry the last cpu number once execution starts so that the previous gcwq can be located reliably. This means that cwq can't be obtained from work after execution starts but only gcwq. Implement set_work_{cwq|cpu}(), get_work_[g]cwq() and clear_work_data() to set work data to the cpu number when starting execution, access the overloaded work data and clear it after cancellation. queue_delayed_work_on() is updated to preserve the last cpu while in-flight in timer and other callers which depended on getting cwq from work after execution starts are converted to depend on gcwq instead. * Anton Blanchard fixed compile error on powerpc due to missing linux/threads.h include. Signed-off-by: Tejun Heo <tj@kernel.org> Cc: Anton Blanchard <anton@samba.org>
-rw-r--r--include/linux/workqueue.h7
-rw-r--r--kernel/workqueue.c163
2 files changed, 109 insertions, 61 deletions
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 10611f7fc809..0a7814131e66 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -9,6 +9,7 @@
9#include <linux/linkage.h> 9#include <linux/linkage.h>
10#include <linux/bitops.h> 10#include <linux/bitops.h>
11#include <linux/lockdep.h> 11#include <linux/lockdep.h>
12#include <linux/threads.h>
12#include <asm/atomic.h> 13#include <asm/atomic.h>
13 14
14struct workqueue_struct; 15struct workqueue_struct;
@@ -59,6 +60,7 @@ enum {
59 60
60 WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1, 61 WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1,
61 WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK, 62 WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
63 WORK_STRUCT_NO_CPU = NR_CPUS << WORK_STRUCT_FLAG_BITS,
62}; 64};
63 65
64struct work_struct { 66struct work_struct {
@@ -70,8 +72,9 @@ struct work_struct {
70#endif 72#endif
71}; 73};
72 74
73#define WORK_DATA_INIT() ATOMIC_LONG_INIT(0) 75#define WORK_DATA_INIT() ATOMIC_LONG_INIT(WORK_STRUCT_NO_CPU)
74#define WORK_DATA_STATIC_INIT() ATOMIC_LONG_INIT(WORK_STRUCT_STATIC) 76#define WORK_DATA_STATIC_INIT() \
77 ATOMIC_LONG_INIT(WORK_STRUCT_NO_CPU | WORK_STRUCT_STATIC)
75 78
76struct delayed_work { 79struct delayed_work {
77 struct work_struct work; 80 struct work_struct work;
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index c276dec75ea4..c68277c204ab 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -319,31 +319,71 @@ static int work_next_color(int color)
319} 319}
320 320
321/* 321/*
322 * Set the workqueue on which a work item is to be run 322 * Work data points to the cwq while a work is on queue. Once
323 * - Must *only* be called if the pending flag is set 323 * execution starts, it points to the cpu the work was last on. This
324 * can be distinguished by comparing the data value against
325 * PAGE_OFFSET.
326 *
327 * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
328 * cwq, cpu or clear work->data. These functions should only be
329 * called while the work is owned - ie. while the PENDING bit is set.
330 *
331 * get_work_[g]cwq() can be used to obtain the gcwq or cwq
332 * corresponding to a work. gcwq is available once the work has been
333 * queued anywhere after initialization. cwq is available only from
334 * queueing until execution starts.
324 */ 335 */
325static inline void set_wq_data(struct work_struct *work, 336static inline void set_work_data(struct work_struct *work, unsigned long data,
326 struct cpu_workqueue_struct *cwq, 337 unsigned long flags)
327 unsigned long extra_flags)
328{ 338{
329 BUG_ON(!work_pending(work)); 339 BUG_ON(!work_pending(work));
340 atomic_long_set(&work->data, data | flags | work_static(work));
341}
330 342
331 atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) | 343static void set_work_cwq(struct work_struct *work,
332 WORK_STRUCT_PENDING | extra_flags); 344 struct cpu_workqueue_struct *cwq,
345 unsigned long extra_flags)
346{
347 set_work_data(work, (unsigned long)cwq,
348 WORK_STRUCT_PENDING | extra_flags);
333} 349}
334 350
335/* 351static void set_work_cpu(struct work_struct *work, unsigned int cpu)
336 * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued. 352{
337 */ 353 set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
338static inline void clear_wq_data(struct work_struct *work) 354}
355
356static void clear_work_data(struct work_struct *work)
357{
358 set_work_data(work, WORK_STRUCT_NO_CPU, 0);
359}
360
361static inline unsigned long get_work_data(struct work_struct *work)
362{
363 return atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK;
364}
365
366static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
339{ 367{
340 atomic_long_set(&work->data, work_static(work)); 368 unsigned long data = get_work_data(work);
369
370 return data >= PAGE_OFFSET ? (void *)data : NULL;
341} 371}
342 372
343static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) 373static struct global_cwq *get_work_gcwq(struct work_struct *work)
344{ 374{
345 return (void *)(atomic_long_read(&work->data) & 375 unsigned long data = get_work_data(work);
346 WORK_STRUCT_WQ_DATA_MASK); 376 unsigned int cpu;
377
378 if (data >= PAGE_OFFSET)
379 return ((struct cpu_workqueue_struct *)data)->gcwq;
380
381 cpu = data >> WORK_STRUCT_FLAG_BITS;
382 if (cpu == NR_CPUS)
383 return NULL;
384
385 BUG_ON(cpu >= num_possible_cpus());
386 return get_gcwq(cpu);
347} 387}
348 388
349/** 389/**
@@ -443,7 +483,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
443 unsigned int extra_flags) 483 unsigned int extra_flags)
444{ 484{
445 /* we own @work, set data and link */ 485 /* we own @work, set data and link */
446 set_wq_data(work, cwq, extra_flags); 486 set_work_cwq(work, cwq, extra_flags);
447 487
448 /* 488 /*
449 * Ensure that we get the right work->data if we see the 489 * Ensure that we get the right work->data if we see the
@@ -599,7 +639,7 @@ EXPORT_SYMBOL_GPL(queue_work_on);
599static void delayed_work_timer_fn(unsigned long __data) 639static void delayed_work_timer_fn(unsigned long __data)
600{ 640{
601 struct delayed_work *dwork = (struct delayed_work *)__data; 641 struct delayed_work *dwork = (struct delayed_work *)__data;
602 struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work); 642 struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
603 643
604 __queue_work(smp_processor_id(), cwq->wq, &dwork->work); 644 __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
605} 645}
@@ -639,13 +679,19 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
639 struct work_struct *work = &dwork->work; 679 struct work_struct *work = &dwork->work;
640 680
641 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { 681 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
682 struct global_cwq *gcwq = get_work_gcwq(work);
683 unsigned int lcpu = gcwq ? gcwq->cpu : raw_smp_processor_id();
684
642 BUG_ON(timer_pending(timer)); 685 BUG_ON(timer_pending(timer));
643 BUG_ON(!list_empty(&work->entry)); 686 BUG_ON(!list_empty(&work->entry));
644 687
645 timer_stats_timer_set_start_info(&dwork->timer); 688 timer_stats_timer_set_start_info(&dwork->timer);
646 689 /*
647 /* This stores cwq for the moment, for the timer_fn */ 690 * This stores cwq for the moment, for the timer_fn.
648 set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0); 691 * Note that the work's gcwq is preserved to allow
692 * reentrance detection for delayed works.
693 */
694 set_work_cwq(work, get_cwq(lcpu, wq), 0);
649 timer->expires = jiffies + delay; 695 timer->expires = jiffies + delay;
650 timer->data = (unsigned long)dwork; 696 timer->data = (unsigned long)dwork;
651 timer->function = delayed_work_timer_fn; 697 timer->function = delayed_work_timer_fn;
@@ -970,11 +1016,14 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
970 worker->current_work = work; 1016 worker->current_work = work;
971 worker->current_cwq = cwq; 1017 worker->current_cwq = cwq;
972 work_color = get_work_color(work); 1018 work_color = get_work_color(work);
1019
1020 BUG_ON(get_work_cwq(work) != cwq);
1021 /* record the current cpu number in the work data and dequeue */
1022 set_work_cpu(work, gcwq->cpu);
973 list_del_init(&work->entry); 1023 list_del_init(&work->entry);
974 1024
975 spin_unlock_irq(&gcwq->lock); 1025 spin_unlock_irq(&gcwq->lock);
976 1026
977 BUG_ON(get_wq_data(work) != cwq);
978 work_clear_pending(work); 1027 work_clear_pending(work);
979 lock_map_acquire(&cwq->wq->lockdep_map); 1028 lock_map_acquire(&cwq->wq->lockdep_map);
980 lock_map_acquire(&lockdep_map); 1029 lock_map_acquire(&lockdep_map);
@@ -1406,37 +1455,39 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
1406int flush_work(struct work_struct *work) 1455int flush_work(struct work_struct *work)
1407{ 1456{
1408 struct worker *worker = NULL; 1457 struct worker *worker = NULL;
1409 struct cpu_workqueue_struct *cwq;
1410 struct global_cwq *gcwq; 1458 struct global_cwq *gcwq;
1459 struct cpu_workqueue_struct *cwq;
1411 struct wq_barrier barr; 1460 struct wq_barrier barr;
1412 1461
1413 might_sleep(); 1462 might_sleep();
1414 cwq = get_wq_data(work); 1463 gcwq = get_work_gcwq(work);
1415 if (!cwq) 1464 if (!gcwq)
1416 return 0; 1465 return 0;
1417 gcwq = cwq->gcwq;
1418
1419 lock_map_acquire(&cwq->wq->lockdep_map);
1420 lock_map_release(&cwq->wq->lockdep_map);
1421 1466
1422 spin_lock_irq(&gcwq->lock); 1467 spin_lock_irq(&gcwq->lock);
1423 if (!list_empty(&work->entry)) { 1468 if (!list_empty(&work->entry)) {
1424 /* 1469 /*
1425 * See the comment near try_to_grab_pending()->smp_rmb(). 1470 * See the comment near try_to_grab_pending()->smp_rmb().
1426 * If it was re-queued under us we are not going to wait. 1471 * If it was re-queued to a different gcwq under us, we
1472 * are not going to wait.
1427 */ 1473 */
1428 smp_rmb(); 1474 smp_rmb();
1429 if (unlikely(cwq != get_wq_data(work))) 1475 cwq = get_work_cwq(work);
1476 if (unlikely(!cwq || gcwq != cwq->gcwq))
1430 goto already_gone; 1477 goto already_gone;
1431 } else { 1478 } else {
1432 if (cwq->worker && cwq->worker->current_work == work) 1479 worker = find_worker_executing_work(gcwq, work);
1433 worker = cwq->worker;
1434 if (!worker) 1480 if (!worker)
1435 goto already_gone; 1481 goto already_gone;
1482 cwq = worker->current_cwq;
1436 } 1483 }
1437 1484
1438 insert_wq_barrier(cwq, &barr, work, worker); 1485 insert_wq_barrier(cwq, &barr, work, worker);
1439 spin_unlock_irq(&gcwq->lock); 1486 spin_unlock_irq(&gcwq->lock);
1487
1488 lock_map_acquire(&cwq->wq->lockdep_map);
1489 lock_map_release(&cwq->wq->lockdep_map);
1490
1440 wait_for_completion(&barr.done); 1491 wait_for_completion(&barr.done);
1441 destroy_work_on_stack(&barr.work); 1492 destroy_work_on_stack(&barr.work);
1442 return 1; 1493 return 1;
@@ -1453,7 +1504,6 @@ EXPORT_SYMBOL_GPL(flush_work);
1453static int try_to_grab_pending(struct work_struct *work) 1504static int try_to_grab_pending(struct work_struct *work)
1454{ 1505{
1455 struct global_cwq *gcwq; 1506 struct global_cwq *gcwq;
1456 struct cpu_workqueue_struct *cwq;
1457 int ret = -1; 1507 int ret = -1;
1458 1508
1459 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) 1509 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
@@ -1463,24 +1513,23 @@ static int try_to_grab_pending(struct work_struct *work)
1463 * The queueing is in progress, or it is already queued. Try to 1513 * The queueing is in progress, or it is already queued. Try to
1464 * steal it from ->worklist without clearing WORK_STRUCT_PENDING. 1514 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1465 */ 1515 */
1466 1516 gcwq = get_work_gcwq(work);
1467 cwq = get_wq_data(work); 1517 if (!gcwq)
1468 if (!cwq)
1469 return ret; 1518 return ret;
1470 gcwq = cwq->gcwq;
1471 1519
1472 spin_lock_irq(&gcwq->lock); 1520 spin_lock_irq(&gcwq->lock);
1473 if (!list_empty(&work->entry)) { 1521 if (!list_empty(&work->entry)) {
1474 /* 1522 /*
1475 * This work is queued, but perhaps we locked the wrong cwq. 1523 * This work is queued, but perhaps we locked the wrong gcwq.
1476 * In that case we must see the new value after rmb(), see 1524 * In that case we must see the new value after rmb(), see
1477 * insert_work()->wmb(). 1525 * insert_work()->wmb().
1478 */ 1526 */
1479 smp_rmb(); 1527 smp_rmb();
1480 if (cwq == get_wq_data(work)) { 1528 if (gcwq == get_work_gcwq(work)) {
1481 debug_work_deactivate(work); 1529 debug_work_deactivate(work);
1482 list_del_init(&work->entry); 1530 list_del_init(&work->entry);
1483 cwq_dec_nr_in_flight(cwq, get_work_color(work)); 1531 cwq_dec_nr_in_flight(get_work_cwq(work),
1532 get_work_color(work));
1484 ret = 1; 1533 ret = 1;
1485 } 1534 }
1486 } 1535 }
@@ -1489,20 +1538,16 @@ static int try_to_grab_pending(struct work_struct *work)
1489 return ret; 1538 return ret;
1490} 1539}
1491 1540
1492static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, 1541static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
1493 struct work_struct *work)
1494{ 1542{
1495 struct global_cwq *gcwq = cwq->gcwq;
1496 struct wq_barrier barr; 1543 struct wq_barrier barr;
1497 struct worker *worker; 1544 struct worker *worker;
1498 1545
1499 spin_lock_irq(&gcwq->lock); 1546 spin_lock_irq(&gcwq->lock);
1500 1547
1501 worker = NULL; 1548 worker = find_worker_executing_work(gcwq, work);
1502 if (unlikely(cwq->worker && cwq->worker->current_work == work)) { 1549 if (unlikely(worker))
1503 worker = cwq->worker; 1550 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
1504 insert_wq_barrier(cwq, &barr, work, worker);
1505 }
1506 1551
1507 spin_unlock_irq(&gcwq->lock); 1552 spin_unlock_irq(&gcwq->lock);
1508 1553
@@ -1514,8 +1559,6 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
1514 1559
1515static void wait_on_work(struct work_struct *work) 1560static void wait_on_work(struct work_struct *work)
1516{ 1561{
1517 struct cpu_workqueue_struct *cwq;
1518 struct workqueue_struct *wq;
1519 int cpu; 1562 int cpu;
1520 1563
1521 might_sleep(); 1564 might_sleep();
@@ -1523,14 +1566,8 @@ static void wait_on_work(struct work_struct *work)
1523 lock_map_acquire(&work->lockdep_map); 1566 lock_map_acquire(&work->lockdep_map);
1524 lock_map_release(&work->lockdep_map); 1567 lock_map_release(&work->lockdep_map);
1525 1568
1526 cwq = get_wq_data(work);
1527 if (!cwq)
1528 return;
1529
1530 wq = cwq->wq;
1531
1532 for_each_possible_cpu(cpu) 1569 for_each_possible_cpu(cpu)
1533 wait_on_cpu_work(get_cwq(cpu, wq), work); 1570 wait_on_cpu_work(get_gcwq(cpu), work);
1534} 1571}
1535 1572
1536static int __cancel_work_timer(struct work_struct *work, 1573static int __cancel_work_timer(struct work_struct *work,
@@ -1545,7 +1582,7 @@ static int __cancel_work_timer(struct work_struct *work,
1545 wait_on_work(work); 1582 wait_on_work(work);
1546 } while (unlikely(ret < 0)); 1583 } while (unlikely(ret < 0));
1547 1584
1548 clear_wq_data(work); 1585 clear_work_data(work);
1549 return ret; 1586 return ret;
1550} 1587}
1551 1588
@@ -1647,7 +1684,7 @@ EXPORT_SYMBOL(schedule_delayed_work);
1647void flush_delayed_work(struct delayed_work *dwork) 1684void flush_delayed_work(struct delayed_work *dwork)
1648{ 1685{
1649 if (del_timer_sync(&dwork->timer)) { 1686 if (del_timer_sync(&dwork->timer)) {
1650 __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq, 1687 __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
1651 &dwork->work); 1688 &dwork->work);
1652 put_cpu(); 1689 put_cpu();
1653 } 1690 }
@@ -2405,6 +2442,14 @@ void __init init_workqueues(void)
2405 unsigned int cpu; 2442 unsigned int cpu;
2406 int i; 2443 int i;
2407 2444
2445 /*
2446 * The pointer part of work->data is either pointing to the
2447 * cwq or contains the cpu number the work ran last on. Make
2448 * sure cpu number won't overflow into kernel pointer area so
2449 * that they can be distinguished.
2450 */
2451 BUILD_BUG_ON(NR_CPUS << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);
2452
2408 hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE); 2453 hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
2409 2454
2410 /* initialize gcwqs */ 2455 /* initialize gcwqs */