aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/ptr_ring.h63
1 files changed, 54 insertions, 9 deletions
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 6c70444da3b9..6b2e0dd88569 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -34,11 +34,13 @@
34struct ptr_ring { 34struct ptr_ring {
35 int producer ____cacheline_aligned_in_smp; 35 int producer ____cacheline_aligned_in_smp;
36 spinlock_t producer_lock; 36 spinlock_t producer_lock;
37 int consumer ____cacheline_aligned_in_smp; 37 int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
38 int consumer_tail; /* next entry to invalidate */
38 spinlock_t consumer_lock; 39 spinlock_t consumer_lock;
39 /* Shared consumer/producer data */ 40 /* Shared consumer/producer data */
40 /* Read-only by both the producer and the consumer */ 41 /* Read-only by both the producer and the consumer */
41 int size ____cacheline_aligned_in_smp; /* max entries in queue */ 42 int size ____cacheline_aligned_in_smp; /* max entries in queue */
43 int batch; /* number of entries to consume in a batch */
42 void **queue; 44 void **queue;
43}; 45};
44 46
@@ -170,7 +172,7 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
170static inline void *__ptr_ring_peek(struct ptr_ring *r) 172static inline void *__ptr_ring_peek(struct ptr_ring *r)
171{ 173{
172 if (likely(r->size)) 174 if (likely(r->size))
173 return r->queue[r->consumer]; 175 return r->queue[r->consumer_head];
174 return NULL; 176 return NULL;
175} 177}
176 178
@@ -231,9 +233,38 @@ static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
231/* Must only be called after __ptr_ring_peek returned !NULL */ 233/* Must only be called after __ptr_ring_peek returned !NULL */
232static inline void __ptr_ring_discard_one(struct ptr_ring *r) 234static inline void __ptr_ring_discard_one(struct ptr_ring *r)
233{ 235{
234 r->queue[r->consumer++] = NULL; 236 /* Fundamentally, what we want to do is update consumer
235 if (unlikely(r->consumer >= r->size)) 237 * index and zero out the entry so producer can reuse it.
236 r->consumer = 0; 238 * Doing it naively at each consume would be as simple as:
239 * r->queue[r->consumer++] = NULL;
240 * if (unlikely(r->consumer >= r->size))
241 * r->consumer = 0;
242 * but that is suboptimal when the ring is full as producer is writing
243 * out new entries in the same cache line. Defer these updates until a
244 * batch of entries has been consumed.
245 */
246 int head = r->consumer_head++;
247
248 /* Once we have processed enough entries invalidate them in
249 * the ring all at once so producer can reuse their space in the ring.
250 * We also do this when we reach end of the ring - not mandatory
251 * but helps keep the implementation simple.
252 */
253 if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
254 r->consumer_head >= r->size)) {
255 /* Zero out entries in the reverse order: this way we touch the
256 * cache line that producer might currently be reading the last;
257 * producer won't make progress and touch other cache lines
258 * besides the first one until we write out all entries.
259 */
260 while (likely(head >= r->consumer_tail))
261 r->queue[head--] = NULL;
262 r->consumer_tail = r->consumer_head;
263 }
264 if (unlikely(r->consumer_head >= r->size)) {
265 r->consumer_head = 0;
266 r->consumer_tail = 0;
267 }
237} 268}
238 269
239static inline void *__ptr_ring_consume(struct ptr_ring *r) 270static inline void *__ptr_ring_consume(struct ptr_ring *r)
@@ -345,14 +376,27 @@ static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
345 return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp); 376 return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
346} 377}
347 378
379static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
380{
381 r->size = size;
382 r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
383 /* We need to set batch at least to 1 to make logic
384 * in __ptr_ring_discard_one work correctly.
385 * Batching too much (because ring is small) would cause a lot of
386 * burstiness. Needs tuning, for now disable batching.
387 */
388 if (r->batch > r->size / 2 || !r->batch)
389 r->batch = 1;
390}
391
348static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) 392static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
349{ 393{
350 r->queue = __ptr_ring_init_queue_alloc(size, gfp); 394 r->queue = __ptr_ring_init_queue_alloc(size, gfp);
351 if (!r->queue) 395 if (!r->queue)
352 return -ENOMEM; 396 return -ENOMEM;
353 397
354 r->size = size; 398 __ptr_ring_set_size(r, size);
355 r->producer = r->consumer = 0; 399 r->producer = r->consumer_head = r->consumer_tail = 0;
356 spin_lock_init(&r->producer_lock); 400 spin_lock_init(&r->producer_lock);
357 spin_lock_init(&r->consumer_lock); 401 spin_lock_init(&r->consumer_lock);
358 402
@@ -373,9 +417,10 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
373 else if (destroy) 417 else if (destroy)
374 destroy(ptr); 418 destroy(ptr);
375 419
376 r->size = size; 420 __ptr_ring_set_size(r, size);
377 r->producer = producer; 421 r->producer = producer;
378 r->consumer = 0; 422 r->consumer_head = 0;
423 r->consumer_tail = 0;
379 old = r->queue; 424 old = r->queue;
380 r->queue = queue; 425 r->queue = queue;
381 426