 include/linux/ptr_ring.h | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 54 insertions(+), 9 deletions(-)
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 6c70444da3b9..6b2e0dd88569 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -34,11 +34,13 @@
 struct ptr_ring {
         int producer ____cacheline_aligned_in_smp;
         spinlock_t producer_lock;
-        int consumer ____cacheline_aligned_in_smp;
+        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
+        int consumer_tail; /* next entry to invalidate */
         spinlock_t consumer_lock;
         /* Shared consumer/producer data */
         /* Read-only by both the producer and the consumer */
         int size ____cacheline_aligned_in_smp; /* max entries in queue */
+        int batch; /* number of entries to consume in a batch */
         void **queue;
 };
 
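
[Illustrative note, not part of the patch] The split consumer index divides the ring into three regions: slots before consumer_tail are already zeroed and reusable by the producer, slots from consumer_tail up to (but not including) consumer_head were consumed but still hold stale pointers, and slots from consumer_head onward are still valid. A hypothetical helper (name made up here) counting the not-yet-invalidated slots could look like:

    /* Hypothetical helper, not part of this patch: number of consumed
     * entries whose slots have not yet been handed back to the producer.
     * The patch keeps consumer_tail <= consumer_head and resets both to 0
     * together on wrap-around, so the difference is never negative.
     */
    static inline int __ptr_ring_pending_invalidate(struct ptr_ring *r)
    {
            return r->consumer_head - r->consumer_tail;
    }
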
@@ -170,7 +172,7 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
         if (likely(r->size))
-                return r->queue[r->consumer];
+                return r->queue[r->consumer_head];
         return NULL;
 }
 
@@ -231,9 +233,38 @@ static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
 /* Must only be called after __ptr_ring_peek returned !NULL */
 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 {
-        r->queue[r->consumer++] = NULL;
-        if (unlikely(r->consumer >= r->size))
-                r->consumer = 0;
+        /* Fundamentally, what we want to do is update consumer
+         * index and zero out the entry so producer can reuse it.
+         * Doing it naively at each consume would be as simple as:
+         *       r->queue[r->consumer++] = NULL;
+         *       if (unlikely(r->consumer >= r->size))
+         *               r->consumer = 0;
+         * but that is suboptimal when the ring is full as producer is writing
+         * out new entries in the same cache line.  Defer these updates until a
+         * batch of entries has been consumed.
+         */
+        int head = r->consumer_head++;
+
+        /* Once we have processed enough entries invalidate them in
+         * the ring all at once so producer can reuse their space in the ring.
+         * We also do this when we reach end of the ring - not mandatory
+         * but helps keep the implementation simple.
+         */
+        if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
+                     r->consumer_head >= r->size)) {
+                /* Zero out entries in the reverse order: this way we touch the
+                 * cache line that producer might currently be reading the last;
+                 * producer won't make progress and touch other cache lines
+                 * besides the first one until we write out all entries.
+                 */
+                while (likely(head >= r->consumer_tail))
+                        r->queue[head--] = NULL;
+                r->consumer_tail = r->consumer_head;
+        }
+        if (unlikely(r->consumer_head >= r->size)) {
+                r->consumer_head = 0;
+                r->consumer_tail = 0;
+        }
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
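
[Illustrative note, not part of the patch] A minimal stand-alone user-space model of the batched discard above; SIZE, BATCH, discard_one() and main() are made up for the example, only the index arithmetic mirrors the hunk:

    #include <stdio.h>

    #define SIZE  8
    #define BATCH 2

    static void *queue[SIZE];
    static int consumer_head, consumer_tail;

    static void discard_one(void)
    {
            int head = consumer_head++;

            /* Defer zeroing until BATCH entries were consumed or we wrap. */
            if (consumer_head - consumer_tail >= BATCH ||
                consumer_head >= SIZE) {
                    /* Zero in reverse order, as in the patch. */
                    while (head >= consumer_tail)
                            queue[head--] = NULL;
                    consumer_tail = consumer_head;
            }
            if (consumer_head >= SIZE)
                    consumer_head = consumer_tail = 0;
    }

    int main(void)
    {
            int dummy[SIZE], i;

            for (i = 0; i < SIZE; i++)
                    queue[i] = &dummy[i];
            for (i = 0; i < SIZE; i++) {
                    discard_one();
                    /* Slots are released to the producer two at a time. */
                    printf("consumed %d: head=%d tail=%d\n",
                           i + 1, consumer_head, consumer_tail);
            }
            return 0;
    }
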
@@ -345,14 +376,27 @@ static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
         return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
 }
 
+static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
+{
+        r->size = size;
+        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
+        /* We need to set batch at least to 1 to make logic
+         * in __ptr_ring_discard_one work correctly.
+         * Batching too much (because ring is small) would cause a lot of
+         * burstiness.  Needs tuning, for now disable batching.
+         */
+        if (r->batch > r->size / 2 || !r->batch)
+                r->batch = 1;
+}
+
 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 {
         r->queue = __ptr_ring_init_queue_alloc(size, gfp);
         if (!r->queue)
                 return -ENOMEM;
 
-        r->size = size;
-        r->producer = r->consumer = 0;
+        __ptr_ring_set_size(r, size);
+        r->producer = r->consumer_head = r->consumer_tail = 0;
         spin_lock_init(&r->producer_lock);
         spin_lock_init(&r->consumer_lock);
 
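
[Illustrative note, not part of the patch] A concrete example of the batch computation, assuming a 64-byte cache line and 8-byte pointers (both architecture-dependent):

    batch = SMP_CACHE_BYTES * 2 / sizeof(void *) = 64 * 2 / 8 = 16

so any ring with fewer than 32 entries trips the batch > size / 2 check and falls back to batch = 1, i.e. the old one-entry-at-a-time behaviour.
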
@@ -373,9 +417,10 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
         else if (destroy)
                 destroy(ptr);
 
-        r->size = size;
+        __ptr_ring_set_size(r, size);
         r->producer = producer;
-        r->consumer = 0;
+        r->consumer_head = 0;
+        r->consumer_tail = 0;
         old = r->queue;
         r->queue = queue;
 
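
[Illustrative note, not part of the patch] A hedged usage sketch of the unchanged public API, showing that callers are unaffected by the internal consumer split; my_ring, my_obj and the example_* functions are made up, only the ptr_ring_* calls come from this header:

    #include <linux/ptr_ring.h>

    struct my_obj {
            int id;
    };

    static struct ptr_ring my_ring;

    static int example_init(void)
    {
            /* 512-entry ring; the batch size is picked internally. */
            return ptr_ring_init(&my_ring, 512, GFP_KERNEL);
    }

    static int example_produce(struct my_obj *obj)
    {
            /* Returns -ENOSPC when the ring is full. */
            return ptr_ring_produce(&my_ring, obj);
    }

    static struct my_obj *example_consume(void)
    {
            /* Consuming no longer zeroes every slot immediately; slots are
             * handed back to the producer in batches.
             */
            return ptr_ring_consume(&my_ring);
    }
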