author    Paul E. McKenney <paulmck@linux.vnet.ibm.com>  2014-12-18 15:31:27 -0500
committer Paul E. McKenney <paulmck@linux.vnet.ibm.com>  2015-01-06 14:01:15 -0500
commit    41050a009640f9f330a5b916563ca7faf853a98c (patch)
tree      e3788995b7ae651215a39ac1e98fb2b1a81aa53c
parent    87af9e7ff9d909e70a006ca0974466e2a1d8db0a (diff)
rcu: Fix rcu_barrier() race that could result in too-short wait
The rcu_barrier() no-callbacks check for no-CBs CPUs has race conditions.
It checks a given CPU's lists of callbacks, and if all three no-CBs lists
are empty, ignores that CPU.  However, these three lists could potentially
be empty even when callbacks are present if the check executed just as
the callbacks were being moved from one list to another.

It turns out that recent versions of rcutorture can spot this race.

This commit plugs this hole by consolidating the per-list counts of
no-CBs callbacks into a single count, which is incremented before the
corresponding callback is posted and decremented after it is invoked.
Then rcu_barrier() checks this single count to reliably determine
whether the corresponding CPU has no-CBs callbacks.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
-rw-r--r--  kernel/rcu/tree.c          1
-rw-r--r--  kernel/rcu/tree.h         29
-rw-r--r--  kernel/rcu/tree_plugin.h  45
3 files changed, 36 insertions(+), 39 deletions(-)
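For readers following the logic of the fix, here is a minimal userspace sketch of the counting discipline the commit adopts: one consolidated count that is incremented before a callback is posted and decremented only after it has been invoked, so a reader that sees zero can safely conclude no callbacks are anywhere in the pipeline.  This is illustrative only, written with C11 atomics; it is not the kernel implementation, and the list handling is deliberately simplified (not itself thread-safe).

/* Illustrative sketch only -- not kernel code. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct cb {
	struct cb *next;
	void (*func)(struct cb *);
};

static atomic_long nocb_q_count;		/* queued-but-not-yet-invoked CBs */
static struct cb *queue_head;
static struct cb **queue_tail = &queue_head;

/* Post a callback: bump the count *before* the callback becomes visible. */
static void enqueue_cb(struct cb *cbp)
{
	cbp->next = NULL;
	atomic_fetch_add(&nocb_q_count, 1);	/* increment before posting */
	*queue_tail = cbp;			/* the kernel uses xchg() on the tail */
	queue_tail = &cbp->next;
}

/* Invoke queued callbacks; drop the count only *after* invocation. */
static void invoke_cbs(void)
{
	struct cb *cbp = queue_head;
	long n = 0;

	queue_head = NULL;
	queue_tail = &queue_head;
	while (cbp) {
		struct cb *next = cbp->next;

		cbp->func(cbp);			/* callback runs here */
		n++;
		cbp = next;
	}
	atomic_fetch_sub(&nocb_q_count, n);	/* decrement after invocation */
}

/* rcu_barrier()-style check: one count covers every stage, so a callback
 * migrating between internal lists can never make the check read zero early. */
static bool cpu_needs_barrier(void)
{
	return atomic_load(&nocb_q_count) != 0;
}

Contrast this with the old scheme, where each stage had its own count and list: a callback in transit between two stages could be absent from both at the instant of the check, which is exactly the window the commit closes.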
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 203b50d7ecbd..5b9f3b972a79 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3344,6 +3344,7 @@ static void _rcu_barrier(struct rcu_state *rsp)
 	} else {
 		_rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
 				   rsp->n_barrier_done);
+		smp_mb__before_atomic();
 		atomic_inc(&rsp->barrier_cpu_count);
 		__call_rcu(&rdp->barrier_head,
 			   rcu_barrier_callback, rsp, cpu, 0);
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 8e7b1843896e..cb5908672f11 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -340,14 +340,10 @@ struct rcu_data {
 #ifdef CONFIG_RCU_NOCB_CPU
 	struct rcu_head *nocb_head;	/* CBs waiting for kthread. */
 	struct rcu_head **nocb_tail;
-	atomic_long_t nocb_q_count;	/* # CBs waiting for kthread */
-	atomic_long_t nocb_q_count_lazy; /*  (approximate). */
+	atomic_long_t nocb_q_count;	/* # CBs waiting for nocb */
+	atomic_long_t nocb_q_count_lazy; /*  invocation (all stages). */
 	struct rcu_head *nocb_follower_head; /* CBs ready to invoke. */
 	struct rcu_head **nocb_follower_tail;
-	atomic_long_t nocb_follower_count; /* # CBs ready to invoke. */
-	atomic_long_t nocb_follower_count_lazy; /*  (approximate). */
-	int nocb_p_count;		/* # CBs being invoked by kthread */
-	int nocb_p_count_lazy;		/*  (approximate). */
 	wait_queue_head_t nocb_wq;	/* For nocb kthreads to sleep on. */
 	struct task_struct *nocb_kthread;
 	int nocb_defer_wakeup;		/* Defer wakeup of nocb_kthread. */
@@ -356,8 +352,6 @@ struct rcu_data {
 	struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
 					/* CBs waiting for GP. */
 	struct rcu_head **nocb_gp_tail;
-	long nocb_gp_count;
-	long nocb_gp_count_lazy;
 	bool nocb_leader_sleep;		/* Is the nocb leader thread asleep? */
 	struct rcu_data *nocb_next_follower;
 					/* Next follower in wakeup chain. */
@@ -622,24 +616,15 @@ static void rcu_dynticks_task_exit(void);
 #endif /* #ifndef RCU_TREE_NONCORE */
 
 #ifdef CONFIG_RCU_TRACE
-#ifdef CONFIG_RCU_NOCB_CPU
-/* Sum up queue lengths for tracing. */
+/* Read out queue lengths for tracing. */
 static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
 {
-	*ql = atomic_long_read(&rdp->nocb_q_count) +
-	      rdp->nocb_p_count +
-	      atomic_long_read(&rdp->nocb_follower_count) +
-	      rdp->nocb_p_count + rdp->nocb_gp_count;
-	*qll = atomic_long_read(&rdp->nocb_q_count_lazy) +
-	       rdp->nocb_p_count_lazy +
-	       atomic_long_read(&rdp->nocb_follower_count_lazy) +
-	       rdp->nocb_p_count_lazy + rdp->nocb_gp_count_lazy;
-}
+#ifdef CONFIG_RCU_NOCB_CPU
+	*ql = atomic_long_read(&rdp->nocb_q_count);
+	*qll = atomic_long_read(&rdp->nocb_q_count_lazy);
 #else /* #ifdef CONFIG_RCU_NOCB_CPU */
-static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
-{
 	*ql = 0;
 	*qll = 0;
-}
 #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
+}
 #endif /* #ifdef CONFIG_RCU_TRACE */
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 3ec85cb5d544..e5c43b7f63f2 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -2056,9 +2056,26 @@ static void wake_nocb_leader(struct rcu_data *rdp, bool force)
 static bool rcu_nocb_cpu_needs_barrier(struct rcu_state *rsp, int cpu)
 {
 	struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
+	unsigned long ret;
+#ifdef CONFIG_PROVE_RCU
 	struct rcu_head *rhp;
+#endif /* #ifdef CONFIG_PROVE_RCU */
+
+	/*
+	 * Check count of all no-CBs callbacks awaiting invocation.
+	 * There needs to be a barrier before this function is called,
+	 * but associated with a prior determination that no more
+	 * callbacks would be posted.  In the worst case, the first
+	 * barrier in _rcu_barrier() suffices (but the caller cannot
+	 * necessarily rely on this, not a substitute for the caller
+	 * getting the concurrency design right!).  There must also be
+	 * a barrier between the following load and posting of a callback
+	 * (if a callback is in fact needed).  This is associated with an
+	 * atomic_inc() in the caller.
+	 */
+	ret = atomic_long_read(&rdp->nocb_q_count);
 
-	/* No-CBs CPUs might have callbacks on any of three lists. */
+#ifdef CONFIG_PROVE_RCU
 	rhp = ACCESS_ONCE(rdp->nocb_head);
 	if (!rhp)
 		rhp = ACCESS_ONCE(rdp->nocb_gp_head);
@@ -2072,8 +2089,9 @@ static bool rcu_nocb_cpu_needs_barrier(struct rcu_state *rsp, int cpu)
 			cpu, rhp->func);
 		WARN_ON_ONCE(1);
 	}
+#endif /* #ifdef CONFIG_PROVE_RCU */
 
-	return !!rhp;
+	return !!ret;
 }
 
 /*
@@ -2095,9 +2113,10 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
 	struct task_struct *t;
 
 	/* Enqueue the callback on the nocb list and update counts. */
+	atomic_long_add(rhcount, &rdp->nocb_q_count);
+	/* rcu_barrier() relies on ->nocb_q_count add before xchg. */
 	old_rhpp = xchg(&rdp->nocb_tail, rhtp);
 	ACCESS_ONCE(*old_rhpp) = rhp;
-	atomic_long_add(rhcount, &rdp->nocb_q_count);
 	atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
 	smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */
 
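The hunks above, together with the comment added to rcu_nocb_cpu_needs_barrier(), describe an ordering contract rather than a data structure: the enqueue side must make its count update visible no later than the callback itself (hence the add is moved before the fully ordered xchg() of ->nocb_tail), and the barrier side must order its read of the count before posting the rcu_barrier() callback (the role of the smp_mb__before_atomic() added ahead of atomic_inc(&rsp->barrier_cpu_count) in _rcu_barrier()).  The following is a rough userspace analogue using C11 fences; the names and the fence placement are illustrative assumptions, not the kernel's primitives.

/* Illustrative only: the two sides of the ordering contract. */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_long nocb_q_count;	/* consolidated callback count */
static atomic_int barrier_cpu_count;	/* CPUs rcu_barrier() must wait for */

/* Enqueue side: count first, then publish the callback.  In the kernel the
 * xchg() on the tail pointer is fully ordered, so no explicit fence is used. */
static void post_callback(void)
{
	atomic_fetch_add(&nocb_q_count, 1);
	atomic_thread_fence(memory_order_seq_cst);	/* stands in for xchg() */
	/* ... link the callback onto the queue here ... */
}

/* Barrier side: read the count, and only if it is nonzero (with ordering
 * between that read and the posting) account for this CPU and post the
 * barrier callback. */
static bool barrier_check_and_post(void)
{
	if (atomic_load(&nocb_q_count) == 0)
		return false;				/* nothing in flight: skip CPU */
	atomic_thread_fence(memory_order_seq_cst);
	atomic_fetch_add(&barrier_cpu_count, 1);
	/* ... post the barrier callback on this CPU's queue here ... */
	return true;
}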
@@ -2288,9 +2307,6 @@ wait_again:
 		/* Move callbacks to wait-for-GP list, which is empty. */
 		ACCESS_ONCE(rdp->nocb_head) = NULL;
 		rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
-		rdp->nocb_gp_count = atomic_long_xchg(&rdp->nocb_q_count, 0);
-		rdp->nocb_gp_count_lazy =
-			atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
 		gotcbs = true;
 	}
 
@@ -2338,9 +2354,6 @@ wait_again:
 		/* Append callbacks to follower's "done" list. */
 		tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
 		*tail = rdp->nocb_gp_head;
-		atomic_long_add(rdp->nocb_gp_count, &rdp->nocb_follower_count);
-		atomic_long_add(rdp->nocb_gp_count_lazy,
-				&rdp->nocb_follower_count_lazy);
 		smp_mb__after_atomic(); /* Store *tail before wakeup. */
 		if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
 			/*
@@ -2415,13 +2428,11 @@ static int rcu_nocb_kthread(void *arg)
 		trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
 		ACCESS_ONCE(rdp->nocb_follower_head) = NULL;
 		tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
-		c = atomic_long_xchg(&rdp->nocb_follower_count, 0);
-		cl = atomic_long_xchg(&rdp->nocb_follower_count_lazy, 0);
-		rdp->nocb_p_count += c;
-		rdp->nocb_p_count_lazy += cl;
 
 		/* Each pass through the following loop invokes a callback. */
-		trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
+		trace_rcu_batch_start(rdp->rsp->name,
+				      atomic_long_read(&rdp->nocb_q_count_lazy),
+				      atomic_long_read(&rdp->nocb_q_count), -1);
 		c = cl = 0;
 		while (list) {
 			next = list->next;
@@ -2443,9 +2454,9 @@ static int rcu_nocb_kthread(void *arg)
 			list = next;
 		}
 		trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
-		ACCESS_ONCE(rdp->nocb_p_count) = rdp->nocb_p_count - c;
-		ACCESS_ONCE(rdp->nocb_p_count_lazy) =
-			rdp->nocb_p_count_lazy - cl;
+		smp_mb__before_atomic();  /* _add after CB invocation. */
+		atomic_long_add(-c, &rdp->nocb_q_count);
+		atomic_long_add(-cl, &rdp->nocb_q_count_lazy);
 		rdp->n_nocbs_invoked += c;
 	}
 	return 0;