aboutsummaryrefslogtreecommitdiffstats
path: root/kernel
diff options
context:
space:
mode:
authorLai Jiangshan <laijs@cn.fujitsu.com>2008-07-06 05:23:59 -0400
committerIngo Molnar <mingo@elte.hu>2008-07-18 10:07:33 -0400
commit5127bed588a2f8f3a1f732de2a8a190b7df5dce3 (patch)
treebf79321ffa4c1b7c1071bea8ad1a3eb6eb7b888e /kernel
parent3cac97cbb14aed00d83eb33d4613b0fe3aaea863 (diff)
rcu classic: new algorithm for callbacks-processing(v2)
This is v2, it's a little deference from v1 that I had send to lkml. use ACCESS_ONCE use rcu_batch_after/rcu_batch_before for batch # comparison. rcutorture test result: (hotplugs: do cpu-online/offline once per second) No CONFIG_NO_HZ: OK, 12hours No CONFIG_NO_HZ, hotplugs: OK, 12hours CONFIG_NO_HZ=y: OK, 24hours CONFIG_NO_HZ=y, hotplugs: Failed. (Failed also without my patch applied, exactly the same bug occurred, http://lkml.org/lkml/2008/7/3/24) v1's email thread: http://lkml.org/lkml/2008/6/2/539 v1's description: The code/algorithm of the implement of current callbacks-processing is very efficient and technical. But when I studied it and I found a disadvantage: In multi-CPU systems, when a new RCU callback is being queued(call_rcu[_bh]), this callback will be invoked after the grace period for the batch with batch number = rcp->cur+2 has completed very very likely in current implement. Actually, this callback can be invoked after the grace period for the batch with batch number = rcp->cur+1 has completed. The delay of invocation means that latency of synchronize_rcu() is extended. But more important thing is that the callbacks usually free memory, and these works are delayed too! it's necessary for reclaimer to free memory as soon as possible when left memory is few. A very simple way can solve this problem: a field(struct rcu_head::batch) is added to record the batch number for the RCU callback. And when a new RCU callback is being queued, we determine the batch number for this callback(head->batch = rcp->cur+1) and we move this callback to rdp->donelist if we find that head->batch <= rcp->completed when we process callbacks. This simple way reduces the wait time for invocation a lot. (about 2.5Grace Period -> 1.5Grace Period in average in multi-CPU systems) This is my algorithm. But I do not add any field for struct rcu_head in my implement. We just need to memorize the last 2 batches and their batch number, because these 2 batches include all entries that for whom the grace period hasn't completed. So we use a special linked-list rather than add a field. Please see the comment of struct rcu_data. Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com> Cc: "Paul E. McKenney" <paulmck@linux.vnet.ibm.com> Cc: Dipankar Sarma <dipankar@in.ibm.com> Cc: Gautham Shenoy <ego@in.ibm.com> Cc: Dhaval Giani <dhaval@linux.vnet.ibm.com> Cc: Peter Zijlstra <a.p.zijlstra@chello.nl> Signed-off-by: Ingo Molnar <mingo@elte.hu>
Diffstat (limited to 'kernel')
-rw-r--r--kernel/rcuclassic.c157
1 files changed, 97 insertions, 60 deletions
diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
index 03726eb95193..d3553ee55f64 100644
--- a/kernel/rcuclassic.c
+++ b/kernel/rcuclassic.c
@@ -120,6 +120,43 @@ static inline void force_quiescent_state(struct rcu_data *rdp,
120} 120}
121#endif 121#endif
122 122
123static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
124 struct rcu_data *rdp)
125{
126 long batch;
127 smp_mb(); /* reads the most recently updated value of rcu->cur. */
128
129 /*
130 * Determine the batch number of this callback.
131 *
132 * Using ACCESS_ONCE to avoid the following error when gcc eliminates
133 * local variable "batch" and emits codes like this:
134 * 1) rdp->batch = rcp->cur + 1 # gets old value
135 * ......
136 * 2)rcu_batch_after(rcp->cur + 1, rdp->batch) # gets new value
137 * then [*nxttail[0], *nxttail[1]) may contain callbacks
138 * that batch# = rdp->batch, see the comment of struct rcu_data.
139 */
140 batch = ACCESS_ONCE(rcp->cur) + 1;
141
142 if (rdp->nxtlist && rcu_batch_after(batch, rdp->batch)) {
143 /* process callbacks */
144 rdp->nxttail[0] = rdp->nxttail[1];
145 rdp->nxttail[1] = rdp->nxttail[2];
146 if (rcu_batch_after(batch - 1, rdp->batch))
147 rdp->nxttail[0] = rdp->nxttail[2];
148 }
149
150 rdp->batch = batch;
151 *rdp->nxttail[2] = head;
152 rdp->nxttail[2] = &head->next;
153
154 if (unlikely(++rdp->qlen > qhimark)) {
155 rdp->blimit = INT_MAX;
156 force_quiescent_state(rdp, &rcu_ctrlblk);
157 }
158}
159
123/** 160/**
124 * call_rcu - Queue an RCU callback for invocation after a grace period. 161 * call_rcu - Queue an RCU callback for invocation after a grace period.
125 * @head: structure to be used for queueing the RCU updates. 162 * @head: structure to be used for queueing the RCU updates.
@@ -135,18 +172,11 @@ void call_rcu(struct rcu_head *head,
135 void (*func)(struct rcu_head *rcu)) 172 void (*func)(struct rcu_head *rcu))
136{ 173{
137 unsigned long flags; 174 unsigned long flags;
138 struct rcu_data *rdp;
139 175
140 head->func = func; 176 head->func = func;
141 head->next = NULL; 177 head->next = NULL;
142 local_irq_save(flags); 178 local_irq_save(flags);
143 rdp = &__get_cpu_var(rcu_data); 179 __call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
144 *rdp->nxttail = head;
145 rdp->nxttail = &head->next;
146 if (unlikely(++rdp->qlen > qhimark)) {
147 rdp->blimit = INT_MAX;
148 force_quiescent_state(rdp, &rcu_ctrlblk);
149 }
150 local_irq_restore(flags); 180 local_irq_restore(flags);
151} 181}
152EXPORT_SYMBOL_GPL(call_rcu); 182EXPORT_SYMBOL_GPL(call_rcu);
@@ -171,20 +201,11 @@ void call_rcu_bh(struct rcu_head *head,
171 void (*func)(struct rcu_head *rcu)) 201 void (*func)(struct rcu_head *rcu))
172{ 202{
173 unsigned long flags; 203 unsigned long flags;
174 struct rcu_data *rdp;
175 204
176 head->func = func; 205 head->func = func;
177 head->next = NULL; 206 head->next = NULL;
178 local_irq_save(flags); 207 local_irq_save(flags);
179 rdp = &__get_cpu_var(rcu_bh_data); 208 __call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
180 *rdp->nxttail = head;
181 rdp->nxttail = &head->next;
182
183 if (unlikely(++rdp->qlen > qhimark)) {
184 rdp->blimit = INT_MAX;
185 force_quiescent_state(rdp, &rcu_bh_ctrlblk);
186 }
187
188 local_irq_restore(flags); 209 local_irq_restore(flags);
189} 210}
190EXPORT_SYMBOL_GPL(call_rcu_bh); 211EXPORT_SYMBOL_GPL(call_rcu_bh);
@@ -213,12 +234,6 @@ EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
213static inline void raise_rcu_softirq(void) 234static inline void raise_rcu_softirq(void)
214{ 235{
215 raise_softirq(RCU_SOFTIRQ); 236 raise_softirq(RCU_SOFTIRQ);
216 /*
217 * The smp_mb() here is required to ensure that this cpu's
218 * __rcu_process_callbacks() reads the most recently updated
219 * value of rcu->cur.
220 */
221 smp_mb();
222} 237}
223 238
224/* 239/*
@@ -360,13 +375,15 @@ static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
360 * which is dead and hence not processing interrupts. 375 * which is dead and hence not processing interrupts.
361 */ 376 */
362static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list, 377static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
363 struct rcu_head **tail) 378 struct rcu_head **tail, long batch)
364{ 379{
365 local_irq_disable(); 380 if (list) {
366 *this_rdp->nxttail = list; 381 local_irq_disable();
367 if (list) 382 this_rdp->batch = batch;
368 this_rdp->nxttail = tail; 383 *this_rdp->nxttail[2] = list;
369 local_irq_enable(); 384 this_rdp->nxttail[2] = tail;
385 local_irq_enable();
386 }
370} 387}
371 388
372static void __rcu_offline_cpu(struct rcu_data *this_rdp, 389static void __rcu_offline_cpu(struct rcu_data *this_rdp,
@@ -380,9 +397,9 @@ static void __rcu_offline_cpu(struct rcu_data *this_rdp,
380 if (rcp->cur != rcp->completed) 397 if (rcp->cur != rcp->completed)
381 cpu_quiet(rdp->cpu, rcp); 398 cpu_quiet(rdp->cpu, rcp);
382 spin_unlock_bh(&rcp->lock); 399 spin_unlock_bh(&rcp->lock);
383 rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail); 400 /* spin_lock implies smp_mb() */
384 rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail); 401 rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
385 rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail); 402 rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
386 403
387 local_irq_disable(); 404 local_irq_disable();
388 this_rdp->qlen += rdp->qlen; 405 this_rdp->qlen += rdp->qlen;
@@ -416,27 +433,37 @@ static void rcu_offline_cpu(int cpu)
416static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp, 433static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
417 struct rcu_data *rdp) 434 struct rcu_data *rdp)
418{ 435{
419 if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) { 436 if (rdp->nxtlist) {
420 *rdp->donetail = rdp->curlist;
421 rdp->donetail = rdp->curtail;
422 rdp->curlist = NULL;
423 rdp->curtail = &rdp->curlist;
424 }
425
426 if (rdp->nxtlist && !rdp->curlist) {
427 local_irq_disable(); 437 local_irq_disable();
428 rdp->curlist = rdp->nxtlist;
429 rdp->curtail = rdp->nxttail;
430 rdp->nxtlist = NULL;
431 rdp->nxttail = &rdp->nxtlist;
432 local_irq_enable();
433 438
434 /* 439 /*
435 * start the next batch of callbacks 440 * move the other grace-period-completed entries to
441 * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
442 */
443 if (!rcu_batch_before(rcp->completed, rdp->batch))
444 rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
445 else if (!rcu_batch_before(rcp->completed, rdp->batch - 1))
446 rdp->nxttail[0] = rdp->nxttail[1];
447
448 /*
449 * the grace period for entries in
450 * [rdp->nxtlist, *rdp->nxttail[0]) has completed and
451 * move these entries to donelist
436 */ 452 */
453 if (rdp->nxttail[0] != &rdp->nxtlist) {
454 *rdp->donetail = rdp->nxtlist;
455 rdp->donetail = rdp->nxttail[0];
456 rdp->nxtlist = *rdp->nxttail[0];
457 *rdp->donetail = NULL;
458
459 if (rdp->nxttail[1] == rdp->nxttail[0])
460 rdp->nxttail[1] = &rdp->nxtlist;
461 if (rdp->nxttail[2] == rdp->nxttail[0])
462 rdp->nxttail[2] = &rdp->nxtlist;
463 rdp->nxttail[0] = &rdp->nxtlist;
464 }
437 465
438 /* determine batch number */ 466 local_irq_enable();
439 rdp->batch = rcp->cur + 1;
440 467
441 if (rcu_batch_after(rdp->batch, rcp->pending)) { 468 if (rcu_batch_after(rdp->batch, rcp->pending)) {
442 /* and start it/schedule start if it's a new batch */ 469 /* and start it/schedule start if it's a new batch */
@@ -462,15 +489,26 @@ static void rcu_process_callbacks(struct softirq_action *unused)
462 489
463static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp) 490static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
464{ 491{
465 /* This cpu has pending rcu entries and the grace period 492 if (rdp->nxtlist) {
466 * for them has completed. 493 /*
467 */ 494 * This cpu has pending rcu entries and the grace period
468 if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) 495 * for them has completed.
469 return 1; 496 */
497 if (!rcu_batch_before(rcp->completed, rdp->batch))
498 return 1;
499 if (!rcu_batch_before(rcp->completed, rdp->batch - 1) &&
500 rdp->nxttail[0] != rdp->nxttail[1])
501 return 1;
502 if (rdp->nxttail[0] != &rdp->nxtlist)
503 return 1;
470 504
471 /* This cpu has no pending entries, but there are new entries */ 505 /*
472 if (!rdp->curlist && rdp->nxtlist) 506 * This cpu has pending rcu entries and the new batch
473 return 1; 507 * for then hasn't been started nor scheduled start
508 */
509 if (rcu_batch_after(rdp->batch, rcp->pending))
510 return 1;
511 }
474 512
475 /* This cpu has finished callbacks to invoke */ 513 /* This cpu has finished callbacks to invoke */
476 if (rdp->donelist) 514 if (rdp->donelist)
@@ -506,7 +544,7 @@ int rcu_needs_cpu(int cpu)
506 struct rcu_data *rdp = &per_cpu(rcu_data, cpu); 544 struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
507 struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu); 545 struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
508 546
509 return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu)); 547 return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
510} 548}
511 549
512void rcu_check_callbacks(int cpu, int user) 550void rcu_check_callbacks(int cpu, int user)
@@ -553,8 +591,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
553 struct rcu_data *rdp) 591 struct rcu_data *rdp)
554{ 592{
555 memset(rdp, 0, sizeof(*rdp)); 593 memset(rdp, 0, sizeof(*rdp));
556 rdp->curtail = &rdp->curlist; 594 rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
557 rdp->nxttail = &rdp->nxtlist;
558 rdp->donetail = &rdp->donelist; 595 rdp->donetail = &rdp->donelist;
559 rdp->quiescbatch = rcp->completed; 596 rdp->quiescbatch = rcp->completed;
560 rdp->qs_pending = 0; 597 rdp->qs_pending = 0;