aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael S. Tsirkin <mst@redhat.com>2018-01-25 18:36:27 -0500
committerDavid S. Miller <davem@davemloft.net>2018-01-29 12:02:53 -0500
commit406de7555424d93849166684d0bd172743d2a30c (patch)
tree8f440597d97aade025b56f2127d0a39bb60fd124
parent7ece54a60ee2ba7a386308cae73c790bd580589c (diff)
ptr_ring: keep consumer_head valid at all times
The comment near __ptr_ring_peek says: * If ring is never resized, and if the pointer is merely * tested, there's no need to take the lock - see e.g. __ptr_ring_empty. but this was in fact never possible since consumer_head would sometimes point outside the ring. Refactor the code so that it's always pointing within a ring. Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array") Signed-off-by: Michael S. Tsirkin <mst@redhat.com> Acked-by: John Fastabend <john.fastabend@gmail.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--include/linux/ptr_ring.h25
1 files changed, 16 insertions, 9 deletions
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 9ca1726ff963..5ebcdd40df99 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -248,22 +248,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
248 /* Fundamentally, what we want to do is update consumer 248 /* Fundamentally, what we want to do is update consumer
249 * index and zero out the entry so producer can reuse it. 249 * index and zero out the entry so producer can reuse it.
250 * Doing it naively at each consume would be as simple as: 250 * Doing it naively at each consume would be as simple as:
251 * r->queue[r->consumer++] = NULL; 251 * consumer = r->consumer;
252 * if (unlikely(r->consumer >= r->size)) 252 * r->queue[consumer++] = NULL;
253 * r->consumer = 0; 253 * if (unlikely(consumer >= r->size))
254 * consumer = 0;
255 * r->consumer = consumer;
254 * but that is suboptimal when the ring is full as producer is writing 256 * but that is suboptimal when the ring is full as producer is writing
255 * out new entries in the same cache line. Defer these updates until a 257 * out new entries in the same cache line. Defer these updates until a
256 * batch of entries has been consumed. 258 * batch of entries has been consumed.
257 */ 259 */
258 int head = r->consumer_head++; 260 /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
261 * to work correctly.
262 */
263 int consumer_head = r->consumer_head;
264 int head = consumer_head++;
259 265
260 /* Once we have processed enough entries invalidate them in 266 /* Once we have processed enough entries invalidate them in
261 * the ring all at once so producer can reuse their space in the ring. 267 * the ring all at once so producer can reuse their space in the ring.
262 * We also do this when we reach end of the ring - not mandatory 268 * We also do this when we reach end of the ring - not mandatory
263 * but helps keep the implementation simple. 269 * but helps keep the implementation simple.
264 */ 270 */
265 if (unlikely(r->consumer_head - r->consumer_tail >= r->batch || 271 if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
266 r->consumer_head >= r->size)) { 272 consumer_head >= r->size)) {
267 /* Zero out entries in the reverse order: this way we touch the 273 /* Zero out entries in the reverse order: this way we touch the
268 * cache line that producer might currently be reading the last; 274 * cache line that producer might currently be reading the last;
269 * producer won't make progress and touch other cache lines 275 * producer won't make progress and touch other cache lines
@@ -271,12 +277,13 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
271 */ 277 */
272 while (likely(head >= r->consumer_tail)) 278 while (likely(head >= r->consumer_tail))
273 r->queue[head--] = NULL; 279 r->queue[head--] = NULL;
274 r->consumer_tail = r->consumer_head; 280 r->consumer_tail = consumer_head;
275 } 281 }
276 if (unlikely(r->consumer_head >= r->size)) { 282 if (unlikely(consumer_head >= r->size)) {
277 r->consumer_head = 0; 283 consumer_head = 0;
278 r->consumer_tail = 0; 284 r->consumer_tail = 0;
279 } 285 }
286 r->consumer_head = consumer_head;
280} 287}
281 288
282static inline void *__ptr_ring_consume(struct ptr_ring *r) 289static inline void *__ptr_ring_consume(struct ptr_ring *r)