aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSteffen Klassert <steffen.klassert@secunet.com>2010-07-07 09:32:39 -0400
committerHerbert Xu <herbert@gondor.apana.org.au>2010-07-14 08:29:30 -0400
commit5f1a8c1bc724498ff32acbd59ed5263275676b9d (patch)
tree0aa917ae98ebf20ec865930b75d0b16841b0b28f
parent83f619f3c8abb82cac9158cf23c656ec5c184607 (diff)
padata: simplify serialization mechanism
Currently we count the number of processed objects on a percpu basis, so we need to go through all the percpu reorder queues to calculate the sequence number of the next object that needs serialization. This patch changes this to count the number of processed objects globally, so that we can calculate the sequence number and the percpu reorder queue of the next object that needs serialization without searching through the percpu reorder queues. This avoids some accesses to memory of foreign cpus. Signed-off-by: Steffen Klassert <steffen.klassert@secunet.com> Signed-off-by: Herbert Xu <herbert@gondor.apana.org.au>
-rw-r--r--include/linux/padata.h6
-rw-r--r--kernel/padata.c71
2 files changed, 22 insertions, 55 deletions
diff --git a/include/linux/padata.h b/include/linux/padata.h
index e4c17f9b7c9e..8844b851191e 100644
--- a/include/linux/padata.h
+++ b/include/linux/padata.h
@@ -67,7 +67,6 @@ struct padata_list {
67 * @pwork: work struct for parallelization. 67 * @pwork: work struct for parallelization.
68 * @swork: work struct for serialization. 68 * @swork: work struct for serialization.
69 * @pd: Backpointer to the internal control structure. 69 * @pd: Backpointer to the internal control structure.
70 * @num_obj: Number of objects that are processed by this cpu.
71 * @cpu_index: Index of the cpu. 70 * @cpu_index: Index of the cpu.
72 */ 71 */
73struct padata_queue { 72struct padata_queue {
@@ -77,7 +76,6 @@ struct padata_queue {
77 struct work_struct pwork; 76 struct work_struct pwork;
78 struct work_struct swork; 77 struct work_struct swork;
79 struct parallel_data *pd; 78 struct parallel_data *pd;
80 atomic_t num_obj;
81 int cpu_index; 79 int cpu_index;
82}; 80};
83 81
@@ -93,6 +91,7 @@ struct padata_queue {
93 * @max_seq_nr: Maximal used sequence number. 91 * @max_seq_nr: Maximal used sequence number.
94 * @cpumask: cpumask in use. 92 * @cpumask: cpumask in use.
95 * @lock: Reorder lock. 93 * @lock: Reorder lock.
94 * @processed: Number of already processed objects.
96 * @timer: Reorder timer. 95 * @timer: Reorder timer.
97 */ 96 */
98struct parallel_data { 97struct parallel_data {
@@ -103,7 +102,8 @@ struct parallel_data {
103 atomic_t refcnt; 102 atomic_t refcnt;
104 unsigned int max_seq_nr; 103 unsigned int max_seq_nr;
105 cpumask_var_t cpumask; 104 cpumask_var_t cpumask;
106 spinlock_t lock; 105 spinlock_t lock ____cacheline_aligned;
106 unsigned int processed;
107 struct timer_list timer; 107 struct timer_list timer;
108}; 108};
109 109
diff --git a/kernel/padata.c b/kernel/padata.c
index ae8defcf0622..450d67d394b0 100644
--- a/kernel/padata.c
+++ b/kernel/padata.c
@@ -170,79 +170,47 @@ EXPORT_SYMBOL(padata_do_parallel);
170 */ 170 */
171static struct padata_priv *padata_get_next(struct parallel_data *pd) 171static struct padata_priv *padata_get_next(struct parallel_data *pd)
172{ 172{
173 int cpu, num_cpus, empty, calc_seq_nr; 173 int cpu, num_cpus;
174 int seq_nr, next_nr, overrun, next_overrun; 174 int next_nr, next_index;
175 struct padata_queue *queue, *next_queue; 175 struct padata_queue *queue, *next_queue;
176 struct padata_priv *padata; 176 struct padata_priv *padata;
177 struct padata_list *reorder; 177 struct padata_list *reorder;
178 178
179 empty = 0;
180 next_nr = -1;
181 next_overrun = 0;
182 next_queue = NULL;
183
184 num_cpus = cpumask_weight(pd->cpumask); 179 num_cpus = cpumask_weight(pd->cpumask);
185 180
186 for_each_cpu(cpu, pd->cpumask) { 181 /*
187 queue = per_cpu_ptr(pd->queue, cpu); 182 * Calculate the percpu reorder queue and the sequence
188 reorder = &queue->reorder; 183 * number of the next object.
189 184 */
190 /* 185 next_nr = pd->processed;
191 * Calculate the seq_nr of the object that should be 186 next_index = next_nr % num_cpus;
192 * next in this reorder queue. 187 cpu = padata_index_to_cpu(pd, next_index);
193 */ 188 next_queue = per_cpu_ptr(pd->queue, cpu);
194 overrun = 0; 189
195 calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus) 190 if (unlikely(next_nr > pd->max_seq_nr)) {
196 + queue->cpu_index; 191 next_nr = next_nr - pd->max_seq_nr - 1;
197 192 next_index = next_nr % num_cpus;
198 if (unlikely(calc_seq_nr > pd->max_seq_nr)) { 193 cpu = padata_index_to_cpu(pd, next_index);
199 calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1; 194 next_queue = per_cpu_ptr(pd->queue, cpu);
200 overrun = 1; 195 pd->processed = 0;
201 }
202
203 if (!list_empty(&reorder->list)) {
204 padata = list_entry(reorder->list.next,
205 struct padata_priv, list);
206
207 seq_nr = padata->seq_nr;
208 BUG_ON(calc_seq_nr != seq_nr);
209 } else {
210 seq_nr = calc_seq_nr;
211 empty++;
212 }
213
214 if (next_nr < 0 || seq_nr < next_nr
215 || (next_overrun && !overrun)) {
216 next_nr = seq_nr;
217 next_overrun = overrun;
218 next_queue = queue;
219 }
220 } 196 }
221 197
222 padata = NULL; 198 padata = NULL;
223 199
224 if (empty == num_cpus)
225 goto out;
226
227 reorder = &next_queue->reorder; 200 reorder = &next_queue->reorder;
228 201
229 if (!list_empty(&reorder->list)) { 202 if (!list_empty(&reorder->list)) {
230 padata = list_entry(reorder->list.next, 203 padata = list_entry(reorder->list.next,
231 struct padata_priv, list); 204 struct padata_priv, list);
232 205
233 if (unlikely(next_overrun)) { 206 BUG_ON(next_nr != padata->seq_nr);
234 for_each_cpu(cpu, pd->cpumask) {
235 queue = per_cpu_ptr(pd->queue, cpu);
236 atomic_set(&queue->num_obj, 0);
237 }
238 }
239 207
240 spin_lock(&reorder->lock); 208 spin_lock(&reorder->lock);
241 list_del_init(&padata->list); 209 list_del_init(&padata->list);
242 atomic_dec(&pd->reorder_objects); 210 atomic_dec(&pd->reorder_objects);
243 spin_unlock(&reorder->lock); 211 spin_unlock(&reorder->lock);
244 212
245 atomic_inc(&next_queue->num_obj); 213 pd->processed++;
246 214
247 goto out; 215 goto out;
248 } 216 }
@@ -430,7 +398,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
430 398
431 INIT_WORK(&queue->pwork, padata_parallel_worker); 399 INIT_WORK(&queue->pwork, padata_parallel_worker);
432 INIT_WORK(&queue->swork, padata_serial_worker); 400 INIT_WORK(&queue->swork, padata_serial_worker);
433 atomic_set(&queue->num_obj, 0);
434 } 401 }
435 402
436 num_cpus = cpumask_weight(pd->cpumask); 403 num_cpus = cpumask_weight(pd->cpumask);