author    Paul E. McKenney <paulmck@linux.vnet.ibm.com>    2009-10-07 00:48:17 -0400
committer Ingo Molnar <mingo@elte.hu>    2009-10-07 02:11:20 -0400
commit    e74f4c4564455c91a3b4075bb1721993c2a95dda (patch)
tree      213f9df0974c6e1e729de207b2c6dd942a39ba8c
parent    d0ec774cb2599c858be9d923bb873cf6697520d8 (diff)
rcu: Make hot-unplugged CPU relinquish its own RCU callbacks
The current interaction between RCU and CPU hotplug requires that RCU block
in CPU notifiers waiting for callbacks to drain.  This can be greatly
simplified by having each CPU relinquish its own callbacks, and by having
both _rcu_barrier() and the CPU_DEAD notifiers adopt all callbacks that were
previously relinquished.  This change also eliminates the possibility of
certain types of hangs due to the previous practice of waiting for callbacks
to be invoked from within CPU notifiers.  If you don't ever wait, you cannot
hang.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: mathieu.desnoyers@polymtl.ca
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
LKML-Reference: <1254890898456-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>
-rw-r--r--  kernel/rcutree.c         151
-rw-r--r--  kernel/rcutree.h          11
-rw-r--r--  kernel/rcutree_plugin.h   34
-rw-r--r--  kernel/rcutree_trace.c     4
4 files changed, 125 insertions, 75 deletions
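
This patch replaces the old "wait for migrated callbacks" machinery with an orphanage: a dying CPU splices its entire callback list onto a per-flavor ->orphan_cbs_list, and a surviving CPU later adopts that list wholesale from the CPU_DEAD notifier or from _rcu_barrier(). The stand-alone user-space sketch below illustrates only that splice-and-adopt pattern; the names used here (struct cb, struct cpu_cbs, send_cbs_to_orphanage(), adopt_orphan_cbs(), the main() driver) and the absence of any locking are simplifications for illustration, not the kernel's actual rcu_state/rcu_data code.

/* Minimal sketch: per-CPU callback list plus a global orphanage. */
#include <stdio.h>

struct cb {
        struct cb *next;
        int id;
};

struct cpu_cbs {
        struct cb *list;        /* head of the callback list */
        struct cb **tail;       /* points at the final ->next pointer */
        long qlen;
};

/* Global orphanage, analogous to ->orphan_cbs_list/_tail and ->orphan_qlen. */
static struct cb *orphan_list;
static struct cb **orphan_tail = &orphan_list;
static long orphan_qlen;

/* CPU_DYING path: splice the dying CPU's callbacks onto the orphanage. */
static void send_cbs_to_orphanage(struct cpu_cbs *cpu)
{
        if (cpu->list == NULL)
                return;
        *orphan_tail = cpu->list;       /* append whole list, order preserved */
        orphan_tail = cpu->tail;
        orphan_qlen += cpu->qlen;
        cpu->list = NULL;               /* the dying CPU now owns nothing */
        cpu->tail = &cpu->list;
        cpu->qlen = 0;
}

/* CPU_DEAD or _rcu_barrier() path: a surviving CPU adopts the orphans. */
static void adopt_orphan_cbs(struct cpu_cbs *cpu)
{
        if (orphan_list == NULL)
                return;
        *cpu->tail = orphan_list;
        cpu->tail = orphan_tail;
        cpu->qlen += orphan_qlen;
        orphan_list = NULL;
        orphan_tail = &orphan_list;
        orphan_qlen = 0;
}

int main(void)
{
        struct cb a = { .id = 1 }, b = { .id = 2 };
        struct cpu_cbs dying = { .list = &a, .tail = &b.next, .qlen = 2 };
        struct cpu_cbs survivor = { .tail = &survivor.list };

        a.next = &b;                    /* dying CPU holds callbacks 1 -> 2 */
        send_cbs_to_orphanage(&dying);  /* as done from the CPU_DYING notifier */
        adopt_orphan_cbs(&survivor);    /* as done by CPU_DEAD or _rcu_barrier() */

        for (struct cb *p = survivor.list; p != NULL; p = p->next)
                printf("adopted callback %d\n", p->id);
        return 0;
}

Because the list is appended through the tail pointer, callback order is preserved, which is what allows _rcu_barrier() to adopt the orphans and still account for every outstanding callback exactly once.
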
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 0108570a192c..d8d98655c9e7 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -63,6 +63,9 @@
         .gpnum = -300, \
         .completed = -300, \
         .onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
+        .orphan_cbs_list = NULL, \
+        .orphan_cbs_tail = &name.orphan_cbs_list, \
+        .orphan_qlen = 0, \
         .fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
         .n_force_qs = 0, \
         .n_force_qs_ngp = 0, \
@@ -838,17 +841,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 #ifdef CONFIG_HOTPLUG_CPU
 
 /*
+ * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
+ * specified flavor of RCU.  The callbacks will be adopted by the next
+ * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
+ * comes first.  Because this is invoked from the CPU_DYING notifier,
+ * irqs are already disabled.
+ */
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+        int i;
+        struct rcu_data *rdp = rsp->rda[smp_processor_id()];
+
+        if (rdp->nxtlist == NULL)
+                return;  /* irqs disabled, so comparison is stable. */
+        spin_lock(&rsp->onofflock);  /* irqs already disabled. */
+        *rsp->orphan_cbs_tail = rdp->nxtlist;
+        rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
+        rdp->nxtlist = NULL;
+        for (i = 0; i < RCU_NEXT_SIZE; i++)
+                rdp->nxttail[i] = &rdp->nxtlist;
+        rsp->orphan_qlen += rdp->qlen;
+        rdp->qlen = 0;
+        spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
+}
+
+/*
+ * Adopt previously orphaned RCU callbacks.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+        unsigned long flags;
+        struct rcu_data *rdp;
+
+        spin_lock_irqsave(&rsp->onofflock, flags);
+        rdp = rsp->rda[smp_processor_id()];
+        if (rsp->orphan_cbs_list == NULL) {
+                spin_unlock_irqrestore(&rsp->onofflock, flags);
+                return;
+        }
+        *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
+        rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
+        rdp->qlen += rsp->orphan_qlen;
+        rsp->orphan_cbs_list = NULL;
+        rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
+        rsp->orphan_qlen = 0;
+        spin_unlock_irqrestore(&rsp->onofflock, flags);
+}
+
+/*
  * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
  * and move all callbacks from the outgoing CPU to the current one.
  */
 static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
 {
-        int i;
         unsigned long flags;
         long lastcomp;
         unsigned long mask;
         struct rcu_data *rdp = rsp->rda[cpu];
-        struct rcu_data *rdp_me;
         struct rcu_node *rnp;
 
         /* Exclude any attempts to start a new grace period. */
@@ -871,32 +920,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
         } while (rnp != NULL);
         lastcomp = rsp->completed;
 
-        spin_unlock(&rsp->onofflock);  /* irqs remain disabled. */
+        spin_unlock_irqrestore(&rsp->onofflock, flags);
 
-        /*
-         * Move callbacks from the outgoing CPU to the running CPU.
-         * Note that the outgoing CPU is now quiescent, so it is now
-         * (uncharacteristically) safe to access its rcu_data structure.
-         * Note also that we must carefully retain the order of the
-         * outgoing CPU's callbacks in order for rcu_barrier() to work
-         * correctly.  Finally, note that we start all the callbacks
-         * afresh, even those that have passed through a grace period
-         * and are therefore ready to invoke.  The theory is that hotplug
-         * events are rare, and that if they are frequent enough to
-         * indefinitely delay callbacks, you have far worse things to
-         * be worrying about.
-         */
-        if (rdp->nxtlist != NULL) {
-                rdp_me = rsp->rda[smp_processor_id()];
-                *rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
-                rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-                rdp->nxtlist = NULL;
-                for (i = 0; i < RCU_NEXT_SIZE; i++)
-                        rdp->nxttail[i] = &rdp->nxtlist;
-                rdp_me->qlen += rdp->qlen;
-                rdp->qlen = 0;
-        }
-        local_irq_restore(flags);
+        rcu_adopt_orphan_cbs(rsp);
 }
 
 /*
@@ -914,6 +940,14 @@ static void rcu_offline_cpu(int cpu)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
+static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
+{
+}
+
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
 static void rcu_offline_cpu(int cpu)
 {
 }
@@ -1367,9 +1401,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
 static atomic_t rcu_barrier_cpu_count;
 static DEFINE_MUTEX(rcu_barrier_mutex);
 static struct completion rcu_barrier_completion;
-static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
-static struct rcu_head rcu_migrate_head[3];
-static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
 
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
@@ -1392,21 +1423,16 @@ static void rcu_barrier_func(void *type)
         call_rcu_func(head, rcu_barrier_callback);
 }
 
-static inline void wait_migrated_callbacks(void)
-{
-        wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
-        smp_mb(); /* In case we didn't sleep. */
-}
-
 /*
  * Orchestrate the specified type of RCU barrier, waiting for all
  * RCU callbacks of the specified type to complete.
  */
-static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
+static void _rcu_barrier(struct rcu_state *rsp,
+                         void (*call_rcu_func)(struct rcu_head *head,
                                                 void (*func)(struct rcu_head *head)))
 {
         BUG_ON(in_interrupt());
-        /* Take cpucontrol mutex to protect against CPU hotplug */
+        /* Take mutex to serialize concurrent rcu_barrier() requests. */
         mutex_lock(&rcu_barrier_mutex);
         init_completion(&rcu_barrier_completion);
         /*
@@ -1419,29 +1445,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
          * early.
          */
         atomic_set(&rcu_barrier_cpu_count, 1);
+        preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
+        rcu_adopt_orphan_cbs(rsp);
         on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+        preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
         if (atomic_dec_and_test(&rcu_barrier_cpu_count))
                 complete(&rcu_barrier_completion);
         wait_for_completion(&rcu_barrier_completion);
         mutex_unlock(&rcu_barrier_mutex);
-        wait_migrated_callbacks();
-}
-
-/**
- * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
- */
-void rcu_barrier(void)
-{
-        _rcu_barrier(call_rcu);
 }
-EXPORT_SYMBOL_GPL(rcu_barrier);
 
 /**
  * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
  */
 void rcu_barrier_bh(void)
 {
-        _rcu_barrier(call_rcu_bh);
+        _rcu_barrier(&rcu_bh_state, call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_bh);
 
@@ -1450,16 +1469,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
  */
 void rcu_barrier_sched(void)
 {
-        _rcu_barrier(call_rcu_sched);
+        _rcu_barrier(&rcu_sched_state, call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(rcu_barrier_sched);
 
-static void rcu_migrate_callback(struct rcu_head *notused)
-{
-        if (atomic_dec_and_test(&rcu_migrate_type_count))
-                wake_up(&rcu_migrate_wq);
-}
-
 /*
  * Do boot-time initialization of a CPU's per-CPU RCU data.
  */
@@ -1556,27 +1569,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
         case CPU_UP_PREPARE_FROZEN:
                 rcu_online_cpu(cpu);
                 break;
-        case CPU_DOWN_PREPARE:
-        case CPU_DOWN_PREPARE_FROZEN:
-                /* Don't need to wait until next removal operation. */
-                /* rcu_migrate_head is protected by cpu_add_remove_lock */
-                wait_migrated_callbacks();
-                break;
         case CPU_DYING:
         case CPU_DYING_FROZEN:
                 /*
-                 * preempt_disable() in on_each_cpu() prevents stop_machine(),
+                 * preempt_disable() in _rcu_barrier() prevents stop_machine(),
                  * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
-                 * returns, all online cpus have queued rcu_barrier_func(),
-                 * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
-                 *
-                 * These callbacks ensure _rcu_barrier() waits for all
-                 * RCU callbacks of the specified type to complete.
+                 * returns, all online cpus have queued rcu_barrier_func().
+                 * The dying CPU clears its cpu_online_mask bit and
+                 * moves all of its RCU callbacks to ->orphan_cbs_list
+                 * in the context of stop_machine(), so subsequent calls
+                 * to _rcu_barrier() will adopt these callbacks and only
+                 * then queue rcu_barrier_func() on all remaining CPUs.
                  */
-                atomic_set(&rcu_migrate_type_count, 3);
-                call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
-                call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
-                call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
+                rcu_send_cbs_to_orphanage(&rcu_bh_state);
+                rcu_send_cbs_to_orphanage(&rcu_sched_state);
+                rcu_preempt_send_cbs_to_orphanage();
                 break;
         case CPU_DEAD:
         case CPU_DEAD_FROZEN:
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 676eecd371d9..b40ac5706040 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -244,7 +244,15 @@ struct rcu_state {
         /* End of fields guarded by root rcu_node's lock. */
 
         spinlock_t onofflock;                   /* exclude on/offline and */
-                                                /* starting new GP. */
+                                                /* starting new GP.  Also */
+                                                /* protects the following */
+                                                /* orphan_cbs fields. */
+        struct rcu_head *orphan_cbs_list;       /* list of rcu_head structs */
+                                                /* orphaned by all CPUs in */
+                                                /* a given leaf rcu_node */
+                                                /* going offline. */
+        struct rcu_head **orphan_cbs_tail;      /* And tail pointer. */
+        long orphan_qlen;                       /* Number of orphaned cbs. */
         spinlock_t fqslock;                     /* Only one task forcing */
                                                 /* quiescent states. */
         unsigned long jiffies_force_qs;         /* Time at which to invoke */
@@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
 static int rcu_preempt_pending(int cpu);
 static int rcu_preempt_needs_cpu(int cpu);
 static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
+static void rcu_preempt_send_cbs_to_orphanage(void);
 static void __init __rcu_init_preempt(void);
 
 #endif /* #else #ifdef RCU_TREE_NONCORE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index 57200fe96d0a..c0cb783aa16a 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -410,6 +410,15 @@ static int rcu_preempt_needs_cpu(int cpu)
         return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
 }
 
+/**
+ * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ */
+void rcu_barrier(void)
+{
+        _rcu_barrier(&rcu_preempt_state, call_rcu);
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
 /*
  * Initialize preemptable RCU's per-CPU data.
  */
@@ -419,6 +428,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Move preemptable RCU's callbacks to ->orphan_cbs_list.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+        rcu_send_cbs_to_orphanage(&rcu_preempt_state);
+}
+
+/*
  * Initialize preemptable RCU's state structures.
  */
 static void __init __rcu_init_preempt(void)
@@ -564,6 +581,16 @@ static int rcu_preempt_needs_cpu(int cpu)
 }
 
 /*
+ * Because preemptable RCU does not exist, rcu_barrier() is just
+ * another name for rcu_barrier_sched().
+ */
+void rcu_barrier(void)
+{
+        rcu_barrier_sched();
+}
+EXPORT_SYMBOL_GPL(rcu_barrier);
+
+/*
  * Because preemptable RCU does not exist, there is no per-CPU
  * data to initialize.
  */
@@ -572,6 +599,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
 }
 
 /*
+ * Because there is no preemptable RCU, there are no callbacks to move.
+ */
+static void rcu_preempt_send_cbs_to_orphanage(void)
+{
+}
+
+/*
  * Because preemptable RCU does not exist, it need not be initialized.
  */
 static void __init __rcu_init_preempt(void)
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index f09af28b8262..4b31c779e62e 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
         struct rcu_node *rnp;
 
         seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
-                      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+                      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
                    rsp->completed, rsp->gpnum, rsp->signaled,
                    (long)(rsp->jiffies_force_qs - jiffies),
                    (int)(jiffies & 0xffff),
                    rsp->n_force_qs, rsp->n_force_qs_ngp,
                    rsp->n_force_qs - rsp->n_force_qs_ngp,
-                   rsp->n_force_qs_lh);
+                   rsp->n_force_qs_lh, rsp->orphan_qlen);
         for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
                 if (rnp->level != level) {
                         seq_puts(m, "\n");