summaryrefslogtreecommitdiffstats
path: root/kernel/rcu
diff options
context:
space:
mode:
authorPaul E. McKenney <paulmck@linux.vnet.ibm.com>2014-06-24 12:26:11 -0400
committerPaul E. McKenney <paulmck@linux.vnet.ibm.com>2014-07-07 18:13:44 -0400
commitfbce7497ee5af800a1c350c73f3c3f103cb27a15 (patch)
treeb65cf880d3b4b0afd1becd66f1119f9d1ec9d853 /kernel/rcu
parent4a81e8328d3791a4f99bf5b436d050f6dc5ffea3 (diff)
rcu: Parallelize and economize NOCB kthread wakeups
An 80-CPU system with a context-switch-heavy workload can require so many NOCB kthread wakeups that the RCU grace-period kthreads spend several tens of percent of a CPU just awakening things. This clearly will not scale well: If you add enough CPUs, the RCU grace-period kthreads would get behind, increasing grace-period latency. To avoid this problem, this commit divides the NOCB kthreads into leaders and followers, where the grace-period kthreads awaken the leaders each of whom in turn awakens its followers. By default, the number of groups of kthreads is the square root of the number of CPUs, but this default may be overridden using the rcutree.rcu_nocb_leader_stride boot parameter. This reduces the number of wakeups done per grace period by the RCU grace-period kthread by the square root of the number of CPUs, but of course by shifting those wakeups to the leaders. In addition, because the leaders do grace periods on behalf of their respective followers, the number of wakeups of the followers decreases by up to a factor of two. Instead of being awakened once when new callbacks arrive and again at the end of the grace period, the followers are awakened only at the end of the grace period. For a numerical example, in a 4096-CPU system, the grace-period kthread would awaken 64 leaders, each of which would awaken its 63 followers at the end of the grace period. This compares favorably with the 79 wakeups for the grace-period kthread on an 80-CPU system. Reported-by: Rik van Riel <riel@redhat.com> Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Diffstat (limited to 'kernel/rcu')
-rw-r--r--kernel/rcu/tree.h28
-rw-r--r--kernel/rcu/tree_plugin.h252
2 files changed, 237 insertions, 43 deletions
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 0f69a79c5b7d..e996d1e53c84 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -334,11 +334,29 @@ struct rcu_data {
334 struct rcu_head **nocb_tail; 334 struct rcu_head **nocb_tail;
335 atomic_long_t nocb_q_count; /* # CBs waiting for kthread */ 335 atomic_long_t nocb_q_count; /* # CBs waiting for kthread */
336 atomic_long_t nocb_q_count_lazy; /* (approximate). */ 336 atomic_long_t nocb_q_count_lazy; /* (approximate). */
337 struct rcu_head *nocb_follower_head; /* CBs ready to invoke. */
338 struct rcu_head **nocb_follower_tail;
339 atomic_long_t nocb_follower_count; /* # CBs ready to invoke. */
340 atomic_long_t nocb_follower_count_lazy; /* (approximate). */
337 int nocb_p_count; /* # CBs being invoked by kthread */ 341 int nocb_p_count; /* # CBs being invoked by kthread */
338 int nocb_p_count_lazy; /* (approximate). */ 342 int nocb_p_count_lazy; /* (approximate). */
339 wait_queue_head_t nocb_wq; /* For nocb kthreads to sleep on. */ 343 wait_queue_head_t nocb_wq; /* For nocb kthreads to sleep on. */
340 struct task_struct *nocb_kthread; 344 struct task_struct *nocb_kthread;
341 bool nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */ 345 bool nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */
346
347 /* The following fields are used by the leader, hence own cacheline. */
348 struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
349 /* CBs waiting for GP. */
350 struct rcu_head **nocb_gp_tail;
351 long nocb_gp_count;
352 long nocb_gp_count_lazy;
353 bool nocb_leader_wake; /* Is the nocb leader thread awake? */
354 struct rcu_data *nocb_next_follower;
355 /* Next follower in wakeup chain. */
356
357 /* The following fields are used by the follower, hence new cacheline. */
358 struct rcu_data *nocb_leader ____cacheline_internodealigned_in_smp;
359 /* Leader CPU takes GP-end wakeups. */
342#endif /* #ifdef CONFIG_RCU_NOCB_CPU */ 360#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
343 361
344 /* 8) RCU CPU stall data. */ 362 /* 8) RCU CPU stall data. */
@@ -587,8 +605,14 @@ static bool rcu_nohz_full_cpu(struct rcu_state *rsp);
587/* Sum up queue lengths for tracing. */ 605/* Sum up queue lengths for tracing. */
588static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll) 606static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
589{ 607{
590 *ql = atomic_long_read(&rdp->nocb_q_count) + rdp->nocb_p_count; 608 *ql = atomic_long_read(&rdp->nocb_q_count) +
591 *qll = atomic_long_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy; 609 rdp->nocb_p_count +
610 atomic_long_read(&rdp->nocb_follower_count) +
611 rdp->nocb_p_count + rdp->nocb_gp_count;
612 *qll = atomic_long_read(&rdp->nocb_q_count_lazy) +
613 rdp->nocb_p_count_lazy +
614 atomic_long_read(&rdp->nocb_follower_count_lazy) +
615 rdp->nocb_p_count_lazy + rdp->nocb_gp_count_lazy;
592} 616}
593#else /* #ifdef CONFIG_RCU_NOCB_CPU */ 617#else /* #ifdef CONFIG_RCU_NOCB_CPU */
594static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll) 618static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 02ac0fb186b8..b27b86c7bbfa 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -2060,6 +2060,22 @@ bool rcu_is_nocb_cpu(int cpu)
2060#endif /* #ifndef CONFIG_RCU_NOCB_CPU_ALL */ 2060#endif /* #ifndef CONFIG_RCU_NOCB_CPU_ALL */
2061 2061
2062/* 2062/*
2063 * Kick the leader kthread for this NOCB group.
2064 */
2065static void wake_nocb_leader(struct rcu_data *rdp, bool force)
2066{
2067 struct rcu_data *rdp_leader = rdp->nocb_leader;
2068
2069 if (!ACCESS_ONCE(rdp_leader->nocb_kthread))
2070 return;
2071 if (!ACCESS_ONCE(rdp_leader->nocb_leader_wake) || force) {
2072 /* Prior xchg orders against prior callback enqueue. */
2073 ACCESS_ONCE(rdp_leader->nocb_leader_wake) = true;
2074 wake_up(&rdp_leader->nocb_wq);
2075 }
2076}
2077
2078/*
2063 * Enqueue the specified string of rcu_head structures onto the specified 2079 * Enqueue the specified string of rcu_head structures onto the specified
2064 * CPU's no-CBs lists. The CPU is specified by rdp, the head of the 2080 * CPU's no-CBs lists. The CPU is specified by rdp, the head of the
2065 * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy 2081 * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy
@@ -2093,7 +2109,8 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
2093 len = atomic_long_read(&rdp->nocb_q_count); 2109 len = atomic_long_read(&rdp->nocb_q_count);
2094 if (old_rhpp == &rdp->nocb_head) { 2110 if (old_rhpp == &rdp->nocb_head) {
2095 if (!irqs_disabled_flags(flags)) { 2111 if (!irqs_disabled_flags(flags)) {
2096 wake_up(&rdp->nocb_wq); /* ... if queue was empty ... */ 2112 /* ... if queue was empty ... */
2113 wake_nocb_leader(rdp, false);
2097 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, 2114 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
2098 TPS("WakeEmpty")); 2115 TPS("WakeEmpty"));
2099 } else { 2116 } else {
@@ -2103,7 +2120,8 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
2103 } 2120 }
2104 rdp->qlen_last_fqs_check = 0; 2121 rdp->qlen_last_fqs_check = 0;
2105 } else if (len > rdp->qlen_last_fqs_check + qhimark) { 2122 } else if (len > rdp->qlen_last_fqs_check + qhimark) {
2106 wake_up_process(t); /* ... or if many callbacks queued. */ 2123 /* ... or if many callbacks queued. */
2124 wake_nocb_leader(rdp, true);
2107 rdp->qlen_last_fqs_check = LONG_MAX / 2; 2125 rdp->qlen_last_fqs_check = LONG_MAX / 2;
2108 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WakeOvf")); 2126 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WakeOvf"));
2109 } else { 2127 } else {
@@ -2213,13 +2231,150 @@ static void rcu_nocb_wait_gp(struct rcu_data *rdp)
2213} 2231}
2214 2232
2215/* 2233/*
2234 * Leaders come here to wait for additional callbacks to show up.
2235 * This function does not return until callbacks appear.
2236 */
2237static void nocb_leader_wait(struct rcu_data *my_rdp)
2238{
2239 bool firsttime = true;
2240 bool gotcbs;
2241 struct rcu_data *rdp;
2242 struct rcu_head **tail;
2243
2244wait_again:
2245
2246 /* Wait for callbacks to appear. */
2247 if (!rcu_nocb_poll) {
2248 trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep");
2249 wait_event_interruptible(my_rdp->nocb_wq,
2250 ACCESS_ONCE(my_rdp->nocb_leader_wake));
2251 /* Memory barrier handled by smp_mb() calls below and repoll. */
2252 } else if (firsttime) {
2253 firsttime = false; /* Don't drown trace log with "Poll"! */
2254 trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll");
2255 }
2256
2257 /*
2258 * Each pass through the following loop checks a follower for CBs.
2259 * We are our own first follower. Any CBs found are moved to
2260 * nocb_gp_head, where they await a grace period.
2261 */
2262 gotcbs = false;
2263 for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
2264 rdp->nocb_gp_head = ACCESS_ONCE(rdp->nocb_head);
2265 if (!rdp->nocb_gp_head)
2266 continue; /* No CBs here, try next follower. */
2267
2268 /* Move callbacks to wait-for-GP list, which is empty. */
2269 ACCESS_ONCE(rdp->nocb_head) = NULL;
2270 rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
2271 rdp->nocb_gp_count = atomic_long_xchg(&rdp->nocb_q_count, 0);
2272 rdp->nocb_gp_count_lazy =
2273 atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
2274 gotcbs = true;
2275 }
2276
2277 /*
2278 * If there were no callbacks, sleep a bit, rescan after a
2279 * memory barrier, and go retry.
2280 */
2281 if (unlikely(!gotcbs)) {
2282 if (!rcu_nocb_poll)
2283 trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
2284 "WokeEmpty");
2285 flush_signals(current);
2286 schedule_timeout_interruptible(1);
2287
2288 /* Rescan in case we were a victim of memory ordering. */
2289 my_rdp->nocb_leader_wake = false;
2290 smp_mb(); /* Ensure _wake false before scan. */
2291 for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
2292 if (ACCESS_ONCE(rdp->nocb_head)) {
2293 /* Found CB, so short-circuit next wait. */
2294 my_rdp->nocb_leader_wake = true;
2295 break;
2296 }
2297 goto wait_again;
2298 }
2299
2300 /* Wait for one grace period. */
2301 rcu_nocb_wait_gp(my_rdp);
2302
2303 /*
2304 * We left ->nocb_leader_wake set to reduce cache thrashing.
2305 * We clear it now, but recheck for new callbacks while
2306 * traversing our follower list.
2307 */
2308 my_rdp->nocb_leader_wake = false;
2309 smp_mb(); /* Ensure _wake false before scan of ->nocb_head. */
2310
2311 /* Each pass through the following loop wakes a follower, if needed. */
2312 for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
2313 if (ACCESS_ONCE(rdp->nocb_head))
2314 my_rdp->nocb_leader_wake = true; /* No need to wait. */
2315 if (!rdp->nocb_gp_head)
2316 continue; /* No CBs, so no need to wake follower. */
2317
2318 /* Append callbacks to follower's "done" list. */
2319 tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
2320 *tail = rdp->nocb_gp_head;
2321 atomic_long_add(rdp->nocb_gp_count, &rdp->nocb_follower_count);
2322 atomic_long_add(rdp->nocb_gp_count_lazy,
2323 &rdp->nocb_follower_count_lazy);
2324 if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
2325 /*
2326 * List was empty, wake up the follower.
2327 * Memory barriers supplied by atomic_long_add().
2328 */
2329 wake_up(&rdp->nocb_wq);
2330 }
2331 }
2332
2333 /* If we (the leader) don't have CBs, go wait some more. */
2334 if (!my_rdp->nocb_follower_head)
2335 goto wait_again;
2336}
2337
2338/*
2339 * Followers come here to wait for additional callbacks to show up.
2340 * This function does not return until callbacks appear.
2341 */
2342static void nocb_follower_wait(struct rcu_data *rdp)
2343{
2344 bool firsttime = true;
2345
2346 for (;;) {
2347 if (!rcu_nocb_poll) {
2348 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
2349 "FollowerSleep");
2350 wait_event_interruptible(rdp->nocb_wq,
2351 ACCESS_ONCE(rdp->nocb_follower_head));
2352 } else if (firsttime) {
2353 /* Don't drown trace log with "Poll"! */
2354 firsttime = false;
2355 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll");
2356 }
2357 if (smp_load_acquire(&rdp->nocb_follower_head)) {
2358 /* ^^^ Ensure CB invocation follows _head test. */
2359 return;
2360 }
2361 if (!rcu_nocb_poll)
2362 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
2363 "WokeEmpty");
2364 flush_signals(current);
2365 schedule_timeout_interruptible(1);
2366 }
2367}
2368
2369/*
2216 * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes 2370 * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes
2217 * callbacks queued by the corresponding no-CBs CPU. 2371 * callbacks queued by the corresponding no-CBs CPU, however, there is
2372 * an optional leader-follower relationship so that the grace-period
2373 * kthreads don't have to do quite so many wakeups.
2218 */ 2374 */
2219static int rcu_nocb_kthread(void *arg) 2375static int rcu_nocb_kthread(void *arg)
2220{ 2376{
2221 int c, cl; 2377 int c, cl;
2222 bool firsttime = 1;
2223 struct rcu_head *list; 2378 struct rcu_head *list;
2224 struct rcu_head *next; 2379 struct rcu_head *next;
2225 struct rcu_head **tail; 2380 struct rcu_head **tail;
@@ -2227,41 +2382,22 @@ static int rcu_nocb_kthread(void *arg)
2227 2382
2228 /* Each pass through this loop invokes one batch of callbacks */ 2383 /* Each pass through this loop invokes one batch of callbacks */
2229 for (;;) { 2384 for (;;) {
2230 /* If not polling, wait for next batch of callbacks. */ 2385 /* Wait for callbacks. */
2231 if (!rcu_nocb_poll) { 2386 if (rdp->nocb_leader == rdp)
2232 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, 2387 nocb_leader_wait(rdp);
2233 TPS("Sleep")); 2388 else
2234 wait_event_interruptible(rdp->nocb_wq, rdp->nocb_head); 2389 nocb_follower_wait(rdp);
2235 /* Memory barrier provide by xchg() below. */ 2390
2236 } else if (firsttime) { 2391 /* Pull the ready-to-invoke callbacks onto local list. */
2237 firsttime = 0; 2392 list = ACCESS_ONCE(rdp->nocb_follower_head);
2238 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, 2393 BUG_ON(!list);
2239 TPS("Poll")); 2394 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
2240 } 2395 ACCESS_ONCE(rdp->nocb_follower_head) = NULL;
2241 list = ACCESS_ONCE(rdp->nocb_head); 2396 tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
2242 if (!list) { 2397 c = atomic_long_xchg(&rdp->nocb_follower_count, 0);
2243 if (!rcu_nocb_poll) 2398 cl = atomic_long_xchg(&rdp->nocb_follower_count_lazy, 0);
2244 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, 2399 rdp->nocb_p_count += c;
2245 TPS("WokeEmpty")); 2400 rdp->nocb_p_count_lazy += cl;
2246 schedule_timeout_interruptible(1);
2247 flush_signals(current);
2248 continue;
2249 }
2250 firsttime = 1;
2251 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
2252 TPS("WokeNonEmpty"));
2253
2254 /*
2255 * Extract queued callbacks, update counts, and wait
2256 * for a grace period to elapse.
2257 */
2258 ACCESS_ONCE(rdp->nocb_head) = NULL;
2259 tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
2260 c = atomic_long_xchg(&rdp->nocb_q_count, 0);
2261 cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
2262 ACCESS_ONCE(rdp->nocb_p_count) += c;
2263 ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
2264 rcu_nocb_wait_gp(rdp);
2265 2401
2266 /* Each pass through the following loop invokes a callback. */ 2402 /* Each pass through the following loop invokes a callback. */
2267 trace_rcu_batch_start(rdp->rsp->name, cl, c, -1); 2403 trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
@@ -2305,7 +2441,7 @@ static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
2305 if (!rcu_nocb_need_deferred_wakeup(rdp)) 2441 if (!rcu_nocb_need_deferred_wakeup(rdp))
2306 return; 2442 return;
2307 ACCESS_ONCE(rdp->nocb_defer_wakeup) = false; 2443 ACCESS_ONCE(rdp->nocb_defer_wakeup) = false;
2308 wake_up(&rdp->nocb_wq); 2444 wake_nocb_leader(rdp, false);
2309 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWakeEmpty")); 2445 trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWakeEmpty"));
2310} 2446}
2311 2447
@@ -2314,19 +2450,53 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
2314{ 2450{
2315 rdp->nocb_tail = &rdp->nocb_head; 2451 rdp->nocb_tail = &rdp->nocb_head;
2316 init_waitqueue_head(&rdp->nocb_wq); 2452 init_waitqueue_head(&rdp->nocb_wq);
2453 rdp->nocb_follower_tail = &rdp->nocb_follower_head;
2317} 2454}
2318 2455
2319/* Create a kthread for each RCU flavor for each no-CBs CPU. */ 2456/* How many follower CPU IDs per leader? Default of -1 for sqrt(nr_cpu_ids). */
2457static int rcu_nocb_leader_stride = -1;
2458module_param(rcu_nocb_leader_stride, int, 0444);
2459
2460/*
2461 * Create a kthread for each RCU flavor for each no-CBs CPU.
2462 * Also initialize leader-follower relationships.
2463 */
2320static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp) 2464static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
2321{ 2465{
2322 int cpu; 2466 int cpu;
2467 int ls = rcu_nocb_leader_stride;
2468 int nl = 0; /* Next leader. */
2323 struct rcu_data *rdp; 2469 struct rcu_data *rdp;
2470 struct rcu_data *rdp_leader = NULL; /* Suppress misguided gcc warn. */
2471 struct rcu_data *rdp_prev = NULL;
2324 struct task_struct *t; 2472 struct task_struct *t;
2325 2473
2326 if (rcu_nocb_mask == NULL) 2474 if (rcu_nocb_mask == NULL)
2327 return; 2475 return;
2476 if (ls == -1) {
2477 ls = int_sqrt(nr_cpu_ids);
2478 rcu_nocb_leader_stride = ls;
2479 }
2480
2481 /*
2482 * Each pass through this loop sets up one rcu_data structure and
2483 * spawns one rcu_nocb_kthread().
2484 */
2328 for_each_cpu(cpu, rcu_nocb_mask) { 2485 for_each_cpu(cpu, rcu_nocb_mask) {
2329 rdp = per_cpu_ptr(rsp->rda, cpu); 2486 rdp = per_cpu_ptr(rsp->rda, cpu);
2487 if (rdp->cpu >= nl) {
2488 /* New leader, set up for followers & next leader. */
2489 nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
2490 rdp->nocb_leader = rdp;
2491 rdp_leader = rdp;
2492 } else {
2493 /* Another follower, link to previous leader. */
2494 rdp->nocb_leader = rdp_leader;
2495 rdp_prev->nocb_next_follower = rdp;
2496 }
2497 rdp_prev = rdp;
2498
2499 /* Spawn the kthread for this CPU. */
2330 t = kthread_run(rcu_nocb_kthread, rdp, 2500 t = kthread_run(rcu_nocb_kthread, rdp,
2331 "rcuo%c/%d", rsp->abbr, cpu); 2501 "rcuo%c/%d", rsp->abbr, cpu);
2332 BUG_ON(IS_ERR(t)); 2502 BUG_ON(IS_ERR(t));