Diffstat (limited to 'include/linux/ptr_ring.h')
-rw-r--r--  include/linux/ptr_ring.h  79
1 file changed, 47 insertions, 32 deletions
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index d72b2e7dd500..1883d6137e9b 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -45,9 +45,10 @@ struct ptr_ring {
 };
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). If ring is ever resized, callers must hold
- * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
- * producer_lock, the next call to __ptr_ring_produce may fail.
+ * for example cpu_relax().
+ *
+ * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
+ * see e.g. ptr_ring_full.
  */
 static inline bool __ptr_ring_full(struct ptr_ring *r)
 {
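The rule restated above is easiest to read off a caller. A minimal sketch, assuming a ring that is never resized and using an invented helper name (the locked wrapper ptr_ring_produce() does the equivalent internally):

#include <linux/ptr_ring.h>

/* Hypothetical helper: try to queue one entry, reporting -ENOSPC if full.
 * producer_lock is held across the __ptr_ring_full() test and the produce,
 * as the comment above __ptr_ring_full() requires.
 */
static int example_try_publish(struct ptr_ring *r, void *obj)
{
	int ret = -ENOSPC;

	spin_lock(&r->producer_lock);
	if (!__ptr_ring_full(r))
		ret = __ptr_ring_produce(r, obj);
	spin_unlock(&r->producer_lock);

	return ret;
}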
@@ -113,7 +114,7 @@ static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
 	smp_wmb();
 
-	r->queue[r->producer++] = ptr;
+	WRITE_ONCE(r->queue[r->producer++], ptr);
 	if (unlikely(r->producer >= r->size))
 		r->producer = 0;
 	return 0;
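The smp_wmb() plus WRITE_ONCE() pair is what lets a consumer trust the entry it sees. A rough sketch of the producer-side pattern this enables (the payload type and function name are illustrative, not part of the patch): the object is fully initialized before its pointer is handed to the ring, so a consumer that observes the pointer also observes the initialized fields.

#include <linux/ptr_ring.h>
#include <linux/slab.h>

struct example_item {	/* hypothetical payload */
	int value;
};

static int example_produce(struct ptr_ring *r, int value)
{
	struct example_item *item = kmalloc(sizeof(*item), GFP_KERNEL);

	if (!item)
		return -ENOMEM;
	item->value = value;	/* initialized before the pointer is published */

	/* ptr_ring_produce() takes producer_lock; the smp_wmb() in
	 * __ptr_ring_produce() orders the store above before the
	 * WRITE_ONCE() into r->queue[].
	 */
	if (ptr_ring_produce(r, item)) {
		kfree(item);
		return -ENOSPC;
	}
	return 0;
}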
@@ -169,32 +170,36 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 	return ret;
 }
 
-/* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax(). Callers must take consumer_lock
- * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
- * If ring is never resized, and if the pointer is merely
- * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
- * However, if called outside the lock, and if some other CPU
- * consumes ring entries at the same time, the value returned
- * is not guaranteed to be correct.
- * In this case - to avoid incorrectly detecting the ring
- * as empty - the CPU consuming the ring entries is responsible
- * for either consuming all ring entries until the ring is empty,
- * or synchronizing with some other CPU and causing it to
- * execute __ptr_ring_peek and/or consume the ring entries
- * after the synchronization point.
- */
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
 	if (likely(r->size))
-		return r->queue[r->consumer_head];
+		return READ_ONCE(r->queue[r->consumer_head]);
 	return NULL;
 }
 
-/* See __ptr_ring_peek above for locking rules. */
+/*
+ * Test ring empty status without taking any locks.
+ *
+ * NB: This is only safe to call if ring is never resized.
+ *
+ * However, if some other CPU consumes ring entries at the same time, the value
+ * returned is not guaranteed to be correct.
+ *
+ * In this case - to avoid incorrectly detecting the ring
+ * as empty - the CPU consuming the ring entries is responsible
+ * for either consuming all ring entries until the ring is empty,
+ * or synchronizing with some other CPU and causing it to
+ * re-test __ptr_ring_empty and/or consume the ring entries
+ * after the synchronization point.
+ *
+ * Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax().
+ */
 static inline bool __ptr_ring_empty(struct ptr_ring *r)
 {
-	return !__ptr_ring_peek(r);
+	if (likely(r->size))
+		return !r->queue[READ_ONCE(r->consumer_head)];
+	return true;
 }
 
 static inline bool ptr_ring_empty(struct ptr_ring *r)
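The contract spelled out in the new __ptr_ring_empty() comment can be summarized with a sketch (names invented, not from the patch; assumes the ring is never resized): one side may test emptiness without any lock, provided the consuming side always drains until the ring actually reads empty, so an unlocked "not empty" result is never left unhandled.

#include <linux/ptr_ring.h>

/* Hypothetical lockless check, e.g. used to decide whether to kick the
 * consumer at all; safe only because this ring is never resized.
 */
static bool example_work_pending(struct ptr_ring *r)
{
	return !__ptr_ring_empty(r);
}

/* Hypothetical consumer: drain everything queued so far. Because this side
 * keeps consuming until the ring reads empty, entries seen by an unlocked
 * __ptr_ring_empty() test elsewhere cannot be left stranded.
 */
static void example_drain(struct ptr_ring *r, void (*handle)(void *))
{
	void *ptr;

	while ((ptr = ptr_ring_consume(r)))
		handle(ptr);
}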
@@ -248,22 +253,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	/* Fundamentally, what we want to do is update consumer
 	 * index and zero out the entry so producer can reuse it.
 	 * Doing it naively at each consume would be as simple as:
-	 *     r->queue[r->consumer++] = NULL;
-	 *     if (unlikely(r->consumer >= r->size))
-	 *             r->consumer = 0;
+	 *     consumer = r->consumer;
+	 *     r->queue[consumer++] = NULL;
+	 *     if (unlikely(consumer >= r->size))
+	 *             consumer = 0;
+	 *     r->consumer = consumer;
 	 * but that is suboptimal when the ring is full as producer is writing
 	 * out new entries in the same cache line. Defer these updates until a
 	 * batch of entries has been consumed.
 	 */
-	int head = r->consumer_head++;
+	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+	 * to work correctly.
+	 */
+	int consumer_head = r->consumer_head;
+	int head = consumer_head++;
 
 	/* Once we have processed enough entries invalidate them in
 	 * the ring all at once so producer can reuse their space in the ring.
 	 * We also do this when we reach end of the ring - not mandatory
 	 * but helps keep the implementation simple.
 	 */
-	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
-		     r->consumer_head >= r->size)) {
+	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+		     consumer_head >= r->size)) {
 		/* Zero out entries in the reverse order: this way we touch the
 		 * cache line that producer might currently be reading the last;
 		 * producer won't make progress and touch other cache lines
@@ -271,12 +282,14 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 		 */
 		while (likely(head >= r->consumer_tail))
 			r->queue[head--] = NULL;
-		r->consumer_tail = r->consumer_head;
+		r->consumer_tail = consumer_head;
 	}
-	if (unlikely(r->consumer_head >= r->size)) {
-		r->consumer_head = 0;
+	if (unlikely(consumer_head >= r->size)) {
+		consumer_head = 0;
 		r->consumer_tail = 0;
 	}
+	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+	WRITE_ONCE(r->consumer_head, consumer_head);
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)
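The point of carrying consumer_head in a local variable and publishing it once is that a concurrent, unlocked __ptr_ring_empty() can only ever observe the old or the new index, never a transient out-of-range one. A stripped-down model of that update rule, with invented names and outside any real ring:

#include <linux/compiler.h>

struct toy_ring {		/* illustrative only, not struct ptr_ring */
	void **queue;
	int size;
	int head;		/* always a valid index in [0, size) */
};

/* Writer: wrap a local copy first, then publish it with one WRITE_ONCE(),
 * so the stored value never transiently equals size.
 */
static void toy_advance(struct toy_ring *t)
{
	int head = t->head + 1;

	if (head >= t->size)
		head = 0;
	WRITE_ONCE(t->head, head);
}

/* Lockless reader: pairs with the WRITE_ONCE() above, so queue[] is indexed
 * with either the old or the new head, never an out-of-bounds value.
 */
static bool toy_empty(struct toy_ring *t)
{
	return !READ_ONCE(t->queue[READ_ONCE(t->head)]);
}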
@@ -527,7 +540,9 @@ static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
 			goto done;
 		}
 		r->queue[head] = batch[--n];
-		r->consumer_tail = r->consumer_head = head;
+		r->consumer_tail = head;
+		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
+		WRITE_ONCE(r->consumer_head, head);
 	}
 
 done:
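ptr_ring_unconsume() is the slow path touched by this last hunk: it pushes entries back in front of the consumer, and after the change it, too, publishes consumer_head with WRITE_ONCE() so unlocked emptiness tests stay safe. A small usage sketch, with invented names and batch size, assuming a never-resized ring:

#include <linux/ptr_ring.h>

#define EXAMPLE_BATCH 16	/* hypothetical batch size */

/* Hypothetical slow path: pull up to EXAMPLE_BATCH entries; if processing
 * stops early, hand the unprocessed tail back via ptr_ring_unconsume().
 */
static int example_process_some(struct ptr_ring *r,
				bool (*process)(void *),
				void (*destroy)(void *))
{
	void *batch[EXAMPLE_BATCH];
	int n, i;

	n = ptr_ring_consume_batched(r, batch, EXAMPLE_BATCH);
	for (i = 0; i < n; i++) {
		if (!process(batch[i]))
			break;
	}
	if (i < n)
		ptr_ring_unconsume(r, batch + i, n - i, destroy);

	return i;
}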