author    Michael S. Tsirkin <mst@redhat.com>    2016-06-13 16:54:45 -0400
committer David S. Miller <davem@davemloft.net>  2016-06-15 16:58:27 -0400
commit    5d49de532002f02755decd1758aac53063a68625 (patch)
tree      c021cc8b8f5a729624c39a1cc38234ae17ea3f29
parent    ad69f35d1dc0a79f86627ca56e01f86512602a49 (diff)
ptr_ring: resize support
This adds ring resize support. Seems to be necessary as users such as tun
allow userspace control over queue size.

If resize is used, this costs us the ability to peek at the queue without
taking the consumer lock - should not be a big deal, as peek and consume
usually run on the same CPU.

If the ring is made bigger, the ring contents are preserved. If the ring is
made smaller, extra pointers are passed to an optional destructor callback.

The cleanup function also gains a destructor callback so that all pointers
still in the queue can be cleaned up.

This changes some APIs, but we don't have any users yet, so it won't break
bisect.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
Acked-by: Jesper Dangaard Brouer <brouer@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
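[Editor's illustration] A minimal usage sketch of the API as it stands after
this patch. The caller, the ring sizes, and the kfree()-based destructor are
assumptions - the patch deliberately adds no in-tree users:

#include <linux/ptr_ring.h>
#include <linux/slab.h>

/* Hypothetical destructor: assumes the ring holds kmalloc'd buffers. */
static void example_destroy(void *ptr)
{
        kfree(ptr);
}

static int example(void)
{
        struct ptr_ring ring;
        int err = ptr_ring_init(&ring, 256, GFP_KERNEL);

        if (err)
                return err;

        /* ... produce/consume entries as usual ... */

        /* Grow: all queued pointers are preserved. */
        err = ptr_ring_resize(&ring, 512, GFP_KERNEL, example_destroy);
        if (err)
                goto out;

        /* Shrink: entries that no longer fit go to the destructor. */
        err = ptr_ring_resize(&ring, 64, GFP_KERNEL, example_destroy);
out:
        /* Cleanup now also destroys anything still queued. */
        ptr_ring_cleanup(&ring, example_destroy);
        return err;
}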
-rw-r--r--  include/linux/ptr_ring.h  |  157
1 file changed, 143 insertions(+), 14 deletions(-)
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 633406f9af8e..562a65e8bcc0 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -43,9 +43,9 @@ struct ptr_ring {
 };
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax().
- * Callers don't need to take producer lock - if they don't
- * the next call to __ptr_ring_produce may fail.
+ * for example cpu_relax(). If ring is ever resized, callers must hold
+ * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
+ * producer_lock, the next call to __ptr_ring_produce may fail.
  */
 static inline bool __ptr_ring_full(struct ptr_ring *r)
 {
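[Editor's illustration] The comment change above is the behavioral crux: once
a ring can be resized, even the read-only fullness test is only meaningful
under producer_lock. A sketch of a producing loop that follows the documented
rule - the spin-until-space policy and the function name are assumptions, not
part of the patch:

/* Illustrative producer loop per the comment above. */
static void produce_spin(struct ptr_ring *r, void *ptr)
{
        spin_lock(&r->producer_lock);
        /* producer_lock keeps the ring size stable across the loop;
         * cpu_relax() is the compiler barrier the comment asks for,
         * so the head slot is actually reread on each iteration.
         */
        while (__ptr_ring_produce(r, ptr) == -ENOSPC)
                cpu_relax();
        spin_unlock(&r->producer_lock);
}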
@@ -54,16 +54,55 @@ static inline bool __ptr_ring_full(struct ptr_ring *r)
 
 static inline bool ptr_ring_full(struct ptr_ring *r)
 {
-        barrier();
-        return __ptr_ring_full(r);
+        bool ret;
+
+        spin_lock(&r->producer_lock);
+        ret = __ptr_ring_full(r);
+        spin_unlock(&r->producer_lock);
+
+        return ret;
+}
+
+static inline bool ptr_ring_full_irq(struct ptr_ring *r)
+{
+        bool ret;
+
+        spin_lock_irq(&r->producer_lock);
+        ret = __ptr_ring_full(r);
+        spin_unlock_irq(&r->producer_lock);
+
+        return ret;
+}
+
+static inline bool ptr_ring_full_any(struct ptr_ring *r)
+{
+        unsigned long flags;
+        bool ret;
+
+        spin_lock_irqsave(&r->producer_lock, flags);
+        ret = __ptr_ring_full(r);
+        spin_unlock_irqrestore(&r->producer_lock, flags);
+
+        return ret;
+}
+
+static inline bool ptr_ring_full_bh(struct ptr_ring *r)
+{
+        bool ret;
+
+        spin_lock_bh(&r->producer_lock);
+        ret = __ptr_ring_full(r);
+        spin_unlock_bh(&r->producer_lock);
+
+        return ret;
 }
 
 /* Note: callers invoking this in a loop must use a compiler barrier,
- * for example cpu_relax().
+ * for example cpu_relax(). Callers must hold producer_lock.
  */
 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 {
-        if (__ptr_ring_full(r))
+        if (r->queue[r->producer])
                 return -ENOSPC;
 
         r->queue[r->producer++] = ptr;
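[Editor's illustration] The four ptr_ring_full flavors added above map onto
the standard spinlock variants. A hedged sketch of which flavor fits which
calling context - the surrounding functions are hypothetical:

static bool full_from_task(struct ptr_ring *r)
{
        return ptr_ring_full(r);        /* everything in process context */
}

static bool full_from_bh_user(struct ptr_ring *r)
{
        return ptr_ring_full_bh(r);     /* ring also touched from softirq */
}

static bool full_irq_safe(struct ptr_ring *r)
{
        return ptr_ring_full_irq(r);    /* ring also touched from hard irq;
                                           caller has irqs enabled */
}

static bool full_from_anywhere(struct ptr_ring *r)
{
        return ptr_ring_full_any(r);    /* unknown context: save/restore */
}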
@@ -120,20 +159,68 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
 /* Note: callers invoking this in a loop must use a compiler barrier,
  * for example cpu_relax(). Callers must take consumer_lock
  * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
- * There's no need for a lock if pointer is merely tested - see e.g.
- * ptr_ring_empty.
+ * If ring is never resized, and if the pointer is merely
+ * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
  */
 static inline void *__ptr_ring_peek(struct ptr_ring *r)
 {
         return r->queue[r->consumer];
 }
 
-static inline bool ptr_ring_empty(struct ptr_ring *r)
+/* Note: callers invoking this in a loop must use a compiler barrier,
+ * for example cpu_relax(). Callers must take consumer_lock
+ * if the ring is ever resized - see e.g. ptr_ring_empty.
+ */
+static inline bool __ptr_ring_empty(struct ptr_ring *r)
 {
-        barrier();
         return !__ptr_ring_peek(r);
 }
 
+static inline bool ptr_ring_empty(struct ptr_ring *r)
+{
+        bool ret;
+
+        spin_lock(&r->consumer_lock);
+        ret = __ptr_ring_empty(r);
+        spin_unlock(&r->consumer_lock);
+
+        return ret;
+}
+
+static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
+{
+        bool ret;
+
+        spin_lock_irq(&r->consumer_lock);
+        ret = __ptr_ring_empty(r);
+        spin_unlock_irq(&r->consumer_lock);
+
+        return ret;
+}
+
+static inline bool ptr_ring_empty_any(struct ptr_ring *r)
+{
+        unsigned long flags;
+        bool ret;
+
+        spin_lock_irqsave(&r->consumer_lock, flags);
+        ret = __ptr_ring_empty(r);
+        spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+        return ret;
+}
+
+static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
+{
+        bool ret;
+
+        spin_lock_bh(&r->consumer_lock);
+        ret = __ptr_ring_empty(r);
+        spin_unlock_bh(&r->consumer_lock);
+
+        return ret;
+}
+
 /* Must only be called after __ptr_ring_peek returned !NULL */
 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 {
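[Editor's illustration] Per the reworked comments, a peeked pointer may only
be dereferenced while holding consumer_lock once the ring can be resized. A
sketch in the spirit of PTR_RING_PEEK_CALL; the skb-length example and the
function name are assumptions:

#include <linux/skbuff.h>

/* Hypothetical peek that dereferences the head entry, so it takes
 * consumer_lock as the comments above require; merely testing for
 * NULL would avoid the lock only on a never-resized ring.
 */
static int peek_head_len(struct ptr_ring *r)
{
        struct sk_buff *skb;
        int len = 0;

        spin_lock(&r->consumer_lock);
        skb = __ptr_ring_peek(r);
        if (skb)
                len = skb->len; /* dereference only under the lock */
        spin_unlock(&r->consumer_lock);

        return len;
}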
@@ -241,10 +328,14 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
         __PTR_RING_PEEK_CALL_v; \
 })
 
+static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
+{
+        return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
+}
+
 static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 {
-        r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES),
-                           gfp);
+        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
         if (!r->queue)
                 return -ENOMEM;
 
@@ -256,8 +347,46 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
         return 0;
 }
 
-static inline void ptr_ring_cleanup(struct ptr_ring *r)
+static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
+                                  void (*destroy)(void *))
+{
+        unsigned long flags;
+        int producer = 0;
+        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
+        void **old;
+        void *ptr;
+
+        if (!queue)
+                return -ENOMEM;
+
+        spin_lock_irqsave(&(r)->producer_lock, flags);
+
+        while ((ptr = ptr_ring_consume(r)))
+                if (producer < size)
+                        queue[producer++] = ptr;
+                else if (destroy)
+                        destroy(ptr);
+
+        r->size = size;
+        r->producer = producer;
+        r->consumer = 0;
+        old = r->queue;
+        r->queue = queue;
+
+        spin_unlock_irqrestore(&(r)->producer_lock, flags);
+
+        kfree(old);
+
+        return 0;
+}
+
+static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
 {
+        void *ptr;
+
+        if (destroy)
+                while ((ptr = ptr_ring_consume(r)))
+                        destroy(ptr);
         kfree(r->queue);
 }
 
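[Editor's illustration] The resize strategy above is drain-and-requeue:
entries are consumed out of the old array into the new one, and overflow is
handed to the destructor. For the tun-style use named in the commit log, a
destructor might look like the following - hypothetical, since tun's actual
conversion is not part of this patch:

#include <linux/skbuff.h>

/* Hypothetical destructor for a ring of struct sk_buff pointers. */
static void skb_ring_destroy(void *ptr)
{
        kfree_skb(ptr);
}

/* Shrinking then drops packets that no longer fit:
 *        err = ptr_ring_resize(ring, new_size, GFP_KERNEL, skb_ring_destroy);
 */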