diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-05-27 20:22:37 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-05-27 20:29:12 -0400 |
commit | 8df9e5e43e5e9b498922e59d897ea7e8c8137396 (patch) | |
tree | 4220ad432de5be2998f38cceb4a43cec8755c21c | |
parent | da4ef079b82721a7f56d80a63a5a81f8a0be5b4b (diff) |
Add ring buffer edge IPC.
This patch adds a ring buffer IPC to implement very fast data
passing between nodes within the same address space (same process).
The ring buffer can be used effectively to pass a primitive type
value (e.g., int or pointer) or struct between producer and consumer.
There are restrictions:
1) Producer and consumer must be within the same process.
2) The same amount of data must be produced and consumed each
node invocation. That is, a producer cannot write 4 bytes
one invocation and then 8 bytes the next.
TODO: Support ring buffer IPC on shared memory (allow producer
and consumer to be in different processes).
-rw-r--r-- | Makefile | 9 | ||||
-rw-r--r-- | include/pgm.h | 12 | ||||
-rw-r--r-- | include/ring.h | 321 | ||||
-rw-r--r-- | src/pgm.cpp | 227 | ||||
-rw-r--r-- | tools/ringtest.cpp | 153 |
5 files changed, 694 insertions, 28 deletions
@@ -23,7 +23,7 @@ LIBPGM = . | |||
23 | # compiler flags | 23 | # compiler flags |
24 | flags-std = -std=gnu++11 | 24 | flags-std = -std=gnu++11 |
25 | flags-optim = -O2 -march=native | 25 | flags-optim = -O2 -march=native |
26 | flags-debug = -Wall -Werror -Wno-unused-function | 26 | flags-debug = -Wall -Werror -Wno-unused-function -Wno-sign-compare |
27 | flags-api = -D_XOPEN_SOURCE=600 -D_GNU_SOURCE -pthread | 27 | flags-api = -D_XOPEN_SOURCE=600 -D_GNU_SOURCE -pthread |
28 | 28 | ||
29 | # Comment out 'flags-litmus' to disable Litmus support | 29 | # Comment out 'flags-litmus' to disable Litmus support |
@@ -81,7 +81,7 @@ AR := ${CROSS_COMPILE}${AR} | |||
81 | # Targets | 81 | # Targets |
82 | 82 | ||
83 | all = lib ${tools} | 83 | all = lib ${tools} |
84 | tools = cvtest basictest datapassingtest sockstreamtest pingpong depthtest pgmrt | 84 | tools = cvtest ringtest basictest datapassingtest sockstreamtest pingpong depthtest pgmrt |
85 | 85 | ||
86 | .PHONY: all lib clean dump-config TAGS tags cscope help | 86 | .PHONY: all lib clean dump-config TAGS tags cscope help |
87 | 87 | ||
@@ -130,6 +130,9 @@ vpath %.cpp tools | |||
130 | obj-cvtest = cvtest.o | 130 | obj-cvtest = cvtest.o |
131 | lib-cvtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} | 131 | lib-cvtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} |
132 | 132 | ||
133 | obj-ringtest = ringtest.o | ||
134 | lib-ringtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} | ||
135 | |||
133 | obj-basictest = basictest.o | 136 | obj-basictest = basictest.o |
134 | lib-basictest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} | 137 | lib-basictest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} |
135 | 138 | ||
@@ -146,7 +149,7 @@ obj-pingpong = pingpong.o | |||
146 | lib-pingpong = -lpthread -lm -lrt -lboost_graph -lboost_system -lboost_thread ${liblitmus-flags} | 149 | lib-pingpong = -lpthread -lm -lrt -lboost_graph -lboost_system -lboost_thread ${liblitmus-flags} |
147 | 150 | ||
148 | obj-depthtest = depthtest.o | 151 | obj-depthtest = depthtest.o |
149 | lib-depthtest = -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} | 152 | lib-depthtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags} |
150 | 153 | ||
151 | # ############################################################################## | 154 | # ############################################################################## |
152 | # Build everything that depends on liblitmus. | 155 | # Build everything that depends on liblitmus. |
diff --git a/include/pgm.h b/include/pgm.h index efd85f8..d570faf 100644 --- a/include/pgm.h +++ b/include/pgm.h | |||
@@ -24,7 +24,7 @@ | |||
24 | #define __PGM_EDGE_CV 0x00000001 | 24 | #define __PGM_EDGE_CV 0x00000001 |
25 | #define __PGM_EDGE_FIFO 0x00000002 | 25 | #define __PGM_EDGE_FIFO 0x00000002 |
26 | #define __PGM_EDGE_MQ 0x00000004 | 26 | #define __PGM_EDGE_MQ 0x00000004 |
27 | #define __PGM_EDGE_CV_BUF 0x00000008 /* reserved */ | 27 | #define __PGM_EDGE_RING 0x00000008 |
28 | #define __PGM_EDGE_SOCK_STREAM 0x00000010 | 28 | #define __PGM_EDGE_SOCK_STREAM 0x00000010 |
29 | 29 | ||
30 | typedef enum | 30 | typedef enum |
@@ -34,6 +34,9 @@ typedef enum | |||
34 | 34 | ||
35 | /* condition variable-based IPC */ | 35 | /* condition variable-based IPC */ |
36 | pgm_cv_edge = (__PGM_EDGE_CV | __PGM_SIGNALED), | 36 | pgm_cv_edge = (__PGM_EDGE_CV | __PGM_SIGNALED), |
37 | /* Very low overhead IPC for passing POD data between nodes. | ||
38 | Currently limited to nodes within the same process. */ | ||
39 | pgm_ring_edge = (__PGM_EDGE_RING | __PGM_SIGNALED | __PGM_DATA_PASSING), | ||
37 | /* named FIFO IPC */ | 40 | /* named FIFO IPC */ |
38 | pgm_fast_fifo_edge = (__PGM_EDGE_FIFO | __PGM_SIGNALED | __PGM_DATA_PASSING), | 41 | pgm_fast_fifo_edge = (__PGM_EDGE_FIFO | __PGM_SIGNALED | __PGM_DATA_PASSING), |
39 | /* POSIX message queue IPC */ | 42 | /* POSIX message queue IPC */ |
@@ -90,6 +93,13 @@ typedef struct pgm_edge_attr | |||
90 | /* edge-type specific params */ | 93 | /* edge-type specific params */ |
91 | union | 94 | union |
92 | { | 95 | { |
96 | struct /* Ring buffer params */ | ||
97 | { | ||
98 | /* nr_produce/nr_consume interpreted as size of members */ | ||
99 | |||
100 | /* Number of elements in ring buffer */ | ||
101 | size_t nmemb; | ||
102 | }; | ||
93 | struct /* POSIX message queue params */ | 103 | struct /* POSIX message queue params */ |
94 | { | 104 | { |
95 | /* Values > 10 may require root privilege. */ | 105 | /* Values > 10 may require root privilege. */ |
diff --git a/include/ring.h b/include/ring.h new file mode 100644 index 0000000..2800bb2 --- /dev/null +++ b/include/ring.h | |||
@@ -0,0 +1,321 @@ | |||
1 | // Copyright (c) 2014, Glenn Elliott | ||
2 | // All rights reserved. | ||
3 | |||
4 | #pragma once | ||
5 | |||
6 | #include <stdlib.h> | ||
7 | #include <string.h> | ||
8 | |||
9 | #ifndef __cplusplus | ||
10 | |||
11 | #define nleading_unset_bits(x) \ | ||
12 | __builtin_choose_expr((sizeof(x) == 8), \ | ||
13 | __builtin_clzl(x), \ | ||
14 | __builtin_clz(x)) | ||
15 | |||
16 | #else | ||
17 | |||
18 | /* | ||
19 | g++ doesn't have __builtin_choose_expr(). | ||
20 | Switch between types using meta programming... | ||
21 | */ | ||
22 | namespace __ring | ||
23 | { | ||
24 | template<bool c, typename T, typename F> | ||
25 | struct conditional { typedef T type; }; | ||
26 | template<typename T, typename F> | ||
27 | struct conditional<false, T, F> { typedef F type; }; | ||
28 | |||
29 | struct clz32 | ||
30 | { | ||
31 | template <class T> | ||
32 | static inline int clz(T x) | ||
33 | { | ||
34 | return __builtin_clz(x); | ||
35 | } | ||
36 | }; | ||
37 | |||
38 | struct clz64 | ||
39 | { | ||
40 | template <class T> | ||
41 | static inline int clz(T x) | ||
42 | { | ||
43 | return __builtin_clzl(x); | ||
44 | } | ||
45 | }; | ||
46 | } | ||
47 | #define nleading_unset_bits(x) \ | ||
48 | __ring::conditional<(sizeof(x) == 8), __ring::clz64, __ring::clz32>::type::clz(x) | ||
49 | |||
50 | #endif | ||
51 | |||
52 | typedef enum | ||
53 | { | ||
54 | SLOT_FREE = 0, /* must be zero */ | ||
55 | SLOT_READY | ||
56 | } slot_state_t; | ||
57 | |||
58 | struct ring | ||
59 | { | ||
60 | size_t nmemb; | ||
61 | size_t memb_sz; | ||
62 | volatile size_t nfree; | ||
63 | |||
64 | size_t widx; | ||
65 | size_t ridx; | ||
66 | |||
67 | char* slots; | ||
68 | char* buf; | ||
69 | |||
70 | char user_managed_buffers; | ||
71 | }; | ||
72 | |||
73 | /* | ||
74 | Initialize a ring buffer struct. | ||
75 | [in] r: Pointer to ring buffer instance. | ||
76 | [in] min_count: Minimum number of elements in ring buffer. | ||
77 | (value is rounded UP to nearest power of two) | ||
78 | [in] size: size of ring buffer element. | ||
79 | |||
80 | Return: 0 on success. -1 on error. | ||
81 | */ | ||
82 | static int init_ring(struct ring* r, size_t min_count, size_t size) | ||
83 | { | ||
84 | size_t count; | ||
85 | |||
86 | if (!r || min_count == 0 || size == 0) | ||
87 | return -1; | ||
88 | |||
89 | /* round min_count up to the nearest power of two */ | ||
90 | count = (min_count > 2) ? | ||
91 | (~((~(size_t)0)>>1)) >> (nleading_unset_bits(min_count - 1) - 1) : | ||
92 | min_count; | ||
93 | |||
94 | /* overflow! too big! */ | ||
95 | if (count == 0) | ||
96 | return -1; | ||
97 | |||
98 | r->nmemb = count; | ||
99 | r->memb_sz = size; | ||
100 | r->nfree = count; | ||
101 | r->widx = 0; | ||
102 | r->ridx = 0; | ||
103 | /* calloc() initializes slots to SLOT_FREE */ | ||
104 | r->slots = (char*)calloc(count, sizeof(char)); | ||
105 | r->buf = (char*)malloc(count*size); | ||
106 | |||
107 | r->user_managed_buffers = 0; | ||
108 | |||
109 | return 0; | ||
110 | } | ||
111 | |||
112 | /* | ||
113 | Variant to allow user to specify their own (possibly static) buffers. | ||
114 | Assumptions: | ||
115 | 1) slot_buf and data_buf have been allocated and are of sufficient size. | ||
116 | 2) 'count' is a power of two. | ||
117 | */ | ||
118 | static void __init_ring(struct ring* r, size_t count, size_t size, | ||
119 | char* slot_buf, void* data_buf) | ||
120 | { | ||
121 | r->nmemb = count; | ||
122 | r->memb_sz = size; | ||
123 | r->nfree = count; | ||
124 | r->widx = 0; | ||
125 | r->ridx = 0; | ||
126 | r->slots = slot_buf; | ||
127 | r->buf = (char*)data_buf; | ||
128 | |||
129 | /* init slots to SLOT_FREE */ | ||
130 | memset(r->slots, 0, count); | ||
131 | |||
132 | r->user_managed_buffers = 1; | ||
133 | } | ||
134 | |||
135 | /* | ||
136 | Free ring buffer resources. | ||
137 | */ | ||
138 | static void free_ring(struct ring* r) | ||
139 | { | ||
140 | if (!r) | ||
141 | return; | ||
142 | if (r->user_managed_buffers) | ||
143 | return; | ||
144 | |||
145 | free(r->slots); | ||
146 | free(r->buf); | ||
147 | } | ||
148 | |||
149 | static inline int is_ring_empty(struct ring* r) | ||
150 | { | ||
151 | return (r->nfree == r->nmemb); | ||
152 | } | ||
153 | |||
154 | static inline int is_ring_full(struct ring* r) | ||
155 | { | ||
156 | return (r->nfree == 0); | ||
157 | } | ||
158 | |||
159 | #if 0 | ||
160 | /* multi-writer variants for adding elements to ring buf */ | ||
161 | static inline void* __begin_write_ring(struct ring* r) | ||
162 | { | ||
163 | ssize_t nfree; | ||
164 | size_t idx; | ||
165 | void* dst; | ||
166 | |||
167 | if(*((volatile ssize_t*)(&r->nfree)) <= 0) | ||
168 | return NULL; | ||
169 | |||
170 | nfree = __sync_fetch_and_sub(&r->nfree, 1); | ||
171 | if (nfree <= 0) | ||
172 | { | ||
173 | __sync_fetch_and_add(&r->nfree, 1); | ||
174 | return NULL; | ||
175 | } | ||
176 | |||
177 | /* roll over idx */ | ||
178 | idx = __sync_fetch_and_add(&r->widx, 1) % r->nmemb; | ||
179 | dst = r->buf + idx*r->memb_sz; | ||
180 | |||
181 | return (void*)dst; | ||
182 | } | ||
183 | |||
184 | static inline void __end_ring_write(struct ring* r, void* addr) | ||
185 | { | ||
186 | size_t idx = ((char*)addr - r->buf) / r->memb_sz; | ||
187 | r->slots[idx] = SLOT_READY; | ||
188 | __sync_synchronize(); | ||
189 | } | ||
190 | #endif | ||
191 | |||
192 | |||
193 | /* single-writer variants */ | ||
194 | |||
195 | static inline void* __begin_write_ring(struct ring* r) | ||
196 | { | ||
197 | size_t idx; | ||
198 | void* dst; | ||
199 | |||
200 | if(r->nfree <= 0) | ||
201 | return NULL; | ||
202 | |||
203 | /* roll over idx */ | ||
204 | idx = r->widx++ % r->nmemb; | ||
205 | dst = r->buf + idx*r->memb_sz; | ||
206 | |||
207 | return (void*)dst; | ||
208 | } | ||
209 | |||
210 | static inline void __end_ring_write(struct ring* r, void* addr) | ||
211 | { | ||
212 | size_t idx = ((char*)addr - r->buf) / r->memb_sz; | ||
213 | r->slots[idx] = SLOT_READY; | ||
214 | __sync_fetch_and_sub(&r->nfree, 1); /* memory barrier */ | ||
215 | } | ||
216 | |||
217 | |||
218 | /* single-reader variants */ | ||
219 | |||
220 | static inline void* __begin_read_ring(struct ring* r) | ||
221 | { | ||
222 | size_t idx; | ||
223 | |||
224 | if (r->nfree == r->nmemb) | ||
225 | return NULL; | ||
226 | |||
227 | idx = r->ridx % r->nmemb; | ||
228 | if (*(volatile char*)&(r->slots[idx]) == SLOT_READY) | ||
229 | return r->buf + idx * r->memb_sz; | ||
230 | else | ||
231 | return NULL; | ||
232 | } | ||
233 | |||
234 | static inline void __end_read_ring(struct ring* r, void* addr) | ||
235 | { | ||
236 | size_t idx = ((char*)addr - r->buf) / r->memb_sz; | ||
237 | r->slots[idx] = SLOT_FREE; | ||
238 | r->ridx++; | ||
239 | __sync_fetch_and_add(&r->nfree, 1); /* memory barrier */ | ||
240 | } | ||
241 | |||
242 | |||
243 | #ifdef NDEBUG | ||
244 | #define check_size(r, ptr) \ | ||
245 | do { assert(sizeof(*ptr) == r->memb_sz); } while(0) | ||
246 | #define check_size_vec(r, sz) \ | ||
247 | do { assert(sz == r->memb_sz); } while(0) | ||
248 | #else | ||
249 | #define check_size(r, size) | ||
250 | #define check_size_vec(r, sz) | ||
251 | #endif | ||
252 | |||
253 | /* | ||
254 | The following macros are used for enqueuing and dequeuing data from | ||
255 | the provided ring buffer | ||
256 | |||
257 | BEWARE!!!! Reads and writes are done by simple assignment (by the '=' | ||
258 | operator) for the sake of performance (vs using memcpy). Types are | ||
259 | deduced from the second parameter to each macro. You must be consistent | ||
260 | with your types. For example, mixing int32_t with int64_t will result | ||
261 | in corrupted ring buffers. | ||
262 | |||
263 | ----> Be very careful when passing literals!!! <---- | ||
264 | |||
265 | TODO: Make C++-template versions for safer typing. | ||
266 | */ | ||
267 | |||
268 | /* | ||
269 | Macro for enqueuing an element to the ring buffer. | ||
270 | [in] r: Pointer to struct ring | ||
271 | [in] src: Value to write (not pointer to value!) | ||
272 | */ | ||
273 | #define write_ring(r, src) \ | ||
274 | do{ \ | ||
275 | struct ring* __r = (r); \ | ||
276 | typeof((src))* __dst; \ | ||
277 | check_size(__r, __dst); \ | ||
278 | do { __dst = (typeof(__dst)) __begin_write_ring(__r); } \ | ||
279 | while (__dst == NULL); \ | ||
280 | *__dst = (src); \ | ||
281 | __end_ring_write(__r, __dst); \ | ||
282 | }while(0) | ||
283 | |||
284 | /* | ||
285 | Macro for dequeuing an element from the ring buffer. | ||
286 | [in] r: Pointer to struct ring | ||
287 | [in/out] dst_ptr: Pointer to where read element is to be written. | ||
288 | */ | ||
289 | #define read_ring(r, dst_ptr) \ | ||
290 | do{ \ | ||
291 | struct ring* __r = (r); \ | ||
292 | typeof((dst_ptr)) __src; \ | ||
293 | check_size(__r, __src); \ | ||
294 | do { __src = (typeof(__src)) __begin_read_ring(__r); } \ | ||
295 | while (__src == NULL); \ | ||
296 | *(dst_ptr) = *__src; \ | ||
297 | __end_read_ring(__r, __src); \ | ||
298 | }while(0) | ||
299 | |||
300 | |||
301 | /* Write/Read routines for plain vector/array types. These use memcpy. */ | ||
302 | |||
303 | #define write_vec_ring(r, src_vec, sz) \ | ||
304 | do{ \ | ||
305 | struct ring* __r = (r); \ | ||
306 | void* __dst; \ | ||
307 | check_size_vec(__r, sz); \ | ||
308 | do { __dst = __begin_write_ring(__r); } while (__dst == NULL); \ | ||
309 | memcpy(__dst, src_vec, sz); \ | ||
310 | __end_ring_write(__r, __dst); \ | ||
311 | }while(0) | ||
312 | |||
313 | #define read_vec_ring(r, dst_vec, sz) \ | ||
314 | do{ \ | ||
315 | struct ring* __r = (r); \ | ||
316 | void* __src; \ | ||
317 | check_size_vec(__r, sz); \ | ||
318 | do { __src = __begin_read_ring(__r); } while (__src == NULL); \ | ||
319 | memcpy(dst_vec, __src, sz); \ | ||
320 | __end_read_ring(__r, __src); \ | ||
321 | }while(0) | ||
diff --git a/src/pgm.cpp b/src/pgm.cpp index c841fbc..8057c36 100644 --- a/src/pgm.cpp +++ b/src/pgm.cpp | |||
@@ -37,6 +37,8 @@ typedef ticketlock_t pgm_lock_t; | |||
37 | typedef cv_t pgm_cv_t; | 37 | typedef cv_t pgm_cv_t; |
38 | #endif | 38 | #endif |
39 | 39 | ||
40 | #include "ring.h" | ||
41 | |||
40 | using namespace std; | 42 | using namespace std; |
41 | using namespace boost; | 43 | using namespace boost; |
42 | using namespace boost::interprocess; | 44 | using namespace boost::interprocess; |
@@ -141,21 +143,34 @@ struct pgm_edge | |||
141 | 143 | ||
142 | // the remaining fields are used by data-passing edges | 144 | // the remaining fields are used by data-passing edges |
143 | 145 | ||
144 | // fd_out and fd_in may be the same | 146 | union |
145 | // if different ends of FIFOs are | 147 | { |
146 | // opened by different processes. | 148 | // fields for standard data-passing IPCs |
147 | int fd_out; | 149 | struct |
148 | int fd_in; | 150 | { |
151 | // fd_out and fd_in may be the same | ||
152 | // if different ends of FIFOs are | ||
153 | // opened by different processes. | ||
154 | int fd_out; | ||
155 | int fd_in; | ||
156 | |||
157 | // counter for determining location of the message | ||
158 | // header contained within received data. | ||
159 | size_t next_tag; | ||
160 | }; | ||
161 | // fields for ring buffer IPC | ||
162 | struct | ||
163 | { | ||
164 | struct ring ringbuf; | ||
165 | volatile pgm_command_t ring_cmd; | ||
166 | }; | ||
167 | }; | ||
149 | 168 | ||
150 | // buffer for sending data | 169 | // buffer for sending data |
151 | struct pgm_memory_hdr* buf_out; | 170 | struct pgm_memory_hdr* buf_out; |
152 | 171 | ||
153 | // buffer for receiving data | 172 | // buffer for receiving data |
154 | struct pgm_memory_hdr* buf_in; | 173 | struct pgm_memory_hdr* buf_in; |
155 | |||
156 | // counter for determining location of the message | ||
157 | // header contained within received data. | ||
158 | size_t next_tag; | ||
159 | }; | 174 | }; |
160 | 175 | ||
161 | static inline bool is_signal_driven(const struct pgm_edge_attr* attr) | 176 | static inline bool is_signal_driven(const struct pgm_edge_attr* attr) |
@@ -267,6 +282,126 @@ static ssize_t dummy_edge_write(struct pgm_edge* e, const void* buf, size_t nbyt | |||
267 | return 0; | 282 | return 0; |
268 | } | 283 | } |
269 | 284 | ||
285 | /************* RING IPC ROUTINES *****************/ | ||
286 | static int ring_init(pgm_graph* g, | ||
287 | pgm_node* producer, pgm_node* consumer, | ||
288 | pgm_edge* edge) | ||
289 | { | ||
290 | int ret = -1; | ||
291 | |||
292 | /* nr_produce/nr_consume interpreted as bytes passed between | ||
293 | producer and consumer, per invocation. Impl. requires that | ||
294 | nr_produce == nr_consume. */ | ||
295 | if (edge->attr.nr_produce != edge->attr.nr_consume) | ||
296 | goto out; | ||
297 | |||
298 | ret = init_ring(&edge->ringbuf, edge->attr.nmemb, edge->attr.nr_produce); | ||
299 | |||
300 | out: | ||
301 | return ret; | ||
302 | } | ||
303 | |||
304 | static int ring_open_consumer(pgm_graph* g, | ||
305 | pgm_node* producer, pgm_node* consumer, | ||
306 | pgm_edge* edge) | ||
307 | { | ||
308 | edge->buf_in = __pgm_malloc_edge_buf(g, edge, false); | ||
309 | return 0; | ||
310 | } | ||
311 | |||
312 | static int ring_open_producer(pgm_graph* g, | ||
313 | pgm_node* producer, pgm_node* consumer, | ||
314 | pgm_edge* edge) | ||
315 | { | ||
316 | edge->buf_out = __pgm_malloc_edge_buf(g, edge, true); | ||
317 | return 0; | ||
318 | } | ||
319 | |||
320 | static int ring_close_consumer(pgm_edge* edge) | ||
321 | { | ||
322 | free(edge->buf_in); | ||
323 | edge->buf_in = 0; | ||
324 | return 0; | ||
325 | } | ||
326 | |||
327 | static int ring_close_producer(pgm_edge* edge) | ||
328 | { | ||
329 | free(edge->buf_out); | ||
330 | edge->buf_out = 0; | ||
331 | return 0; | ||
332 | } | ||
333 | |||
334 | static int ring_destroy(pgm_graph* g, | ||
335 | pgm_node* producer, pgm_node* consumer, | ||
336 | pgm_edge* edge) | ||
337 | { | ||
338 | free_ring(&edge->ringbuf); | ||
339 | return 0; | ||
340 | } | ||
341 | |||
342 | static ssize_t ring_read(struct pgm_edge* e, void* buf, size_t nbytes) | ||
343 | { | ||
344 | assert(nbytes == e->attr.nr_produce); | ||
345 | assert(nbytes == e->attr.nr_consume); | ||
346 | |||
347 | switch(nbytes) | ||
348 | { | ||
349 | case 8: | ||
350 | read_ring(&e->ringbuf, (uint64_t*)buf); | ||
351 | break; | ||
352 | case 4: | ||
353 | read_ring(&e->ringbuf, (uint32_t*)buf); | ||
354 | break; | ||
355 | case 2: | ||
356 | read_ring(&e->ringbuf, (uint16_t*)buf); | ||
357 | break; | ||
358 | case 1: | ||
359 | read_ring(&e->ringbuf, (uint8_t*)buf); | ||
360 | break; | ||
361 | default: | ||
362 | read_vec_ring(&e->ringbuf, buf, nbytes); | ||
363 | } | ||
364 | return nbytes; /* assume always successful */ | ||
365 | } | ||
366 | |||
367 | static ssize_t ring_write(struct pgm_edge* e, const void* buf, size_t nbytes) | ||
368 | { | ||
369 | assert(nbytes == e->attr.nr_produce); | ||
370 | assert(nbytes == e->attr.nr_consume); | ||
371 | |||
372 | switch(nbytes) | ||
373 | { | ||
374 | case 8: | ||
375 | write_ring(&e->ringbuf, *(uint64_t*)buf); | ||
376 | break; | ||
377 | case 4: | ||
378 | write_ring(&e->ringbuf, *(uint32_t*)buf); | ||
379 | break; | ||
380 | case 2: | ||
381 | write_ring(&e->ringbuf, *(uint16_t*)buf); | ||
382 | break; | ||
383 | case 1: | ||
384 | write_ring(&e->ringbuf, *(uint8_t*)buf); | ||
385 | break; | ||
386 | default: | ||
387 | write_vec_ring(&e->ringbuf, buf, nbytes); | ||
388 | } | ||
389 | return nbytes; /* assume always successful */ | ||
390 | } | ||
391 | |||
392 | static const struct pgm_edge_ops pgm_ring_edge_ops = | ||
393 | { | ||
394 | .init = ring_init, | ||
395 | .open_consumer = ring_open_consumer, | ||
396 | .open_producer = ring_open_producer, | ||
397 | .close_consumer = ring_close_consumer, | ||
398 | .close_producer = ring_close_producer, | ||
399 | .destroy = ring_destroy, | ||
400 | .read = ring_read, | ||
401 | .write = ring_write, | ||
402 | }; | ||
403 | |||
404 | |||
270 | /************* FIFO IPC ROUTINES *****************/ | 405 | /************* FIFO IPC ROUTINES *****************/ |
271 | static std::string fifo_name(pgm_graph* g, | 406 | static std::string fifo_name(pgm_graph* g, |
272 | pgm_node* producer, pgm_node* consumer, | 407 | pgm_node* producer, pgm_node* consumer, |
@@ -858,7 +993,7 @@ static inline int is_producer_buf(void* userptr) | |||
858 | if(!ptr) | 993 | if(!ptr) |
859 | return 0; | 994 | return 0; |
860 | if(ptr->assigned_edge.graph == BAD_EDGE.graph && | 995 | if(ptr->assigned_edge.graph == BAD_EDGE.graph && |
861 | ptr->assigned_edge.edge == BAD_EDGE.edge) | 996 | ptr->assigned_edge.edge == BAD_EDGE.edge) |
862 | return 0; | 997 | return 0; |
863 | return (ptr->producer_flag == 1); | 998 | return (ptr->producer_flag == 1); |
864 | } | 999 | } |
@@ -869,7 +1004,7 @@ static inline int is_consumer_buf(void* userptr) | |||
869 | if(!ptr) | 1004 | if(!ptr) |
870 | return 0; | 1005 | return 0; |
871 | if(ptr->assigned_edge.graph == BAD_EDGE.graph && | 1006 | if(ptr->assigned_edge.graph == BAD_EDGE.graph && |
872 | ptr->assigned_edge.edge == BAD_EDGE.edge) | 1007 | ptr->assigned_edge.edge == BAD_EDGE.edge) |
873 | return 0; | 1008 | return 0; |
874 | return (ptr->producer_flag == 0); | 1009 | return (ptr->producer_flag == 0); |
875 | } | 1010 | } |
@@ -880,7 +1015,7 @@ static inline int is_buf_assigned(void* userptr, edge_t* e = NULL) | |||
880 | if(!ptr) | 1015 | if(!ptr) |
881 | return 0; | 1016 | return 0; |
882 | if(ptr->assigned_edge.graph == BAD_EDGE.graph && | 1017 | if(ptr->assigned_edge.graph == BAD_EDGE.graph && |
883 | ptr->assigned_edge.edge == BAD_EDGE.edge) | 1018 | ptr->assigned_edge.edge == BAD_EDGE.edge) |
884 | return 0; | 1019 | return 0; |
885 | if(e) | 1020 | if(e) |
886 | *e = ptr->assigned_edge; | 1021 | *e = ptr->assigned_edge; |
@@ -1081,7 +1216,7 @@ void* __pgm_swap_edge_buf(edge_t edge, void* new_uptr, bool swap_producer) | |||
1081 | goto out; | 1216 | goto out; |
1082 | } | 1217 | } |
1083 | if(hdr->assigned_edge.graph != BAD_EDGE.graph && | 1218 | if(hdr->assigned_edge.graph != BAD_EDGE.graph && |
1084 | hdr->assigned_edge.edge != BAD_EDGE.edge) | 1219 | hdr->assigned_edge.edge != BAD_EDGE.edge) |
1085 | { | 1220 | { |
1086 | E("%p is already in use by an edge.\n", new_uptr); | 1221 | E("%p is already in use by an edge.\n", new_uptr); |
1087 | goto out; | 1222 | goto out; |
@@ -1093,7 +1228,7 @@ void* __pgm_swap_edge_buf(edge_t edge, void* new_uptr, bool swap_producer) | |||
1093 | if(hdr->usersize != old_hdr->usersize) | 1228 | if(hdr->usersize != old_hdr->usersize) |
1094 | { | 1229 | { |
1095 | E("Buffer %p is the wrong size. (is %lu, expected %lu)\n", | 1230 | E("Buffer %p is the wrong size. (is %lu, expected %lu)\n", |
1096 | new_uptr, hdr->usersize, old_hdr->usersize); | 1231 | new_uptr, hdr->usersize, old_hdr->usersize); |
1097 | goto out; | 1232 | goto out; |
1098 | } | 1233 | } |
1099 | 1234 | ||
@@ -1151,7 +1286,7 @@ int pgm_swap_edge_bufs(void* a, void* b) | |||
1151 | if(hdra->usersize != hdrb->usersize) | 1286 | if(hdra->usersize != hdrb->usersize) |
1152 | { | 1287 | { |
1153 | E("Buffers are not the same size: %p:%lu vs %p:%lu\n", | 1288 | E("Buffers are not the same size: %p:%lu vs %p:%lu\n", |
1154 | a, hdra->usersize, b, hdrb->usersize); | 1289 | a, hdra->usersize, b, hdrb->usersize); |
1155 | goto out; | 1290 | goto out; |
1156 | } | 1291 | } |
1157 | 1292 | ||
@@ -1731,6 +1866,19 @@ int pgm_init_edge(edge_t* edge, | |||
1731 | E("Produce amnt. must equal consume amnt. for POSIX msg queues.\n"); | 1866 | E("Produce amnt. must equal consume amnt. for POSIX msg queues.\n"); |
1732 | goto out; | 1867 | goto out; |
1733 | } | 1868 | } |
1869 | if(attr->type & __PGM_EDGE_RING) | ||
1870 | { | ||
1871 | if(attr->nr_produce != attr->nr_consume) | ||
1872 | { | ||
1873 | E("Produce amnt. must equal consume amnt. for ring buffers.\n"); | ||
1874 | goto out; | ||
1875 | } | ||
1876 | if(attr->nmemb == 0) | ||
1877 | { | ||
1878 | E("nmemb cannot be zero.\n"); | ||
1879 | goto out; | ||
1880 | } | ||
1881 | } | ||
1734 | 1882 | ||
1735 | if(attr->nr_threshold < attr->nr_consume) | 1883 | if(attr->nr_threshold < attr->nr_consume) |
1736 | goto out; | 1884 | goto out; |
@@ -1784,14 +1932,18 @@ int pgm_init_edge(edge_t* edge, | |||
1784 | e->consumer = consumer.node; | 1932 | e->consumer = consumer.node; |
1785 | e->attr = *attr; | 1933 | e->attr = *attr; |
1786 | 1934 | ||
1787 | if(attr->type & __PGM_EDGE_FIFO) | 1935 | if (attr->type & __PGM_EDGE_CV) |
1936 | e->ops = &pgm_cv_edge_ops; | ||
1937 | else if(attr->type & __PGM_EDGE_FIFO) | ||
1788 | e->ops = &pgm_fifo_edge_ops; | 1938 | e->ops = &pgm_fifo_edge_ops; |
1789 | else if(attr->type & __PGM_EDGE_MQ) | 1939 | else if(attr->type & __PGM_EDGE_MQ) |
1790 | e->ops = &pgm_mq_edge_ops; | 1940 | e->ops = &pgm_mq_edge_ops; |
1941 | else if(attr->type & __PGM_EDGE_RING) | ||
1942 | e->ops = &pgm_ring_edge_ops; | ||
1791 | else if(attr->type & __PGM_EDGE_SOCK_STREAM) | 1943 | else if(attr->type & __PGM_EDGE_SOCK_STREAM) |
1792 | e->ops = &pgm_sock_stream_edge_ops; | 1944 | e->ops = &pgm_sock_stream_edge_ops; |
1793 | else | 1945 | else |
1794 | e->ops = &pgm_cv_edge_ops; | 1946 | goto out_unlock; |
1795 | 1947 | ||
1796 | ret = e->ops->init(g, np, nc, e); | 1948 | ret = e->ops->init(g, np, nc, e); |
1797 | 1949 | ||
@@ -2738,7 +2890,7 @@ static eWaitStatus pgm_wait_for_tokens(struct pgm_graph* g, struct pgm_node* n) | |||
2738 | pgm_lock(&n->lock, flags); | 2890 | pgm_lock(&n->lock, flags); |
2739 | do | 2891 | do |
2740 | { | 2892 | { |
2741 | // recheck the condition | 2893 | // recheck the condition |
2742 | nr_ready = pgm_nr_ready_edges(g, n); | 2894 | nr_ready = pgm_nr_ready_edges(g, n); |
2743 | if(nr_ready == n->nr_in_signaled) | 2895 | if(nr_ready == n->nr_in_signaled) |
2744 | break; | 2896 | break; |
@@ -2760,9 +2912,9 @@ static void pgm_consume_tokens(struct pgm_graph* g, struct pgm_node* n) | |||
2760 | { | 2912 | { |
2761 | for(int i = 0; i < n->nr_in; ++i) | 2913 | for(int i = 0; i < n->nr_in; ++i) |
2762 | { | 2914 | { |
2763 | struct pgm_edge* e = &g->edges[n->in[i]]; | 2915 | struct pgm_edge* e = &g->edges[n->in[i]]; |
2764 | if(is_signal_driven(e)) | 2916 | if(is_signal_driven(e)) |
2765 | __sync_fetch_and_sub(&e->nr_pending, e->attr.nr_consume); | 2917 | __sync_fetch_and_sub(&e->nr_pending, e->attr.nr_consume); |
2766 | } | 2918 | } |
2767 | } | 2919 | } |
2768 | 2920 | ||
@@ -2790,7 +2942,7 @@ typedef uint32_t pgm_fd_mask_t; | |||
2790 | 2942 | ||
2791 | static const unsigned char PGM_NORMAL = 0x01; | 2943 | static const unsigned char PGM_NORMAL = 0x01; |
2792 | 2944 | ||
2793 | static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL) | 2945 | static int pgm_send_std_data(struct pgm_edge* e, pgm_command_t tag) |
2794 | { | 2946 | { |
2795 | // only the tag is sent if this is a terminate message | 2947 | // only the tag is sent if this is a terminate message |
2796 | 2948 | ||
@@ -2834,6 +2986,23 @@ static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL) | |||
2834 | return ret; | 2986 | return ret; |
2835 | } | 2987 | } |
2836 | 2988 | ||
2989 | static int pgm_send_ring_data(struct pgm_edge* e, pgm_command_t tag) | ||
2990 | { | ||
2991 | if(!(tag & PGM_TERMINATE)) | ||
2992 | e->ops->write(e, pgm_get_user_ptr(e->buf_out), e->attr.nr_produce); | ||
2993 | else | ||
2994 | e->ring_cmd = tag; | ||
2995 | return 0; | ||
2996 | } | ||
2997 | |||
2998 | static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL) | ||
2999 | { | ||
3000 | if(!(e->attr.type & __PGM_EDGE_RING)) | ||
3001 | return pgm_send_std_data(e, tag); | ||
3002 | else | ||
3003 | return pgm_send_ring_data(e, tag); | ||
3004 | } | ||
3005 | |||
2837 | static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait, | 3006 | static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait, |
2838 | struct pgm_graph* g, struct pgm_node* n) | 3007 | struct pgm_graph* g, struct pgm_node* n) |
2839 | { | 3008 | { |
@@ -2880,7 +3049,7 @@ static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait, | |||
2880 | ++scanned; | 3049 | ++scanned; |
2881 | } | 3050 | } |
2882 | } | 3051 | } |
2883 | ++num_looped; | 3052 | ++num_looped; |
2884 | } | 3053 | } |
2885 | 3054 | ||
2886 | return wait_status; | 3055 | return wait_status; |
@@ -2946,6 +3115,16 @@ wait_for_data: // jump here if we would block on read | |||
2946 | if(!is_data_passing(e)) | 3115 | if(!is_data_passing(e)) |
2947 | continue; | 3116 | continue; |
2948 | 3117 | ||
3118 | if(e->attr.type & __PGM_EDGE_RING) | ||
3119 | { | ||
3120 | /* short-cut for the simple ring buffer IPC */ | ||
3121 | if(!((e->ring_cmd & PGM_TERMINATE) && is_ring_empty(&e->ringbuf))) | ||
3122 | e->ops->read(e, dest_ptrs[i], e->attr.nr_consume); | ||
3123 | else | ||
3124 | n->nr_terminate_msgs++; | ||
3125 | continue; | ||
3126 | } | ||
3127 | |||
2949 | read_more: // jump to here if we need to read more bytes into our buffer | 3128 | read_more: // jump to here if we need to read more bytes into our buffer |
2950 | 3129 | ||
2951 | remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in)); | 3130 | remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in)); |
@@ -3094,14 +3273,14 @@ int pgm_wait(node_t node) | |||
3094 | } | 3273 | } |
3095 | 3274 | ||
3096 | if(token_status == WaitExhaustedAndTerminate && | 3275 | if(token_status == WaitExhaustedAndTerminate && |
3097 | data_status == WaitExhaustedAndTerminate) | 3276 | data_status == WaitExhaustedAndTerminate) |
3098 | { | 3277 | { |
3099 | ret = PGM_TERMINATE; | 3278 | ret = PGM_TERMINATE; |
3100 | } | 3279 | } |
3101 | else | 3280 | else |
3102 | { | 3281 | { |
3103 | ret = (token_status == WaitTimeout || token_status == WaitError || | 3282 | ret = (token_status == WaitTimeout || token_status == WaitError || |
3104 | data_status == WaitTimeout || data_status == WaitError) ? | 3283 | data_status == WaitTimeout || data_status == WaitError) ? |
3105 | -1 : 0; | 3284 | -1 : 0; |
3106 | } | 3285 | } |
3107 | 3286 | ||
diff --git a/tools/ringtest.cpp b/tools/ringtest.cpp new file mode 100644 index 0000000..d6850ed --- /dev/null +++ b/tools/ringtest.cpp | |||
@@ -0,0 +1,153 @@ | |||
1 | // Copyright (c) 2014, Glenn Elliott | ||
2 | // All rights reserved. | ||
3 | |||
4 | /* A program for testing the basic ring-based edge. */ | ||
5 | |||
6 | #include <iostream> | ||
7 | #include <unistd.h> | ||
8 | #include <errno.h> | ||
9 | #include <pthread.h> | ||
10 | #include <string.h> | ||
11 | #include <stdlib.h> | ||
12 | |||
13 | #include "pgm.h" | ||
14 | |||
int errors = 0;                  /* global count of failed API calls (all threads) */
pthread_barrier_t init_barrier;  /* rendezvous before and after the test loop */

/* Per-thread scratch buffer for strerror_r() so threads don't race on it. */
__thread char __errstr[80] = {0};

/* Evaluate `e`; if it returns a negative value, bump the global error count
 * and print a diagnostic including the decoded errno, file, function and
 * line.  Relies on the GNU variant of strerror_r() (returns char*), which
 * the build enables via -D_GNU_SOURCE.
 *
 * BUG FIX: the buffer size passed to strerror_r() was sizeof(errstr) --
 * the size of a char* pointer (8 bytes) -- not the size of the 80-byte
 * __errstr buffer, so error strings were truncated to at most 7 chars.
 * pthread_self() is also cast to unsigned long to match the %lu specifier
 * portably (pthread_t is an opaque type). */
#define CheckError(e) \
do { int __ret = (e); \
if(__ret < 0) { \
	errors++; \
	char* errstr = strerror_r(errno, __errstr, sizeof(__errstr)); \
	fprintf(stderr, "%lu: Error %d (%s (%d)) @ %s:%s:%d\n", \
		(unsigned long)pthread_self(), __ret, errstr, errno, \
		__FILE__, __FUNCTION__, __LINE__); \
}}while(0)

/* Number of tokens the producer pushes before signalling termination. */
int TOTAL_ITERATIONS = 10000000;
30 | |||
31 | void* thread(void* _graph_t) | ||
32 | { | ||
33 | char tabbuf[] = "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t"; | ||
34 | int iterations = 0; | ||
35 | int ret = 0; | ||
36 | |||
37 | const graph_t& graph = *((graph_t*)_graph_t); | ||
38 | node_t node; | ||
39 | |||
40 | CheckError(pgm_claim_any_node(graph, &node)); | ||
41 | tabbuf[node.node] = '\0'; | ||
42 | |||
43 | int out_degree = pgm_get_degree_out(node); | ||
44 | int in_degree = pgm_get_degree_in(node); | ||
45 | edge_t* out_edges = (edge_t*)calloc(out_degree, sizeof(edge_t)); | ||
46 | edge_t* in_edges = (edge_t*)calloc(in_degree, sizeof(edge_t)); | ||
47 | |||
48 | bool is_src = (in_degree == 0); | ||
49 | uint32_t* buf; | ||
50 | if(is_src) | ||
51 | buf = (uint32_t*)pgm_get_edge_buf_p(out_edges[0]); | ||
52 | else | ||
53 | buf = (uint32_t*)pgm_get_edge_buf_c(in_edges[0]); | ||
54 | |||
55 | uint64_t sum = 0; | ||
56 | |||
57 | pthread_barrier_wait(&init_barrier); | ||
58 | |||
59 | if(!errors) | ||
60 | { | ||
61 | do { | ||
62 | if(!is_src) | ||
63 | { | ||
64 | ret = pgm_wait(node); | ||
65 | } | ||
66 | else | ||
67 | { | ||
68 | if(iterations > TOTAL_ITERATIONS) | ||
69 | ret = PGM_TERMINATE; | ||
70 | } | ||
71 | |||
72 | if(ret != PGM_TERMINATE) | ||
73 | { | ||
74 | CheckError(ret); | ||
75 | |||
76 | if(is_src) | ||
77 | { | ||
78 | // usleep(500*1000); | ||
79 | // fprintf(stdout, "%s%d fires\n", tabbuf, node.node); | ||
80 | *buf = (uint32_t)iterations; | ||
81 | sum += iterations; | ||
82 | } | ||
83 | else | ||
84 | { | ||
85 | // fprintf(stdout, "%s%d fires. read:%d\n", tabbuf, node.node, *buf); | ||
86 | sum += *buf; | ||
87 | |||
88 | // slow down the consumer a little bit to induce backlog in token buffer | ||
89 | if(rand()%5 == 0) | ||
90 | sched_yield(); | ||
91 | } | ||
92 | |||
93 | CheckError(pgm_complete(node)); | ||
94 | iterations++; | ||
95 | } | ||
96 | else | ||
97 | { | ||
98 | fprintf(stdout, "%s%d terminates: sum: %lu\n", tabbuf, node.node, sum); | ||
99 | |||
100 | if(is_src) | ||
101 | CheckError(pgm_terminate(node)); | ||
102 | } | ||
103 | |||
104 | } while(ret != PGM_TERMINATE); | ||
105 | } | ||
106 | |||
107 | pthread_barrier_wait(&init_barrier); | ||
108 | |||
109 | CheckError(pgm_release_node(node)); | ||
110 | |||
111 | free(out_edges); | ||
112 | free(in_edges); | ||
113 | |||
114 | pthread_exit(0); | ||
115 | } | ||
116 | |||
117 | int main(void) | ||
118 | { | ||
119 | graph_t g; | ||
120 | node_t n0, n1; | ||
121 | edge_t e0_1; | ||
122 | |||
123 | pthread_t t0, t1; | ||
124 | |||
125 | edge_attr_t ring_attr; | ||
126 | memset(&ring_attr, 0, sizeof(ring_attr)); | ||
127 | ring_attr.type = pgm_ring_edge; | ||
128 | ring_attr.nr_produce = sizeof(uint32_t); | ||
129 | ring_attr.nr_consume = sizeof(uint32_t); | ||
130 | ring_attr.nr_threshold = sizeof(uint32_t); | ||
131 | ring_attr.nmemb = 32; | ||
132 | |||
133 | CheckError(pgm_init("/tmp/graphs", 1)); | ||
134 | CheckError(pgm_init_graph(&g, "demo")); | ||
135 | |||
136 | CheckError(pgm_init_node(&n0, g, "n0")); | ||
137 | CheckError(pgm_init_node(&n1, g, "n1")); | ||
138 | |||
139 | CheckError(pgm_init_edge(&e0_1, n0, n1, "e0_1", &ring_attr)); | ||
140 | |||
141 | pthread_barrier_init(&init_barrier, 0, 1); | ||
142 | pthread_create(&t0, 0, thread, &g); | ||
143 | pthread_create(&t1, 0, thread, &g); | ||
144 | |||
145 | pthread_join(t0, 0); | ||
146 | pthread_join(t1, 0); | ||
147 | |||
148 | CheckError(pgm_destroy_graph(g)); | ||
149 | |||
150 | CheckError(pgm_destroy()); | ||
151 | |||
152 | return 0; | ||
153 | } | ||