author     Paul E. McKenney <paulmck@linux.vnet.ibm.com>   2017-04-05 12:01:53 -0400
committer  Paul E. McKenney <paulmck@linux.vnet.ibm.com>   2017-04-21 08:59:26 -0400
commit     da915ad5cf25b5f5d358dd3670c3378d8ae8c03e
tree       ed8c8277867809e59e724aa99fac393bc113eb2a /kernel
parent     6ade8694f471d847500c7cec152cc15171cef5d5
srcu: Parallelize callback handling
Peter Zijlstra proposed using SRCU to reduce mmap_sem contention [1,2]. However, some workloads could generate a high volume of concurrent call_srcu() invocations, and with current SRCU this would result in excessive contention on the srcu_struct structure's ->queue_lock, which protects SRCU's callback lists. This commit therefore moves SRCU to per-CPU callback lists, thus greatly reducing contention.

Because a given SRCU instance no longer has a single centralized callback list, starting grace periods and invoking callbacks are both more complex than in the single-list Classic SRCU implementation. Starting grace periods and handling callbacks are now managed by an srcu_node tree that is in some ways similar to the rcu_node trees used by RCU-bh, RCU-preempt, and RCU-sched (for example, the srcu_node tree shape is controlled by exactly the same Kconfig options and boot parameters that control the shape of the rcu_node tree).

In addition, the old per-CPU srcu_array structure is now named srcu_data and contains an rcu_segcblist structure named ->srcu_cblist for its callbacks (and a spinlock to protect it). The srcu_struct gains an srcu_gp_seq that is used to associate callback segments with the corresponding completion-time grace-period number. These completion-time grace-period numbers are propagated up the srcu_node tree so that the grace-period workqueue handler can determine, on the one hand, whether additional grace periods are needed and, on the other, where to look for callbacks that are ready to be invoked.

The srcu_barrier() function must now wait on all instances of the per-CPU ->srcu_cblist. Because each ->srcu_cblist is protected by ->lock, srcu_barrier() can remotely add the needed callbacks. In theory, it could also remotely start grace periods, but in practice doing so is complex and racy. And interestingly enough, it is never necessary for srcu_barrier() to start a grace period, because srcu_barrier() only enqueues a callback when a callback is already present, and it turns out that a grace period must already have been started for that pre-existing callback. Furthermore, it is only the callback that srcu_barrier() needs to wait on, not any particular grace period. Therefore, a new rcu_segcblist_entrain() function enqueues the srcu_barrier() function's callback into the same segment occupied by the last pre-existing callback in the list. The special case where all the pre-existing callbacks are on a different list (because they are in the process of being invoked) is handled by enqueuing srcu_barrier()'s callback into the RCU_DONE_TAIL segment, relying on the done-callbacks check that takes place after all callbacks are invoked.

The readers use the same algorithm as before. Note that a separate srcu_idx tells the readers which counter to increment; this unfortunately cannot be combined with srcu_gp_seq because the two need to be incremented at different times.

This commit introduces some ugly #ifdefs in rcutorture. These will go away when I feel good enough about Tree SRCU to ditch Classic SRCU.
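Editorial aside (not part of the patch, and not kernel code): the following is a minimal userspace pthreads sketch of the enqueue-side idea described above, namely per-CPU callback lists whose fast path takes only a per-CPU lock, with a global lock taken only when a new grace period must be requested. All names here (fake_cpu, fake_cb, fake_call_srcu(), NR_FAKE_CPUS, gp_lock) are invented for illustration.

/*
 * Toy model: per-CPU callback lists; enqueue contends only on the
 * per-CPU lock, and falls back to a global lock only when a new
 * grace period must be requested.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NR_FAKE_CPUS 4

struct fake_cb {
	struct fake_cb *next;
	void (*func)(struct fake_cb *cb);
	unsigned long gp_seq;		/* GP after which this CB may be invoked. */
};

struct fake_cpu {
	pthread_mutex_t lock;		/* Protects only this CPU's list. */
	struct fake_cb *head;
	struct fake_cb **tail;
	unsigned long gp_seq_needed;	/* Latest GP already requested from here. */
};

static struct fake_cpu fake_cpus[NR_FAKE_CPUS];
static pthread_mutex_t gp_lock = PTHREAD_MUTEX_INITIALIZER; /* Global, rarely taken. */
static unsigned long gp_seq;		/* Completed grace periods. */
static unsigned long gp_seq_needed;	/* Furthest-future GP requested so far. */

/* Enqueue a callback "on" the given fake CPU, call_srcu()-style. */
static void fake_call_srcu(int cpu, struct fake_cb *cb,
			   void (*func)(struct fake_cb *cb))
{
	struct fake_cpu *fc = &fake_cpus[cpu];
	unsigned long s;
	bool needgp = false;

	cb->func = func;
	cb->next = NULL;
	pthread_mutex_lock(&fc->lock);
	s = gp_seq + 1;			/* Racy read, fine for a toy; the kernel snapshots with rcu_seq_snap(). */
	cb->gp_seq = s;
	*fc->tail = cb;			/* Only same-CPU enqueues contend here. */
	fc->tail = &cb->next;
	if (fc->gp_seq_needed < s) {	/* First CB on this CPU needing GP "s"? */
		fc->gp_seq_needed = s;
		needgp = true;
	}
	pthread_mutex_unlock(&fc->lock);

	if (!needgp)
		return;			/* Common case: no global locking at all. */
	pthread_mutex_lock(&gp_lock);	/* Rare slow path: record the request. */
	if (gp_seq_needed < s)
		gp_seq_needed = s;	/* A grace-period worker would act on this. */
	pthread_mutex_unlock(&gp_lock);
}

static void print_cb(struct fake_cb *cb)
{
	printf("callback tagged for GP %lu invoked\n", cb->gp_seq);
}

int main(void)
{
	struct fake_cb cb;
	int i;

	for (i = 0; i < NR_FAKE_CPUS; i++) {
		pthread_mutex_init(&fake_cpus[i].lock, NULL);
		fake_cpus[i].head = NULL;
		fake_cpus[i].tail = &fake_cpus[i].head;
	}
	fake_call_srcu(0, &cb, print_cb);
	/* A real implementation would wait for GP "s" before invoking. */
	fake_cpus[0].head->func(fake_cpus[0].head);
	return 0;
}

In the actual patch, the analogous slow path is srcu_funnel_gp_start(), which additionally records the request in the srcu_node tree so that typically only one CPU per leaf ever reaches the root and the srcu_struct's ->gp_lock.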
Some crude performance comparisons, courtesy of a quickly hacked rcuperf asynchronous-grace-period capability:

                    Callback Queuing Overhead
                    -------------------------
        # CPUS      Classic SRCU      Tree SRCU
        ------      ------------      ---------
             2        0.349 us         0.342 us
            16       31.66  us         0.4   us
            41       ---------         0.417 us

The times are the 90th percentiles, a statistic that was chosen to reject the overheads of the occasional srcu_barrier() call needed to avoid OOMing the test machine. The rcuperf test hangs when running Classic SRCU at 41 CPUs, hence the line of dashes. Despite the hacks to both the rcuperf code and the statistics, this is a convincing demonstration of Tree SRCU's performance and scalability advantages.

[1] https://lwn.net/Articles/309030/
[2] https://patchwork.kernel.org/patch/5108281/

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
[ paulmck: Fix initialization if synchronize_srcu_expedited() called first. ]
Diffstat (limited to 'kernel')
-rw-r--r--   kernel/rcu/rcutorture.c    20
-rw-r--r--   kernel/rcu/srcutree.c     642
-rw-r--r--   kernel/rcu/tree.c           6
-rw-r--r--   kernel/rcu/tree.h           8
4 files changed, 545 insertions, 131 deletions
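Editorial note before the diff itself: the new srcutree.c code below manipulates ->srcu_gp_seq and ->srcu_barrier_seq through the rcu_seq_*() helpers, which pack a grace-period counter and a small state field (idle, scan1, scan2) into a single unsigned long, with the state in the low-order bits. Here is a simplified stand-alone model of that encoding; it is not the kernel's rcu.h implementation, and the seq_*() names are invented for this sketch.

/*
 * Simplified model of the rcu_seq grace-period sequence encoding:
 * low 2 bits = state, remaining bits = completed-GP counter.
 */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

#define SEQ_STATE_BITS	2
#define SEQ_STATE_MASK	((1UL << SEQ_STATE_BITS) - 1)

static unsigned long seq_ctr(unsigned long s)   { return s >> SEQ_STATE_BITS; }
static unsigned long seq_state(unsigned long s) { return s & SEQ_STATE_MASK; }

/* Mark a grace period as having started (state goes from idle to 1). */
static void seq_start(unsigned long *sp)
{
	assert(seq_state(*sp) == 0);
	(*sp)++;
}

/* Mark the current grace period complete: bump the counter, clear the state. */
static void seq_end(unsigned long *sp)
{
	assert(seq_state(*sp) != 0);
	*sp = (seq_ctr(*sp) + 1) << SEQ_STATE_BITS;
}

/*
 * Return the sequence value at which a grace period requested now is
 * guaranteed to have completed: one more GP if idle, two if one is
 * already in flight (a callback arriving now may miss the current GP).
 */
static unsigned long seq_snap(unsigned long s)
{
	return (seq_ctr(s) + (seq_state(s) ? 2 : 1)) << SEQ_STATE_BITS;
}

/* Has the grace period associated with snapshot "s" completed? */
static bool seq_done(unsigned long cur, unsigned long s)
{
	return cur >= s;	/* The kernel uses wraparound-safe ULONG_CMP_GE(). */
}

int main(void)
{
	unsigned long my_gp_seq = 0;
	unsigned long s = seq_snap(my_gp_seq);	/* Request a GP while idle. */

	seq_start(&my_gp_seq);			/* State: scan in progress. */
	printf("done before end? %d\n", seq_done(my_gp_seq, s));
	seq_end(&my_gp_seq);			/* Counter advances, state idle. */
	printf("done after end?  %d\n", seq_done(my_gp_seq, s));
	return 0;
}

Callbacks are tagged with such a snapshot when accelerated, and become ready to invoke once the current sequence value passes it; srcu_barrier() uses the same scheme for its own ->srcu_barrier_seq.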
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 6f344b6748a8..e9d4527cdd43 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -563,17 +563,30 @@ static void srcu_torture_stats(void)
 	int idx;
 
 #if defined(CONFIG_TREE_SRCU) || defined(CONFIG_CLASSIC_SRCU)
+#ifdef CONFIG_TREE_SRCU
+	idx = srcu_ctlp->srcu_idx & 0x1;
+#else /* #ifdef CONFIG_TREE_SRCU */
 	idx = srcu_ctlp->completed & 0x1;
+#endif /* #else #ifdef CONFIG_TREE_SRCU */
 	pr_alert("%s%s Tree SRCU per-CPU(idx=%d):",
 		 torture_type, TORTURE_FLAG, idx);
 	for_each_possible_cpu(cpu) {
 		unsigned long l0, l1;
 		unsigned long u0, u1;
 		long c0, c1;
-		struct srcu_array *counts = per_cpu_ptr(srcu_ctlp->per_cpu_ref, cpu);
+#ifdef CONFIG_TREE_SRCU
+		struct srcu_data *counts;
 
+		counts = per_cpu_ptr(srcu_ctlp->sda, cpu);
+		u0 = counts->srcu_unlock_count[!idx];
+		u1 = counts->srcu_unlock_count[idx];
+#else /* #ifdef CONFIG_TREE_SRCU */
+		struct srcu_array *counts;
+
+		counts = per_cpu_ptr(srcu_ctlp->per_cpu_ref, cpu);
 		u0 = counts->unlock_count[!idx];
 		u1 = counts->unlock_count[idx];
+#endif /* #else #ifdef CONFIG_TREE_SRCU */
 
 		/*
 		 * Make sure that a lock is always counted if the corresponding
@@ -581,8 +594,13 @@ static void srcu_torture_stats(void)
 		 */
 		smp_rmb();
 
+#ifdef CONFIG_TREE_SRCU
+		l0 = counts->srcu_lock_count[!idx];
+		l1 = counts->srcu_lock_count[idx];
+#else /* #ifdef CONFIG_TREE_SRCU */
 		l0 = counts->lock_count[!idx];
 		l1 = counts->lock_count[idx];
+#endif /* #else #ifdef CONFIG_TREE_SRCU */
 
 		c0 = l0 - u0;
 		c1 = l1 - u1;
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index da676b0d016b..12feeca18f46 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -36,19 +36,110 @@
36#include <linux/delay.h> 36#include <linux/delay.h>
37#include <linux/srcu.h> 37#include <linux/srcu.h>
38 38
39#include <linux/rcu_node_tree.h>
40#include "rcu.h" 39#include "rcu.h"
41 40
42static int init_srcu_struct_fields(struct srcu_struct *sp) 41static void srcu_invoke_callbacks(struct work_struct *work);
42static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
43
44/*
45 * Initialize SRCU combining tree. Note that statically allocated
46 * srcu_struct structures might already have srcu_read_lock() and
47 * srcu_read_unlock() running against them. So if the is_static parameter
48 * is set, don't initialize ->srcu_lock_count[] and ->srcu_unlock_count[].
49 */
50static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
43{ 51{
44 sp->completed = 0; 52 int cpu;
53 int i;
54 int level = 0;
55 int levelspread[RCU_NUM_LVLS];
56 struct srcu_data *sdp;
57 struct srcu_node *snp;
58 struct srcu_node *snp_first;
59
60 /* Work out the overall tree geometry. */
61 sp->level[0] = &sp->node[0];
62 for (i = 1; i < rcu_num_lvls; i++)
63 sp->level[i] = sp->level[i - 1] + num_rcu_lvl[i - 1];
64 rcu_init_levelspread(levelspread, num_rcu_lvl);
65
66 /* Each pass through this loop initializes one srcu_node structure. */
67 rcu_for_each_node_breadth_first(sp, snp) {
68 spin_lock_init(&snp->lock);
69 for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++)
70 snp->srcu_have_cbs[i] = 0;
71 snp->grplo = -1;
72 snp->grphi = -1;
73 if (snp == &sp->node[0]) {
74 /* Root node, special case. */
75 snp->srcu_parent = NULL;
76 continue;
77 }
78
79 /* Non-root node. */
80 if (snp == sp->level[level + 1])
81 level++;
82 snp->srcu_parent = sp->level[level - 1] +
83 (snp - sp->level[level]) /
84 levelspread[level - 1];
85 }
86
87 /*
88 * Initialize the per-CPU srcu_data array, which feeds into the
89 * leaves of the srcu_node tree.
90 */
91 WARN_ON_ONCE(ARRAY_SIZE(sdp->srcu_lock_count) !=
92 ARRAY_SIZE(sdp->srcu_unlock_count));
93 level = rcu_num_lvls - 1;
94 snp_first = sp->level[level];
95 for_each_possible_cpu(cpu) {
96 sdp = per_cpu_ptr(sp->sda, cpu);
97 spin_lock_init(&sdp->lock);
98 rcu_segcblist_init(&sdp->srcu_cblist);
99 sdp->srcu_cblist_invoking = false;
100 sdp->srcu_gp_seq_needed = sp->srcu_gp_seq;
101 sdp->mynode = &snp_first[cpu / levelspread[level]];
102 for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) {
103 if (snp->grplo < 0)
104 snp->grplo = cpu;
105 snp->grphi = cpu;
106 }
107 sdp->cpu = cpu;
108 INIT_DELAYED_WORK(&sdp->work, srcu_invoke_callbacks);
109 sdp->sp = sp;
110 if (is_static)
111 continue;
112
113 /* Dynamically allocated, better be no srcu_read_locks()! */
114 for (i = 0; i < ARRAY_SIZE(sdp->srcu_lock_count); i++) {
115 sdp->srcu_lock_count[i] = 0;
116 sdp->srcu_unlock_count[i] = 0;
117 }
118 }
119}
120
121/*
122 * Initialize non-compile-time initialized fields, including the
123 * associated srcu_node and srcu_data structures. The is_static
124 * parameter is passed through to init_srcu_struct_nodes(), and
125 * also tells us that ->sda has already been wired up to srcu_data.
126 */
127static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static)
128{
129 mutex_init(&sp->srcu_cb_mutex);
130 mutex_init(&sp->srcu_gp_mutex);
131 sp->srcu_idx = 0;
45 sp->srcu_gp_seq = 0; 132 sp->srcu_gp_seq = 0;
46 atomic_set(&sp->srcu_exp_cnt, 0); 133 atomic_set(&sp->srcu_exp_cnt, 0);
47 spin_lock_init(&sp->queue_lock); 134 sp->srcu_barrier_seq = 0;
48 rcu_segcblist_init(&sp->srcu_cblist); 135 mutex_init(&sp->srcu_barrier_mutex);
136 atomic_set(&sp->srcu_barrier_cpu_cnt, 0);
49 INIT_DELAYED_WORK(&sp->work, process_srcu); 137 INIT_DELAYED_WORK(&sp->work, process_srcu);
50 sp->per_cpu_ref = alloc_percpu(struct srcu_array); 138 if (!is_static)
51 return sp->per_cpu_ref ? 0 : -ENOMEM; 139 sp->sda = alloc_percpu(struct srcu_data);
140 init_srcu_struct_nodes(sp, is_static);
141 smp_store_release(&sp->srcu_gp_seq_needed, 0); /* Init done. */
142 return sp->sda ? 0 : -ENOMEM;
52} 143}
53 144
54#ifdef CONFIG_DEBUG_LOCK_ALLOC 145#ifdef CONFIG_DEBUG_LOCK_ALLOC
@@ -59,7 +150,8 @@ int __init_srcu_struct(struct srcu_struct *sp, const char *name,
59 /* Don't re-initialize a lock while it is held. */ 150 /* Don't re-initialize a lock while it is held. */
60 debug_check_no_locks_freed((void *)sp, sizeof(*sp)); 151 debug_check_no_locks_freed((void *)sp, sizeof(*sp));
61 lockdep_init_map(&sp->dep_map, name, key, 0); 152 lockdep_init_map(&sp->dep_map, name, key, 0);
62 return init_srcu_struct_fields(sp); 153 spin_lock_init(&sp->gp_lock);
154 return init_srcu_struct_fields(sp, false);
63} 155}
64EXPORT_SYMBOL_GPL(__init_srcu_struct); 156EXPORT_SYMBOL_GPL(__init_srcu_struct);
65 157
@@ -75,15 +167,41 @@ EXPORT_SYMBOL_GPL(__init_srcu_struct);
75 */ 167 */
76int init_srcu_struct(struct srcu_struct *sp) 168int init_srcu_struct(struct srcu_struct *sp)
77{ 169{
78 return init_srcu_struct_fields(sp); 170 spin_lock_init(&sp->gp_lock);
171 return init_srcu_struct_fields(sp, false);
79} 172}
80EXPORT_SYMBOL_GPL(init_srcu_struct); 173EXPORT_SYMBOL_GPL(init_srcu_struct);
81 174
82#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ 175#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */
83 176
84/* 177/*
85 * Returns approximate total of the readers' ->lock_count[] values for the 178 * First-use initialization of statically allocated srcu_struct
86 * rank of per-CPU counters specified by idx. 179 * structure. Wiring up the combining tree is more than can be
180 * done with compile-time initialization, so this check is added
181 * to each update-side SRCU primitive. Use ->gp_lock, which -is-
182 * compile-time initialized, to resolve races involving multiple
183 * CPUs trying to garner first-use privileges.
184 */
185static void check_init_srcu_struct(struct srcu_struct *sp)
186{
187 unsigned long flags;
188
189 WARN_ON_ONCE(rcu_scheduler_active == RCU_SCHEDULER_INIT);
190 /* The smp_load_acquire() pairs with the smp_store_release(). */
191 if (!rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq_needed))) /*^^^*/
192 return; /* Already initialized. */
193 spin_lock_irqsave(&sp->gp_lock, flags);
194 if (!rcu_seq_state(sp->srcu_gp_seq_needed)) {
195 spin_unlock_irqrestore(&sp->gp_lock, flags);
196 return;
197 }
198 init_srcu_struct_fields(sp, true);
199 spin_unlock_irqrestore(&sp->gp_lock, flags);
200}
201
202/*
203 * Returns approximate total of the readers' ->srcu_lock_count[] values
204 * for the rank of per-CPU counters specified by idx.
87 */ 205 */
88static unsigned long srcu_readers_lock_idx(struct srcu_struct *sp, int idx) 206static unsigned long srcu_readers_lock_idx(struct srcu_struct *sp, int idx)
89{ 207{
@@ -91,16 +209,16 @@ static unsigned long srcu_readers_lock_idx(struct srcu_struct *sp, int idx)
91 unsigned long sum = 0; 209 unsigned long sum = 0;
92 210
93 for_each_possible_cpu(cpu) { 211 for_each_possible_cpu(cpu) {
94 struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu); 212 struct srcu_data *cpuc = per_cpu_ptr(sp->sda, cpu);
95 213
96 sum += READ_ONCE(cpuc->lock_count[idx]); 214 sum += READ_ONCE(cpuc->srcu_lock_count[idx]);
97 } 215 }
98 return sum; 216 return sum;
99} 217}
100 218
101/* 219/*
102 * Returns approximate total of the readers' ->unlock_count[] values for the 220 * Returns approximate total of the readers' ->srcu_unlock_count[] values
103 * rank of per-CPU counters specified by idx. 221 * for the rank of per-CPU counters specified by idx.
104 */ 222 */
105static unsigned long srcu_readers_unlock_idx(struct srcu_struct *sp, int idx) 223static unsigned long srcu_readers_unlock_idx(struct srcu_struct *sp, int idx)
106{ 224{
@@ -108,9 +226,9 @@ static unsigned long srcu_readers_unlock_idx(struct srcu_struct *sp, int idx)
108 unsigned long sum = 0; 226 unsigned long sum = 0;
109 227
110 for_each_possible_cpu(cpu) { 228 for_each_possible_cpu(cpu) {
111 struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu); 229 struct srcu_data *cpuc = per_cpu_ptr(sp->sda, cpu);
112 230
113 sum += READ_ONCE(cpuc->unlock_count[idx]); 231 sum += READ_ONCE(cpuc->srcu_unlock_count[idx]);
114 } 232 }
115 return sum; 233 return sum;
116} 234}
@@ -145,14 +263,14 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx)
145 * the current index but not have incremented the lock counter yet. 263 * the current index but not have incremented the lock counter yet.
146 * 264 *
147 * Possible bug: There is no guarantee that there haven't been 265 * Possible bug: There is no guarantee that there haven't been
148 * ULONG_MAX increments of ->lock_count[] since the unlocks were 266 * ULONG_MAX increments of ->srcu_lock_count[] since the unlocks were
149 * counted, meaning that this could return true even if there are 267 * counted, meaning that this could return true even if there are
150 * still active readers. Since there are no memory barriers around 268 * still active readers. Since there are no memory barriers around
151 * srcu_flip(), the CPU is not required to increment ->completed 269 * srcu_flip(), the CPU is not required to increment ->srcu_idx
152 * before running srcu_readers_unlock_idx(), which means that there 270 * before running srcu_readers_unlock_idx(), which means that there
153 * could be an arbitrarily large number of critical sections that 271 * could be an arbitrarily large number of critical sections that
154 * execute after srcu_readers_unlock_idx() but use the old value 272 * execute after srcu_readers_unlock_idx() but use the old value
155 * of ->completed. 273 * of ->srcu_idx.
156 */ 274 */
157 return srcu_readers_lock_idx(sp, idx) == unlocks; 275 return srcu_readers_lock_idx(sp, idx) == unlocks;
158} 276}
@@ -172,12 +290,12 @@ static bool srcu_readers_active(struct srcu_struct *sp)
172 unsigned long sum = 0; 290 unsigned long sum = 0;
173 291
174 for_each_possible_cpu(cpu) { 292 for_each_possible_cpu(cpu) {
175 struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu); 293 struct srcu_data *cpuc = per_cpu_ptr(sp->sda, cpu);
176 294
177 sum += READ_ONCE(cpuc->lock_count[0]); 295 sum += READ_ONCE(cpuc->srcu_lock_count[0]);
178 sum += READ_ONCE(cpuc->lock_count[1]); 296 sum += READ_ONCE(cpuc->srcu_lock_count[1]);
179 sum -= READ_ONCE(cpuc->unlock_count[0]); 297 sum -= READ_ONCE(cpuc->srcu_unlock_count[0]);
180 sum -= READ_ONCE(cpuc->unlock_count[1]); 298 sum -= READ_ONCE(cpuc->srcu_unlock_count[1]);
181 } 299 }
182 return sum; 300 return sum;
183} 301}
@@ -193,18 +311,21 @@ static bool srcu_readers_active(struct srcu_struct *sp)
193 */ 311 */
194void cleanup_srcu_struct(struct srcu_struct *sp) 312void cleanup_srcu_struct(struct srcu_struct *sp)
195{ 313{
314 int cpu;
315
196 WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt)); 316 WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
197 if (WARN_ON(srcu_readers_active(sp))) 317 if (WARN_ON(srcu_readers_active(sp)))
198 return; /* Leakage unless caller handles error. */ 318 return; /* Leakage unless caller handles error. */
199 if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
200 return; /* Leakage unless caller handles error. */
201 flush_delayed_work(&sp->work); 319 flush_delayed_work(&sp->work);
202 if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) { 320 for_each_possible_cpu(cpu)
203 pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq))); 321 flush_delayed_work(&per_cpu_ptr(sp->sda, cpu)->work);
322 if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE) ||
323 WARN_ON(srcu_readers_active(sp))) {
324 pr_info("cleanup_srcu_struct: Active srcu_struct %p state: %d\n", sp, rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
204 return; /* Caller forgot to stop doing call_srcu()? */ 325 return; /* Caller forgot to stop doing call_srcu()? */
205 } 326 }
206 free_percpu(sp->per_cpu_ref); 327 free_percpu(sp->sda);
207 sp->per_cpu_ref = NULL; 328 sp->sda = NULL;
208} 329}
209EXPORT_SYMBOL_GPL(cleanup_srcu_struct); 330EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
210 331
@@ -217,8 +338,8 @@ int __srcu_read_lock(struct srcu_struct *sp)
217{ 338{
218 int idx; 339 int idx;
219 340
220 idx = READ_ONCE(sp->completed) & 0x1; 341 idx = READ_ONCE(sp->srcu_idx) & 0x1;
221 __this_cpu_inc(sp->per_cpu_ref->lock_count[idx]); 342 __this_cpu_inc(sp->sda->srcu_lock_count[idx]);
222 smp_mb(); /* B */ /* Avoid leaking the critical section. */ 343 smp_mb(); /* B */ /* Avoid leaking the critical section. */
223 return idx; 344 return idx;
224} 345}
@@ -233,7 +354,7 @@ EXPORT_SYMBOL_GPL(__srcu_read_lock);
233void __srcu_read_unlock(struct srcu_struct *sp, int idx) 354void __srcu_read_unlock(struct srcu_struct *sp, int idx)
234{ 355{
235 smp_mb(); /* C */ /* Avoid leaking the critical section. */ 356 smp_mb(); /* C */ /* Avoid leaking the critical section. */
236 this_cpu_inc(sp->per_cpu_ref->unlock_count[idx]); 357 this_cpu_inc(sp->sda->srcu_unlock_count[idx]);
237} 358}
238EXPORT_SYMBOL_GPL(__srcu_read_unlock); 359EXPORT_SYMBOL_GPL(__srcu_read_unlock);
239 360
@@ -251,19 +372,207 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
251 */ 372 */
252static void srcu_gp_start(struct srcu_struct *sp) 373static void srcu_gp_start(struct srcu_struct *sp)
253{ 374{
375 struct srcu_data *sdp = this_cpu_ptr(sp->sda);
254 int state; 376 int state;
255 377
256 rcu_segcblist_accelerate(&sp->srcu_cblist, 378 RCU_LOCKDEP_WARN(!lockdep_is_held(&sp->gp_lock),
257 rcu_seq_snap(&sp->srcu_gp_seq)); 379 "Invoked srcu_gp_start() without ->gp_lock!");
380 WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed));
381 rcu_segcblist_advance(&sdp->srcu_cblist,
382 rcu_seq_current(&sp->srcu_gp_seq));
383 (void)rcu_segcblist_accelerate(&sdp->srcu_cblist,
384 rcu_seq_snap(&sp->srcu_gp_seq));
258 rcu_seq_start(&sp->srcu_gp_seq); 385 rcu_seq_start(&sp->srcu_gp_seq);
259 state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); 386 state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
260 WARN_ON_ONCE(state != SRCU_STATE_SCAN1); 387 WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
261} 388}
262 389
263/* 390/*
391 * Track online CPUs to guide callback workqueue placement.
392 */
393DEFINE_PER_CPU(bool, srcu_online);
394
395void srcu_online_cpu(unsigned int cpu)
396{
397 WRITE_ONCE(per_cpu(srcu_online, cpu), true);
398}
399
400void srcu_offline_cpu(unsigned int cpu)
401{
402 WRITE_ONCE(per_cpu(srcu_online, cpu), false);
403}
404
405/*
406 * Place the workqueue handler on the specified CPU if online, otherwise
407 * just run it whereever. This is useful for placing workqueue handlers
408 * that are to invoke the specified CPU's callbacks.
409 */
410static bool srcu_queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
411 struct delayed_work *dwork,
412 unsigned long delay)
413{
414 bool ret;
415
416 preempt_disable();
417 if (READ_ONCE(per_cpu(srcu_online, cpu)))
418 ret = queue_delayed_work_on(cpu, wq, dwork, delay);
419 else
420 ret = queue_delayed_work(wq, dwork, delay);
421 preempt_enable();
422 return ret;
423}
424
425/*
426 * Schedule callback invocation for the specified srcu_data structure,
427 * if possible, on the corresponding CPU.
428 */
429static void srcu_schedule_cbs_sdp(struct srcu_data *sdp, unsigned long delay)
430{
431 srcu_queue_delayed_work_on(sdp->cpu, system_power_efficient_wq,
432 &sdp->work, delay);
433}
434
435/*
436 * Schedule callback invocation for all srcu_data structures associated
437 * with the specified srcu_node structure, if possible, on the corresponding
438 * CPUs.
439 */
440static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp)
441{
442 int cpu;
443
444 for (cpu = snp->grplo; cpu <= snp->grphi; cpu++)
445 srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), SRCU_INTERVAL);
446}
447
448/*
449 * Note the end of an SRCU grace period. Initiates callback invocation
450 * and starts a new grace period if needed.
451 *
452 * The ->srcu_cb_mutex acquisition does not protect any data, but
453 * instead prevents more than one grace period from starting while we
454 * are initiating callback invocation. This allows the ->srcu_have_cbs[]
455 * array to have a finite number of elements.
456 */
457static void srcu_gp_end(struct srcu_struct *sp)
458{
459 bool cbs;
460 unsigned long gpseq;
461 int idx;
462 int idxnext;
463 struct srcu_node *snp;
464
465 /* Prevent more than one additional grace period. */
466 mutex_lock(&sp->srcu_cb_mutex);
467
468 /* End the current grace period. */
469 spin_lock_irq(&sp->gp_lock);
470 idx = rcu_seq_state(sp->srcu_gp_seq);
471 WARN_ON_ONCE(idx != SRCU_STATE_SCAN2);
472 rcu_seq_end(&sp->srcu_gp_seq);
473 gpseq = rcu_seq_current(&sp->srcu_gp_seq);
474 spin_unlock_irq(&sp->gp_lock);
475 mutex_unlock(&sp->srcu_gp_mutex);
476 /* A new grace period can start at this point. But only one. */
477
478 /* Initiate callback invocation as needed. */
479 idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs);
480 idxnext = (idx + 1) % ARRAY_SIZE(snp->srcu_have_cbs);
481 rcu_for_each_node_breadth_first(sp, snp) {
482 spin_lock_irq(&snp->lock);
483 cbs = false;
484 if (snp >= sp->level[rcu_num_lvls - 1])
485 cbs = snp->srcu_have_cbs[idx] == gpseq;
486 snp->srcu_have_cbs[idx] = gpseq;
487 rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
488 spin_unlock_irq(&snp->lock);
489 if (cbs) {
490 smp_mb(); /* GP end before CB invocation. */
491 srcu_schedule_cbs_snp(sp, snp);
492 }
493 }
494
495 /* Callback initiation done, allow grace periods after next. */
496 mutex_unlock(&sp->srcu_cb_mutex);
497
498 /* Start a new grace period if needed. */
499 spin_lock_irq(&sp->gp_lock);
500 gpseq = rcu_seq_current(&sp->srcu_gp_seq);
501 if (!rcu_seq_state(gpseq) &&
502 ULONG_CMP_LT(gpseq, sp->srcu_gp_seq_needed)) {
503 srcu_gp_start(sp);
504 spin_unlock_irq(&sp->gp_lock);
505 /* Throttle expedited grace periods: Should be rare! */
506 srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) &&
507 rcu_seq_ctr(gpseq) & 0xf
508 ? 0
509 : SRCU_INTERVAL);
510 } else {
511 spin_unlock_irq(&sp->gp_lock);
512 }
513}
514
515/*
516 * Funnel-locking scheme to scalably mediate many concurrent grace-period
517 * requests. The winner has to do the work of actually starting grace
518 * period s. Losers must either ensure that their desired grace-period
519 * number is recorded on at least their leaf srcu_node structure, or they
520 * must take steps to invoke their own callbacks.
521 */
522static void srcu_funnel_gp_start(struct srcu_struct *sp,
523 struct srcu_data *sdp,
524 unsigned long s)
525{
526 unsigned long flags;
527 int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs);
528 struct srcu_node *snp = sdp->mynode;
529 unsigned long snp_seq;
530
531 /* Each pass through the loop does one level of the srcu_node tree. */
532 for (; snp != NULL; snp = snp->srcu_parent) {
533 if (rcu_seq_done(&sp->srcu_gp_seq, s) && snp != sdp->mynode)
534 return; /* GP already done and CBs recorded. */
535 spin_lock_irqsave(&snp->lock, flags);
536 if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
537 snp_seq = snp->srcu_have_cbs[idx];
538 spin_unlock_irqrestore(&snp->lock, flags);
539 if (snp == sdp->mynode && snp_seq != s) {
540 smp_mb(); /* CBs after GP! */
541 srcu_schedule_cbs_sdp(sdp, 0);
542 }
543 return;
544 }
545 snp->srcu_have_cbs[idx] = s;
546 spin_unlock_irqrestore(&snp->lock, flags);
547 }
548
549 /* Top of tree, must ensure the grace period will be started. */
550 spin_lock_irqsave(&sp->gp_lock, flags);
551 if (ULONG_CMP_LT(sp->srcu_gp_seq_needed, s)) {
552 /*
553 * Record need for grace period s. Pair with load
554 * acquire setting up for initialization.
555 */
556 smp_store_release(&sp->srcu_gp_seq_needed, s); /*^^^*/
557 }
558
559 /* If grace period not already done and none in progress, start it. */
560 if (!rcu_seq_done(&sp->srcu_gp_seq, s) &&
561 rcu_seq_state(sp->srcu_gp_seq) == SRCU_STATE_IDLE) {
562 WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed));
563 srcu_gp_start(sp);
564 queue_delayed_work(system_power_efficient_wq, &sp->work,
565 atomic_read(&sp->srcu_exp_cnt)
566 ? 0
567 : SRCU_INTERVAL);
568 }
569 spin_unlock_irqrestore(&sp->gp_lock, flags);
570}
571
572/*
264 * Wait until all readers counted by array index idx complete, but 573 * Wait until all readers counted by array index idx complete, but
265 * loop an additional time if there is an expedited grace period pending. 574 * loop an additional time if there is an expedited grace period pending.
266 * The caller must ensure that ->completed is not changed while checking. 575 * The caller must ensure that ->srcu_idx is not changed while checking.
267 */ 576 */
268static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) 577static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
269{ 578{
@@ -277,13 +586,13 @@ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
277} 586}
278 587
279/* 588/*
280 * Increment the ->completed counter so that future SRCU readers will 589 * Increment the ->srcu_idx counter so that future SRCU readers will
281 * use the other rank of the ->(un)lock_count[] arrays. This allows 590 * use the other rank of the ->srcu_(un)lock_count[] arrays. This allows
282 * us to wait for pre-existing readers in a starvation-free manner. 591 * us to wait for pre-existing readers in a starvation-free manner.
283 */ 592 */
284static void srcu_flip(struct srcu_struct *sp) 593static void srcu_flip(struct srcu_struct *sp)
285{ 594{
286 WRITE_ONCE(sp->completed, sp->completed + 1); 595 WRITE_ONCE(sp->srcu_idx, sp->srcu_idx + 1);
287 596
288 /* 597 /*
289 * Ensure that if the updater misses an __srcu_read_unlock() 598 * Ensure that if the updater misses an __srcu_read_unlock()
@@ -296,21 +605,9 @@ static void srcu_flip(struct srcu_struct *sp)
296} 605}
297 606
298/* 607/*
299 * End an SRCU grace period. 608 * Enqueue an SRCU callback on the srcu_data structure associated with
300 */ 609 * the current CPU and the specified srcu_struct structure, initiating
301static void srcu_gp_end(struct srcu_struct *sp) 610 * grace-period processing if it is not already running.
302{
303 rcu_seq_end(&sp->srcu_gp_seq);
304
305 spin_lock_irq(&sp->queue_lock);
306 rcu_segcblist_advance(&sp->srcu_cblist,
307 rcu_seq_current(&sp->srcu_gp_seq));
308 spin_unlock_irq(&sp->queue_lock);
309}
310
311/*
312 * Enqueue an SRCU callback on the specified srcu_struct structure,
313 * initiating grace-period processing if it is not already running.
314 * 611 *
315 * Note that all CPUs must agree that the grace period extended beyond 612 * Note that all CPUs must agree that the grace period extended beyond
316 * all pre-existing SRCU read-side critical section. On systems with 613 * all pre-existing SRCU read-side critical section. On systems with
@@ -335,33 +632,40 @@ static void srcu_gp_end(struct srcu_struct *sp)
335 * srcu_read_lock(), and srcu_read_unlock() that are all passed the same 632 * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
336 * srcu_struct structure. 633 * srcu_struct structure.
337 */ 634 */
338void call_srcu(struct srcu_struct *sp, struct rcu_head *head, 635void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
339 rcu_callback_t func) 636 rcu_callback_t func)
340{ 637{
341 unsigned long flags; 638 unsigned long flags;
342 639 bool needgp = false;
343 head->next = NULL; 640 unsigned long s;
344 head->func = func; 641 struct srcu_data *sdp;
345 spin_lock_irqsave(&sp->queue_lock, flags); 642
346 smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ 643 check_init_srcu_struct(sp);
347 rcu_segcblist_enqueue(&sp->srcu_cblist, head, false); 644 rhp->func = func;
348 if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) { 645 local_irq_save(flags);
349 srcu_gp_start(sp); 646 sdp = this_cpu_ptr(sp->sda);
350 queue_delayed_work(system_power_efficient_wq, &sp->work, 0); 647 spin_lock(&sdp->lock);
648 rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp, false);
649 rcu_segcblist_advance(&sdp->srcu_cblist,
650 rcu_seq_current(&sp->srcu_gp_seq));
651 s = rcu_seq_snap(&sp->srcu_gp_seq);
652 (void)rcu_segcblist_accelerate(&sdp->srcu_cblist, s);
653 if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed, s)) {
654 sdp->srcu_gp_seq_needed = s;
655 needgp = true;
351 } 656 }
352 spin_unlock_irqrestore(&sp->queue_lock, flags); 657 spin_unlock_irqrestore(&sdp->lock, flags);
658 if (needgp)
659 srcu_funnel_gp_start(sp, sdp, s);
353} 660}
354EXPORT_SYMBOL_GPL(call_srcu); 661EXPORT_SYMBOL_GPL(call_srcu);
355 662
356static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
357
358/* 663/*
359 * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). 664 * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
360 */ 665 */
361static void __synchronize_srcu(struct srcu_struct *sp) 666static void __synchronize_srcu(struct srcu_struct *sp)
362{ 667{
363 struct rcu_synchronize rcu; 668 struct rcu_synchronize rcu;
364 struct rcu_head *head = &rcu.head;
365 669
366 RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) || 670 RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
367 lock_is_held(&rcu_bh_lock_map) || 671 lock_is_held(&rcu_bh_lock_map) ||
@@ -372,26 +676,12 @@ static void __synchronize_srcu(struct srcu_struct *sp)
372 if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE) 676 if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
373 return; 677 return;
374 might_sleep(); 678 might_sleep();
679 check_init_srcu_struct(sp);
375 init_completion(&rcu.completion); 680 init_completion(&rcu.completion);
376 681 init_rcu_head_on_stack(&rcu.head);
377 head->next = NULL; 682 call_srcu(sp, &rcu.head, wakeme_after_rcu);
378 head->func = wakeme_after_rcu;
379 spin_lock_irq(&sp->queue_lock);
380 smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
381 if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
382 /* steal the processing owner */
383 rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
384 srcu_gp_start(sp);
385 spin_unlock_irq(&sp->queue_lock);
386 /* give the processing owner to work_struct */
387 srcu_reschedule(sp, 0);
388 } else {
389 rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
390 spin_unlock_irq(&sp->queue_lock);
391 }
392
393 wait_for_completion(&rcu.completion); 683 wait_for_completion(&rcu.completion);
394 smp_mb(); /* Caller's later accesses after GP. */ 684 destroy_rcu_head_on_stack(&rcu.head);
395} 685}
396 686
397/** 687/**
@@ -408,6 +698,7 @@ void synchronize_srcu_expedited(struct srcu_struct *sp)
408{ 698{
409 bool do_norm = rcu_gp_is_normal(); 699 bool do_norm = rcu_gp_is_normal();
410 700
701 check_init_srcu_struct(sp);
411 if (!do_norm) { 702 if (!do_norm) {
412 atomic_inc(&sp->srcu_exp_cnt); 703 atomic_inc(&sp->srcu_exp_cnt);
413 smp_mb__after_atomic(); /* increment before GP. */ 704 smp_mb__after_atomic(); /* increment before GP. */
@@ -415,7 +706,7 @@ void synchronize_srcu_expedited(struct srcu_struct *sp)
415 __synchronize_srcu(sp); 706 __synchronize_srcu(sp);
416 if (!do_norm) { 707 if (!do_norm) {
417 smp_mb__before_atomic(); /* GP before decrement. */ 708 smp_mb__before_atomic(); /* GP before decrement. */
418 atomic_dec(&sp->srcu_exp_cnt); 709 WARN_ON_ONCE(atomic_dec_return(&sp->srcu_exp_cnt) < 0);
419 } 710 }
420} 711}
421EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); 712EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
@@ -426,8 +717,8 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
426 * 717 *
427 * Wait for the count to drain to zero of both indexes. To avoid the 718 * Wait for the count to drain to zero of both indexes. To avoid the
428 * possible starvation of synchronize_srcu(), it waits for the count of 719 * possible starvation of synchronize_srcu(), it waits for the count of
429 * the index=((->completed & 1) ^ 1) to drain to zero at first, 720 * the index=((->srcu_idx & 1) ^ 1) to drain to zero at first,
430 * and then flip the completed and wait for the count of the other index. 721 * and then flip the srcu_idx and wait for the count of the other index.
431 * 722 *
432 * Can block; must be called from process context. 723 * Can block; must be called from process context.
433 * 724 *
@@ -468,13 +759,69 @@ void synchronize_srcu(struct srcu_struct *sp)
468} 759}
469EXPORT_SYMBOL_GPL(synchronize_srcu); 760EXPORT_SYMBOL_GPL(synchronize_srcu);
470 761
762/*
763 * Callback function for srcu_barrier() use.
764 */
765static void srcu_barrier_cb(struct rcu_head *rhp)
766{
767 struct srcu_data *sdp;
768 struct srcu_struct *sp;
769
770 sdp = container_of(rhp, struct srcu_data, srcu_barrier_head);
771 sp = sdp->sp;
772 if (atomic_dec_and_test(&sp->srcu_barrier_cpu_cnt))
773 complete(&sp->srcu_barrier_completion);
774}
775
471/** 776/**
472 * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. 777 * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
473 * @sp: srcu_struct on which to wait for in-flight callbacks. 778 * @sp: srcu_struct on which to wait for in-flight callbacks.
474 */ 779 */
475void srcu_barrier(struct srcu_struct *sp) 780void srcu_barrier(struct srcu_struct *sp)
476{ 781{
477 synchronize_srcu(sp); 782 int cpu;
783 struct srcu_data *sdp;
784 unsigned long s = rcu_seq_snap(&sp->srcu_barrier_seq);
785
786 check_init_srcu_struct(sp);
787 mutex_lock(&sp->srcu_barrier_mutex);
788 if (rcu_seq_done(&sp->srcu_barrier_seq, s)) {
789 smp_mb(); /* Force ordering following return. */
790 mutex_unlock(&sp->srcu_barrier_mutex);
791 return; /* Someone else did our work for us. */
792 }
793 rcu_seq_start(&sp->srcu_barrier_seq);
794 init_completion(&sp->srcu_barrier_completion);
795
796 /* Initial count prevents reaching zero until all CBs are posted. */
797 atomic_set(&sp->srcu_barrier_cpu_cnt, 1);
798
799 /*
800 * Each pass through this loop enqueues a callback, but only
801 * on CPUs already having callbacks enqueued. Note that if
802 * a CPU already has callbacks enqueue, it must have already
803 * registered the need for a future grace period, so all we
804 * need do is enqueue a callback that will use the same
805 * grace period as the last callback already in the queue.
806 */
807 for_each_possible_cpu(cpu) {
808 sdp = per_cpu_ptr(sp->sda, cpu);
809 spin_lock_irq(&sdp->lock);
810 atomic_inc(&sp->srcu_barrier_cpu_cnt);
811 sdp->srcu_barrier_head.func = srcu_barrier_cb;
812 if (!rcu_segcblist_entrain(&sdp->srcu_cblist,
813 &sdp->srcu_barrier_head, 0))
814 atomic_dec(&sp->srcu_barrier_cpu_cnt);
815 spin_unlock_irq(&sdp->lock);
816 }
817
818 /* Remove the initial count, at which point reaching zero can happen. */
819 if (atomic_dec_and_test(&sp->srcu_barrier_cpu_cnt))
820 complete(&sp->srcu_barrier_completion);
821 wait_for_completion(&sp->srcu_barrier_completion);
822
823 rcu_seq_end(&sp->srcu_barrier_seq);
824 mutex_unlock(&sp->srcu_barrier_mutex);
478} 825}
479EXPORT_SYMBOL_GPL(srcu_barrier); 826EXPORT_SYMBOL_GPL(srcu_barrier);
480 827
@@ -487,21 +834,24 @@ EXPORT_SYMBOL_GPL(srcu_barrier);
487 */ 834 */
488unsigned long srcu_batches_completed(struct srcu_struct *sp) 835unsigned long srcu_batches_completed(struct srcu_struct *sp)
489{ 836{
490 return sp->completed; 837 return sp->srcu_idx;
491} 838}
492EXPORT_SYMBOL_GPL(srcu_batches_completed); 839EXPORT_SYMBOL_GPL(srcu_batches_completed);
493 840
494/* 841/*
495 * Core SRCU state machine. Advance callbacks from ->batch_check0 to 842 * Core SRCU state machine. Push state bits of ->srcu_gp_seq
496 * ->batch_check1 and then to ->batch_done as readers drain. 843 * to SRCU_STATE_SCAN2, and invoke srcu_gp_end() when scan has
844 * completed in that state.
497 */ 845 */
498static void srcu_advance_batches(struct srcu_struct *sp) 846static void srcu_advance_state(struct srcu_struct *sp)
499{ 847{
500 int idx; 848 int idx;
501 849
850 mutex_lock(&sp->srcu_gp_mutex);
851
502 /* 852 /*
503 * Because readers might be delayed for an extended period after 853 * Because readers might be delayed for an extended period after
504 * fetching ->completed for their index, at any point in time there 854 * fetching ->srcu_idx for their index, at any point in time there
505 * might well be readers using both idx=0 and idx=1. We therefore 855 * might well be readers using both idx=0 and idx=1. We therefore
506 * need to wait for readers to clear from both index values before 856 * need to wait for readers to clear from both index values before
507 * invoking a callback. 857 * invoking a callback.
@@ -511,23 +861,29 @@ static void srcu_advance_batches(struct srcu_struct *sp)
511 */ 861 */
512 idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */ 862 idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
513 if (idx == SRCU_STATE_IDLE) { 863 if (idx == SRCU_STATE_IDLE) {
514 spin_lock_irq(&sp->queue_lock); 864 spin_lock_irq(&sp->gp_lock);
515 if (rcu_segcblist_empty(&sp->srcu_cblist)) { 865 if (ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)) {
516 spin_unlock_irq(&sp->queue_lock); 866 WARN_ON_ONCE(rcu_seq_state(sp->srcu_gp_seq));
867 spin_unlock_irq(&sp->gp_lock);
868 mutex_unlock(&sp->srcu_gp_mutex);
517 return; 869 return;
518 } 870 }
519 idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); 871 idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
520 if (idx == SRCU_STATE_IDLE) 872 if (idx == SRCU_STATE_IDLE)
521 srcu_gp_start(sp); 873 srcu_gp_start(sp);
522 spin_unlock_irq(&sp->queue_lock); 874 spin_unlock_irq(&sp->gp_lock);
523 if (idx != SRCU_STATE_IDLE) 875 if (idx != SRCU_STATE_IDLE) {
876 mutex_unlock(&sp->srcu_gp_mutex);
524 return; /* Someone else started the grace period. */ 877 return; /* Someone else started the grace period. */
878 }
525 } 879 }
526 880
527 if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) { 881 if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
528 idx = 1 ^ (sp->completed & 1); 882 idx = 1 ^ (sp->srcu_idx & 1);
529 if (!try_check_zero(sp, idx, 1)) 883 if (!try_check_zero(sp, idx, 1)) {
884 mutex_unlock(&sp->srcu_gp_mutex);
530 return; /* readers present, retry later. */ 885 return; /* readers present, retry later. */
886 }
531 srcu_flip(sp); 887 srcu_flip(sp);
532 rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2); 888 rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
533 } 889 }
@@ -538,10 +894,12 @@ static void srcu_advance_batches(struct srcu_struct *sp)
538 * SRCU read-side critical sections are normally short, 894 * SRCU read-side critical sections are normally short,
539 * so check at least twice in quick succession after a flip. 895 * so check at least twice in quick succession after a flip.
540 */ 896 */
541 idx = 1 ^ (sp->completed & 1); 897 idx = 1 ^ (sp->srcu_idx & 1);
542 if (!try_check_zero(sp, idx, 2)) 898 if (!try_check_zero(sp, idx, 2)) {
543 return; /* readers present, retry after later. */ 899 mutex_unlock(&sp->srcu_gp_mutex);
544 srcu_gp_end(sp); 900 return; /* readers present, retry later. */
901 }
902 srcu_gp_end(sp); /* Releases ->srcu_gp_mutex. */
545 } 903 }
546} 904}
547 905
@@ -551,28 +909,51 @@ static void srcu_advance_batches(struct srcu_struct *sp)
551 * the workqueue. Note that needed memory barriers have been executed 909 * the workqueue. Note that needed memory barriers have been executed
552 * in this task's context by srcu_readers_active_idx_check(). 910 * in this task's context by srcu_readers_active_idx_check().
553 */ 911 */
554static void srcu_invoke_callbacks(struct srcu_struct *sp) 912static void srcu_invoke_callbacks(struct work_struct *work)
555{ 913{
914 bool more;
556 struct rcu_cblist ready_cbs; 915 struct rcu_cblist ready_cbs;
557 struct rcu_head *rhp; 916 struct rcu_head *rhp;
917 struct srcu_data *sdp;
918 struct srcu_struct *sp;
558 919
559 spin_lock_irq(&sp->queue_lock); 920 sdp = container_of(work, struct srcu_data, work.work);
560 if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) { 921 sp = sdp->sp;
561 spin_unlock_irq(&sp->queue_lock);
562 return;
563 }
564 rcu_cblist_init(&ready_cbs); 922 rcu_cblist_init(&ready_cbs);
565 rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs); 923 spin_lock_irq(&sdp->lock);
566 spin_unlock_irq(&sp->queue_lock); 924 smp_mb(); /* Old grace periods before callback invocation! */
925 rcu_segcblist_advance(&sdp->srcu_cblist,
926 rcu_seq_current(&sp->srcu_gp_seq));
927 if (sdp->srcu_cblist_invoking ||
928 !rcu_segcblist_ready_cbs(&sdp->srcu_cblist)) {
929 spin_unlock_irq(&sdp->lock);
930 return; /* Someone else on the job or nothing to do. */
931 }
932
933 /* We are on the job! Extract and invoke ready callbacks. */
934 sdp->srcu_cblist_invoking = true;
935 rcu_segcblist_extract_done_cbs(&sdp->srcu_cblist, &ready_cbs);
936 spin_unlock_irq(&sdp->lock);
567 rhp = rcu_cblist_dequeue(&ready_cbs); 937 rhp = rcu_cblist_dequeue(&ready_cbs);
568 for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) { 938 for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
569 local_bh_disable(); 939 local_bh_disable();
570 rhp->func(rhp); 940 rhp->func(rhp);
571 local_bh_enable(); 941 local_bh_enable();
572 } 942 }
573 spin_lock_irq(&sp->queue_lock); 943
574 rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs); 944 /*
575 spin_unlock_irq(&sp->queue_lock); 945 * Update counts, accelerate new callbacks, and if needed,
946 * schedule another round of callback invocation.
947 */
948 spin_lock_irq(&sdp->lock);
949 rcu_segcblist_insert_count(&sdp->srcu_cblist, &ready_cbs);
950 (void)rcu_segcblist_accelerate(&sdp->srcu_cblist,
951 rcu_seq_snap(&sp->srcu_gp_seq));
952 sdp->srcu_cblist_invoking = false;
953 more = rcu_segcblist_ready_cbs(&sdp->srcu_cblist);
954 spin_unlock_irq(&sdp->lock);
955 if (more)
956 srcu_schedule_cbs_sdp(sdp, 0);
576} 957}
577 958
578/* 959/*
@@ -581,19 +962,21 @@ static void srcu_invoke_callbacks(struct srcu_struct *sp)
581 */ 962 */
582static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) 963static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
583{ 964{
584 bool pending = true; 965 bool pushgp = true;
585 int state;
586 966
587 if (rcu_segcblist_empty(&sp->srcu_cblist)) { 967 spin_lock_irq(&sp->gp_lock);
588 spin_lock_irq(&sp->queue_lock); 968 if (ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)) {
589 state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)); 969 if (!WARN_ON_ONCE(rcu_seq_state(sp->srcu_gp_seq))) {
590 if (rcu_segcblist_empty(&sp->srcu_cblist) && 970 /* All requests fulfilled, time to go idle. */
591 state == SRCU_STATE_IDLE) 971 pushgp = false;
592 pending = false; 972 }
593 spin_unlock_irq(&sp->queue_lock); 973 } else if (!rcu_seq_state(sp->srcu_gp_seq)) {
974 /* Outstanding request and no GP. Start one. */
975 srcu_gp_start(sp);
594 } 976 }
977 spin_unlock_irq(&sp->gp_lock);
595 978
596 if (pending) 979 if (pushgp)
597 queue_delayed_work(system_power_efficient_wq, &sp->work, delay); 980 queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
598} 981}
599 982
@@ -606,8 +989,7 @@ void process_srcu(struct work_struct *work)
606 989
607 sp = container_of(work, struct srcu_struct, work.work); 990 sp = container_of(work, struct srcu_struct, work.work);
608 991
609 srcu_advance_batches(sp); 992 srcu_advance_state(sp);
610 srcu_invoke_callbacks(sp);
611 srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL); 993 srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
612} 994}
613EXPORT_SYMBOL_GPL(process_srcu); 995EXPORT_SYMBOL_GPL(process_srcu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 346948b51b0b..3c23435d2083 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3776,12 +3776,16 @@ int rcutree_online_cpu(unsigned int cpu)
 {
 	sync_sched_exp_online_cleanup(cpu);
 	rcutree_affinity_setting(cpu, -1);
+	if (IS_ENABLED(CONFIG_TREE_SRCU))
+		srcu_online_cpu(cpu);
 	return 0;
 }
 
 int rcutree_offline_cpu(unsigned int cpu)
 {
 	rcutree_affinity_setting(cpu, cpu);
+	if (IS_ENABLED(CONFIG_TREE_SRCU))
+		srcu_offline_cpu(cpu);
 	return 0;
 }
 
@@ -4157,6 +4161,8 @@ void __init rcu_init(void)
 	for_each_online_cpu(cpu) {
 		rcutree_prepare_cpu(cpu);
 		rcu_cpu_starting(cpu);
+		if (IS_ENABLED(CONFIG_TREE_SRCU))
+			srcu_online_cpu(cpu);
 	}
 }
 
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index a2a45cb629d6..0e598ab08fea 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -541,6 +541,14 @@ static bool rcu_nohz_full_cpu(struct rcu_state *rsp);
 static void rcu_dynticks_task_enter(void);
 static void rcu_dynticks_task_exit(void);
 
+#ifdef CONFIG_SRCU
+void srcu_online_cpu(unsigned int cpu);
+void srcu_offline_cpu(unsigned int cpu);
+#else /* #ifdef CONFIG_SRCU */
+void srcu_online_cpu(unsigned int cpu) { }
+void srcu_offline_cpu(unsigned int cpu) { }
+#endif /* #else #ifdef CONFIG_SRCU */
+
 #endif /* #ifndef RCU_TREE_NONCORE */
 
 #ifdef CONFIG_RCU_TRACE