aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-06-11 13:15:35 -0400
committerGlenn Elliott <gelliott@cs.unc.edu>2014-06-11 14:25:29 -0400
commit5692d325b5c1b443fe91b126e686b314cc313976 (patch)
treeef5d6e8310df22c1d221df358b6e78ac89b73ce1
parentbefa9bb7fb4c041891e52243dbad064fbcb05123 (diff)
Enable multi-writer support underlying ring buf.
This patch adds APIs to support multi-writer ring buffers. This only affects ring.h, and does not change how pgm ring-based edges use ring buffers.
-rw-r--r--include/ring.h53
1 files changed, 40 insertions, 13 deletions
diff --git a/include/ring.h b/include/ring.h
index 2800bb2..8a8368f 100644
--- a/include/ring.h
+++ b/include/ring.h
@@ -79,7 +79,7 @@ struct ring
79 79
80 Return: 0 on success. -1 on error. 80 Return: 0 on success. -1 on error.
81 */ 81 */
82static int init_ring(struct ring* r, size_t min_count, size_t size) 82static inline int init_ring(struct ring* r, size_t min_count, size_t size)
83{ 83{
84 size_t count; 84 size_t count;
85 85
@@ -115,7 +115,7 @@ static int init_ring(struct ring* r, size_t min_count, size_t size)
115 1) slot_buf and data_buf have been allocated and are of sufficient size. 115 1) slot_buf and data_buf have been allocated and are of sufficient size.
116 2) 'count' is a power of two. 116 2) 'count' is a power of two.
117*/ 117*/
118static void __init_ring(struct ring* r, size_t count, size_t size, 118static inline void __init_ring(struct ring* r, size_t count, size_t size,
119 char* slot_buf, void* data_buf) 119 char* slot_buf, void* data_buf)
120{ 120{
121 r->nmemb = count; 121 r->nmemb = count;
@@ -135,7 +135,7 @@ static void __init_ring(struct ring* r, size_t count, size_t size,
135/* 135/*
136 Free ring buffer resources. 136 Free ring buffer resources.
137 */ 137 */
138static void free_ring(struct ring* r) 138static inline void free_ring(struct ring* r)
139{ 139{
140 if (!r) 140 if (!r)
141 return; 141 return;
@@ -156,15 +156,16 @@ static inline int is_ring_full(struct ring* r)
156 return (r->nfree == 0); 156 return (r->nfree == 0);
157} 157}
158 158
159#if 0 159
160/* multi-writer varients for adding elements to ring buf */ 160/* multi-writer varients for adding elements to ring buf */
161static inline void* __begin_write_ring(struct ring* r) 161
162static inline void* __begin_mwrite_ring(struct ring* r)
162{ 163{
163 ssize_t nfree; 164 ssize_t nfree;
164 size_t idx; 165 size_t idx;
165 void* dst; 166 void* dst;
166 167
167 if(*((volatile ssize_t*)(&r->nfree)) <= 0) 168 if(r->nfree <= 0)
168 return NULL; 169 return NULL;
169 170
170 nfree = __sync_fetch_and_sub(&r->nfree, 1); 171 nfree = __sync_fetch_and_sub(&r->nfree, 1);
@@ -181,16 +182,15 @@ static inline void* __begin_write_ring(struct ring* r)
181 return (void*)dst; 182 return (void*)dst;
182} 183}
183 184
184static inline void __end_ring_write(struct ring* r, void* addr) 185static inline void __end_mwrite_ring(struct ring* r, void* addr)
185{ 186{
186 size_t idx = ((char*)addr - r->buf) / r->memb_sz; 187 size_t idx = ((char*)addr - r->buf) / r->memb_sz;
187 r->slots[idx] = SLOT_READY; 188 r->slots[idx] = SLOT_READY;
188 __sync_synchronize(); 189 __sync_synchronize(); /* memory barrier */
189} 190}
190#endif
191 191
192 192
193/* single-writer varients */ 193/* single-writer varients (optimized for single writer) */
194 194
195static inline void* __begin_write_ring(struct ring* r) 195static inline void* __begin_write_ring(struct ring* r)
196{ 196{
@@ -207,7 +207,7 @@ static inline void* __begin_write_ring(struct ring* r)
207 return (void*)dst; 207 return (void*)dst;
208} 208}
209 209
210static inline void __end_ring_write(struct ring* r, void* addr) 210static inline void __end_write_ring(struct ring* r, void* addr)
211{ 211{
212 size_t idx = ((char*)addr - r->buf) / r->memb_sz; 212 size_t idx = ((char*)addr - r->buf) / r->memb_sz;
213 r->slots[idx] = SLOT_READY; 213 r->slots[idx] = SLOT_READY;
@@ -266,6 +266,23 @@ static inline void __end_read_ring(struct ring* r, void* addr)
266*/ 266*/
267 267
268/* 268/*
269 Macro for enqueuing an element to the ring buffer,
270 with multi-writer support.
271 [in] r: Pointer to struct ring
272 [in] src: Value to write (not pointer to value!)
273 */
274#define mwrite_ring(r, src) \
275do{ \
276 struct ring* __r = (r); \
277 typeof((src))* __dst; \
278 check_size(__r, __dst); \
279 do { __dst = (typeof(__dst)) __begin_mwrite_ring(__r); } \
280 while (__dst == NULL); \
281 *__dst = (src); \
282 __end_mwrite_ring(__r, __dst); \
283}while(0)
284
285/*
269 Macro for enqueuing an element to the ring buffer. 286 Macro for enqueuing an element to the ring buffer.
270 [in] r: Pointer to struct ring 287 [in] r: Pointer to struct ring
271 [in] src: Value to write (not pointer to value!) 288 [in] src: Value to write (not pointer to value!)
@@ -278,7 +295,7 @@ do{ \
278 do { __dst = (typeof(__dst)) __begin_write_ring(__r); } \ 295 do { __dst = (typeof(__dst)) __begin_write_ring(__r); } \
279 while (__dst == NULL); \ 296 while (__dst == NULL); \
280 *__dst = (src); \ 297 *__dst = (src); \
281 __end_ring_write(__r, __dst); \ 298 __end_write_ring(__r, __dst); \
282}while(0) 299}while(0)
283 300
284/* 301/*
@@ -300,6 +317,16 @@ do{ \
300 317
301/* Write/Read routines for plain vector/array types. These use memcpy. */ 318/* Write/Read routines for plain vector/array types. These use memcpy. */
302 319
320#define mwrite_vec_ring(r, src_vec, sz) \
321do{ \
322 struct ring* __r = (r); \
323 void* __dst; \
324 check_size_vec(__r, sz); \
325 do { __dst = __begin_mwrite_ring(__r); } while (__dst == NULL); \
326 memcpy(__dst, src_vec, sz); \
327 __end_mwrite_ring(__r, __dst); \
328}while(0)
329
303#define write_vec_ring(r, src_vec, sz) \ 330#define write_vec_ring(r, src_vec, sz) \
304do{ \ 331do{ \
305 struct ring* __r = (r); \ 332 struct ring* __r = (r); \
@@ -307,7 +334,7 @@ do{ \
307 check_size_vec(__r, sz); \ 334 check_size_vec(__r, sz); \
308 do { __dst = __begin_write_ring(__r); } while (__dst == NULL); \ 335 do { __dst = __begin_write_ring(__r); } while (__dst == NULL); \
309 memcpy(__dst, src_vec, sz); \ 336 memcpy(__dst, src_vec, sz); \
310 __end_ring_write(__r, __dst); \ 337 __end_write_ring(__r, __dst); \
311}while(0) 338}while(0)
312 339
313#define read_vec_ring(r, dst_vec, sz) \ 340#define read_vec_ring(r, dst_vec, sz) \