Diffstat (limited to 'kernel/srcu.c')
-rw-r--r--	kernel/srcu.c	362
 1 file changed, 300 insertions(+), 62 deletions(-)
diff --git a/kernel/srcu.c b/kernel/srcu.c
index b9088524935a..2095be3318d5 100644
--- a/kernel/srcu.c
+++ b/kernel/srcu.c
@@ -34,10 +34,77 @@
 #include <linux/delay.h>
 #include <linux/srcu.h>
 
+/*
+ * Initialize an rcu_batch structure to empty.
+ */
+static inline void rcu_batch_init(struct rcu_batch *b)
+{
+	b->head = NULL;
+	b->tail = &b->head;
+}
+
+/*
+ * Enqueue a callback onto the tail of the specified rcu_batch structure.
+ */
+static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
+{
+	*b->tail = head;
+	b->tail = &head->next;
+}
+
+/*
+ * Is the specified rcu_batch structure empty?
+ */
+static inline bool rcu_batch_empty(struct rcu_batch *b)
+{
+	return b->tail == &b->head;
+}
+
+/*
+ * Remove the callback at the head of the specified rcu_batch structure
+ * and return a pointer to it, or return NULL if the structure is empty.
+ */
+static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
+{
+	struct rcu_head *head;
+
+	if (rcu_batch_empty(b))
+		return NULL;
+
+	head = b->head;
+	b->head = head->next;
+	if (b->tail == &head->next)
+		rcu_batch_init(b);
+
+	return head;
+}
+
+/*
+ * Move all callbacks from the rcu_batch structure specified by "from" to
+ * the structure specified by "to".
+ */
+static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
+{
+	if (!rcu_batch_empty(from)) {
+		*to->tail = from->head;
+		to->tail = from->tail;
+		rcu_batch_init(from);
+	}
+}
+
+/* single-thread state-machine */
+static void process_srcu(struct work_struct *work);
+
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
 	sp->completed = 0;
-	mutex_init(&sp->mutex);
+	spin_lock_init(&sp->queue_lock);
+	sp->running = false;
+	rcu_batch_init(&sp->batch_queue);
+	rcu_batch_init(&sp->batch_check0);
+	rcu_batch_init(&sp->batch_check1);
+	rcu_batch_init(&sp->batch_done);
+	INIT_DELAYED_WORK(&sp->work, process_srcu);
 	sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array);
 	return sp->per_cpu_ref ? 0 : -ENOMEM;
 }
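(Editor's note: the rcu_batch helpers above operate on a singly linked list of rcu_head callbacks with a tail pointer for O(1) enqueue. The structure itself is added on the header side of this series, presumably along these lines; shown here only for context, not part of kernel/srcu.c:

	struct rcu_batch {
		struct rcu_head *head, **tail;	/* tail points at the final ->next pointer */
	};

An empty batch has head == NULL and tail == &head, which is exactly what rcu_batch_init() establishes and rcu_batch_empty() tests.)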
@@ -266,43 +333,86 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
  * we repeatedly block for 1-millisecond time periods. This approach
  * has done well in testing, so there is no need for a config parameter.
  */
-#define SYNCHRONIZE_SRCU_READER_DELAY 5
+#define SRCU_RETRY_CHECK_DELAY 5
 #define SYNCHRONIZE_SRCU_TRYCOUNT 2
 #define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12
 
 /*
- * Wait until all pre-existing readers complete. Such readers
+ * @@@ Wait until all pre-existing readers complete. Such readers
  * will have used the index specified by "idx".
+ * The caller should ensure that ->completed is not changed while checking,
+ * and that idx = (->completed & 1) ^ 1.
  */
-static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
+static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
 {
-	/*
-	 * SRCU read-side critical sections are normally short, so wait
-	 * a small amount of time before possibly blocking.
-	 */
-	if (!srcu_readers_active_idx_check(sp, idx)) {
-		udelay(SYNCHRONIZE_SRCU_READER_DELAY);
-		while (!srcu_readers_active_idx_check(sp, idx)) {
-			if (trycount > 0) {
-				trycount--;
-				udelay(SYNCHRONIZE_SRCU_READER_DELAY);
-			} else
-				schedule_timeout_interruptible(1);
-		}
+	for (;;) {
+		if (srcu_readers_active_idx_check(sp, idx))
+			return true;
+		if (--trycount <= 0)
+			return false;
+		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
 }
 
+/*
+ * Increment the ->completed counter so that future SRCU readers will
+ * use the other rank of the ->c[] and ->seq[] arrays. This allows
+ * us to wait for pre-existing readers in a starvation-free manner.
+ */
 static void srcu_flip(struct srcu_struct *sp)
 {
 	sp->completed++;
 }
 
 /*
+ * Enqueue an SRCU callback on the specified srcu_struct structure,
+ * initiating grace-period processing if it is not already running.
+ */
+void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
+		void (*func)(struct rcu_head *head))
+{
+	unsigned long flags;
+
+	head->next = NULL;
+	head->func = func;
+	spin_lock_irqsave(&sp->queue_lock, flags);
+	rcu_batch_queue(&sp->batch_queue, head);
+	if (!sp->running) {
+		sp->running = true;
+		queue_delayed_work(system_nrt_wq, &sp->work, 0);
+	}
+	spin_unlock_irqrestore(&sp->queue_lock, flags);
+}
+EXPORT_SYMBOL_GPL(call_srcu);
+
+struct rcu_synchronize {
+	struct rcu_head head;
+	struct completion completion;
+};
+
+/*
+ * Awaken the corresponding synchronize_srcu() instance now that a
+ * grace period has elapsed.
+ */
+static void wakeme_after_rcu(struct rcu_head *head)
+{
+	struct rcu_synchronize *rcu;
+
+	rcu = container_of(head, struct rcu_synchronize, head);
+	complete(&rcu->completion);
+}
+
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount);
+static void srcu_reschedule(struct srcu_struct *sp);
+
+/*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
 static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
 {
-	int busy_idx;
+	struct rcu_synchronize rcu;
+	struct rcu_head *head = &rcu.head;
+	bool done = false;
 
 	rcu_lockdep_assert(!lock_is_held(&sp->dep_map) &&
 			   !lock_is_held(&rcu_bh_lock_map) &&
@@ -310,50 +420,32 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
 			   !lock_is_held(&rcu_sched_lock_map),
 			   "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section");
 
-	mutex_lock(&sp->mutex);
-	busy_idx = sp->completed & 0X1UL;
-
-	/*
-	 * If we recently flipped the index, there will be some readers
-	 * using idx=0 and others using idx=1. Therefore, two calls to
-	 * wait_idx()s suffice to ensure that all pre-existing readers
-	 * have completed:
-	 *
-	 * __synchronize_srcu() {
-	 *	wait_idx(sp, 0, trycount);
-	 *	wait_idx(sp, 1, trycount);
-	 * }
-	 *
-	 * Starvation is prevented by the fact that we flip the index.
-	 * While we wait on one index to clear out, almost all new readers
-	 * will be using the other index. The number of new readers using the
-	 * index we are waiting on is sharply bounded by roughly the number
-	 * of CPUs.
-	 *
-	 * How can new readers possibly using the old pre-flip value of
-	 * the index? Consider the following sequence of events:
-	 *
-	 * Suppose that during the previous grace period, a reader
-	 * picked up the old value of the index, but did not increment
-	 * its counter until after the previous instance of
-	 * __synchronize_srcu() did the counter summation and recheck.
-	 * That previous grace period was OK because the reader did
-	 * not start until after the grace period started, so the grace
-	 * period was not obligated to wait for that reader.
-	 *
-	 * However, this sequence of events is quite improbable, so
-	 * this call to wait_idx(), which waits on really old readers
-	 * describe in this comment above, will almost never need to wait.
-	 */
-	wait_idx(sp, 1 - busy_idx, trycount);
-
-	/* Flip the index to avoid reader-induced starvation. */
-	srcu_flip(sp);
-
-	/* Wait for recent pre-existing readers. */
-	wait_idx(sp, busy_idx, trycount);
+	init_completion(&rcu.completion);
+
+	head->next = NULL;
+	head->func = wakeme_after_rcu;
+	spin_lock_irq(&sp->queue_lock);
+	if (!sp->running) {
+		/* steal the processing owner */
+		sp->running = true;
+		rcu_batch_queue(&sp->batch_check0, head);
+		spin_unlock_irq(&sp->queue_lock);
+
+		srcu_advance_batches(sp, trycount);
+		if (!rcu_batch_empty(&sp->batch_done)) {
+			BUG_ON(sp->batch_done.head != head);
+			rcu_batch_dequeue(&sp->batch_done);
+			done = true;
+		}
+		/* give the processing owner to work_struct */
+		srcu_reschedule(sp);
+	} else {
+		rcu_batch_queue(&sp->batch_queue, head);
+		spin_unlock_irq(&sp->queue_lock);
+	}
 
-	mutex_unlock(&sp->mutex);
+	if (!done)
+		wait_for_completion(&rcu.completion);
 }
 
 /**
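(Editor's note: the synchronize_srcu() and synchronize_srcu_expedited() wrappers fall between this hunk and the next and are unchanged by the patch; they pass different retry budgets into the helper above, presumably along these lines, kernel-doc comments omitted; shown only as a sketch for context:

	void synchronize_srcu(struct srcu_struct *sp)
	{
		__synchronize_srcu(sp, SYNCHRONIZE_SRCU_TRYCOUNT);
	}
	EXPORT_SYMBOL_GPL(synchronize_srcu);

	void synchronize_srcu_expedited(struct srcu_struct *sp)
	{
		__synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
	}
	EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);

The expedited variant simply spins through more try_check_zero() retries before the state machine gives up for this pass.)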
@@ -398,15 +490,161 @@ void synchronize_srcu_expedited(struct srcu_struct *sp)
 EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
 /**
+ * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
+ */
+void srcu_barrier(struct srcu_struct *sp)
+{
+	synchronize_srcu(sp);
+}
+EXPORT_SYMBOL_GPL(srcu_barrier);
+
+/**
  * srcu_batches_completed - return batches completed.
  * @sp: srcu_struct on which to report batch completion.
  *
  * Report the number of batches, correlated with, but not necessarily
  * precisely the same as, the number of grace periods that have elapsed.
  */
-
 long srcu_batches_completed(struct srcu_struct *sp)
 {
 	return sp->completed;
 }
 EXPORT_SYMBOL_GPL(srcu_batches_completed);
+
+#define SRCU_CALLBACK_BATCH	10
+#define SRCU_INTERVAL		1
+
+/*
+ * Move any new SRCU callbacks to the first stage of the SRCU grace
+ * period pipeline.
+ */
+static void srcu_collect_new(struct srcu_struct *sp)
+{
+	if (!rcu_batch_empty(&sp->batch_queue)) {
+		spin_lock_irq(&sp->queue_lock);
+		rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
+		spin_unlock_irq(&sp->queue_lock);
+	}
+}
+
+/*
+ * Core SRCU state machine. Advance callbacks from ->batch_check0 to
+ * ->batch_check1 and then to ->batch_done as readers drain.
+ */
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
+{
+	int idx = 1 ^ (sp->completed & 1);
+
+	/*
+	 * Because readers might be delayed for an extended period after
+	 * fetching ->completed for their index, at any point in time there
+	 * might well be readers using both idx=0 and idx=1. We therefore
+	 * need to wait for readers to clear from both index values before
+	 * invoking a callback.
+	 */
+
+	if (rcu_batch_empty(&sp->batch_check0) &&
+	    rcu_batch_empty(&sp->batch_check1))
+		return; /* no callbacks need to be advanced */
+
+	if (!try_check_zero(sp, idx, trycount))
+		return; /* failed to advance, will try after SRCU_INTERVAL */
+
+	/*
+	 * The callbacks in ->batch_check1 have already done with their
+	 * first zero check and flip back when they were enqueued on
+	 * ->batch_check0 in a previous invocation of srcu_advance_batches().
+	 * (Presumably try_check_zero() returned false during that
+	 * invocation, leaving the callbacks stranded on ->batch_check1.)
+	 * They are therefore ready to invoke, so move them to ->batch_done.
+	 */
+	rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+
+	if (rcu_batch_empty(&sp->batch_check0))
+		return; /* no callbacks need to be advanced */
+	srcu_flip(sp);
+
+	/*
+	 * The callbacks in ->batch_check0 just finished their
+	 * first check zero and flip, so move them to ->batch_check1
+	 * for future checking on the other idx.
+	 */
+	rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
+
+	/*
+	 * SRCU read-side critical sections are normally short, so check
+	 * at least twice in quick succession after a flip.
+	 */
+	trycount = trycount < 2 ? 2 : trycount;
+	if (!try_check_zero(sp, idx^1, trycount))
+		return; /* failed to advance, will try after SRCU_INTERVAL */
+
+	/*
+	 * The callbacks in ->batch_check1 have now waited for all
+	 * pre-existing readers using both idx values. They are therefore
+	 * ready to invoke, so move them to ->batch_done.
+	 */
+	rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+}
+
+/*
+ * Invoke a limited number of SRCU callbacks that have passed through
+ * their grace period. If there are more to do, SRCU will reschedule
+ * the workqueue.
+ */
+static void srcu_invoke_callbacks(struct srcu_struct *sp)
+{
+	int i;
+	struct rcu_head *head;
+
+	for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
+		head = rcu_batch_dequeue(&sp->batch_done);
+		if (!head)
+			break;
+		local_bh_disable();
+		head->func(head);
+		local_bh_enable();
+	}
+}
+
+/*
+ * Finished one round of SRCU grace period. Start another if there are
+ * more SRCU callbacks queued, otherwise put SRCU into not-running state.
+ */
+static void srcu_reschedule(struct srcu_struct *sp)
+{
+	bool pending = true;
+
+	if (rcu_batch_empty(&sp->batch_done) &&
+	    rcu_batch_empty(&sp->batch_check1) &&
+	    rcu_batch_empty(&sp->batch_check0) &&
+	    rcu_batch_empty(&sp->batch_queue)) {
+		spin_lock_irq(&sp->queue_lock);
+		if (rcu_batch_empty(&sp->batch_done) &&
+		    rcu_batch_empty(&sp->batch_check1) &&
+		    rcu_batch_empty(&sp->batch_check0) &&
+		    rcu_batch_empty(&sp->batch_queue)) {
+			sp->running = false;
+			pending = false;
+		}
+		spin_unlock_irq(&sp->queue_lock);
+	}
+
+	if (pending)
+		queue_delayed_work(system_nrt_wq, &sp->work, SRCU_INTERVAL);
+}
+
+/*
+ * This is the work-queue function that handles SRCU grace periods.
+ */
+static void process_srcu(struct work_struct *work)
+{
+	struct srcu_struct *sp;
+
+	sp = container_of(work, struct srcu_struct, work.work);
+
+	srcu_collect_new(sp);
+	srcu_advance_batches(sp, 1);
+	srcu_invoke_callbacks(sp);
+	srcu_reschedule(sp);
+}
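(Editor's note: a minimal usage sketch of the new asynchronous interface, for context only; it is not part of the patch, and the element type and free function are hypothetical:

	struct my_node {			/* hypothetical user data */
		int value;
		struct rcu_head rcu;		/* storage for the deferred callback */
	};

	static struct srcu_struct my_srcu;	/* init_srcu_struct(&my_srcu) at setup */

	static void my_node_free(struct rcu_head *head)
	{
		kfree(container_of(head, struct my_node, rcu));
	}

	static void my_node_retire(struct my_node *p)
	{
		/* Defer the kfree() until all pre-existing SRCU readers finish. */
		call_srcu(&my_srcu, &p->rcu, my_node_free);
	}

Readers still bracket their accesses with idx = srcu_read_lock(&my_srcu); ... srcu_read_unlock(&my_srcu, idx);. The difference from synchronize_srcu() is that my_node_retire() never blocks, and the new srcu_barrier() can be used to wait for all such callbacks before, for example, module unload.)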