author    Glenn Elliott <gelliott@cs.unc.edu>  2014-05-27 20:22:37 -0400
committer Glenn Elliott <gelliott@cs.unc.edu>  2014-05-27 20:29:12 -0400
commit    8df9e5e43e5e9b498922e59d897ea7e8c8137396 (patch)
tree      4220ad432de5be2998f38cceb4a43cec8755c21c
parent    da4ef079b82721a7f56d80a63a5a81f8a0be5b4b (diff)
Add ring buffer edge IPC.
This patch adds a ring buffer IPC that implements very fast data passing between nodes within the same address space (same process). The ring buffer can be used to pass a primitive-type value (e.g., an int or a pointer) or a struct between producer and consumer. There are restrictions: 1) The producer and consumer must be within the same process. 2) The same amount of data must be produced and consumed on each node invocation. That is, a producer cannot write 4 bytes on one invocation and then 8 bytes on the next. TODO: Support ring buffer IPC over shared memory (to allow the producer and consumer to be in different processes).
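As an illustration, a minimal sketch of how a producer/consumer pair configures such an edge, using the attribute fields this patch adds (taken from tools/ringtest.cpp below; CheckError is that test's error-checking macro):

    edge_attr_t ring_attr;
    memset(&ring_attr, 0, sizeof(ring_attr));
    ring_attr.type = pgm_ring_edge;           /* new edge type */
    ring_attr.nr_produce = sizeof(uint32_t);  /* must equal nr_consume */
    ring_attr.nr_consume = sizeof(uint32_t);
    ring_attr.nr_threshold = sizeof(uint32_t);
    ring_attr.nmemb = 32;                     /* capacity; rounded up to a power of two */
    CheckError(pgm_init_edge(&e0_1, n0, n1, "e0_1", &ring_attr));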
-rw-r--r--  Makefile            |   9
-rw-r--r--  include/pgm.h       |  12
-rw-r--r--  include/ring.h      | 321
-rw-r--r--  src/pgm.cpp         | 227
-rw-r--r--  tools/ringtest.cpp  | 153
5 files changed, 694 insertions(+), 28 deletions(-)
diff --git a/Makefile b/Makefile
index 571eedb..8bf0591 100644
--- a/Makefile
+++ b/Makefile
@@ -23,7 +23,7 @@ LIBPGM = .
 # compiler flags
 flags-std = -std=gnu++11
 flags-optim = -O2 -march=native
-flags-debug = -Wall -Werror -Wno-unused-function
+flags-debug = -Wall -Werror -Wno-unused-function -Wno-sign-compare
 flags-api = -D_XOPEN_SOURCE=600 -D_GNU_SOURCE -pthread
 
 # Comment out 'flags-litmus' to disable Litmus support
@@ -81,7 +81,7 @@ AR := ${CROSS_COMPILE}${AR}
 # Targets
 
 all = lib ${tools}
-tools = cvtest basictest datapassingtest sockstreamtest pingpong depthtest pgmrt
+tools = cvtest ringtest basictest datapassingtest sockstreamtest pingpong depthtest pgmrt
 
 .PHONY: all lib clean dump-config TAGS tags cscope help
 
@@ -130,6 +130,9 @@ vpath %.cpp tools
 obj-cvtest = cvtest.o
 lib-cvtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags}
 
+obj-ringtest = ringtest.o
+lib-ringtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags}
+
 obj-basictest = basictest.o
 lib-basictest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags}
 
@@ -146,7 +149,7 @@ obj-pingpong = pingpong.o
 lib-pingpong = -lpthread -lm -lrt -lboost_graph -lboost_system -lboost_thread ${liblitmus-flags}
 
 obj-depthtest = depthtest.o
-lib-depthtest = -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags}
+lib-depthtest = -lpthread -lm -lrt -lboost_graph -lboost_filesystem -lboost_system ${liblitmus-flags}
 
 # ##############################################################################
 # Build everything that depends on liblitmus.
diff --git a/include/pgm.h b/include/pgm.h
index efd85f8..d570faf 100644
--- a/include/pgm.h
+++ b/include/pgm.h
@@ -24,7 +24,7 @@
 #define __PGM_EDGE_CV 0x00000001
 #define __PGM_EDGE_FIFO 0x00000002
 #define __PGM_EDGE_MQ 0x00000004
-#define __PGM_EDGE_CV_BUF 0x00000008 /* reserved */
+#define __PGM_EDGE_RING 0x00000008
 #define __PGM_EDGE_SOCK_STREAM 0x00000010
 
 typedef enum
@@ -34,6 +34,9 @@ typedef enum
 
     /* condition variable-based IPC */
     pgm_cv_edge = (__PGM_EDGE_CV | __PGM_SIGNALED),
+    /* Very low overhead IPC for passing POD data between nodes.
+       Currently limited to nodes within the same process. */
+    pgm_ring_edge = (__PGM_EDGE_RING | __PGM_SIGNALED | __PGM_DATA_PASSING),
     /* named FIFO IPC */
     pgm_fast_fifo_edge = (__PGM_EDGE_FIFO | __PGM_SIGNALED | __PGM_DATA_PASSING),
     /* POSIX message queue IPC */
@@ -90,6 +93,13 @@ typedef struct pgm_edge_attr
     /* edge-type specific params */
     union
     {
+        struct /* ring buffer params */
+        {
+            /* nr_produce/nr_consume are interpreted as the size of a member */
+
+            /* number of elements in the ring buffer */
+            size_t nmemb;
+        };
         struct /* POSIX message queue params */
         {
             /* Values > 10 may require root privilege. */
diff --git a/include/ring.h b/include/ring.h
new file mode 100644
index 0000000..2800bb2
--- /dev/null
+++ b/include/ring.h
@@ -0,0 +1,321 @@
+// Copyright (c) 2014, Glenn Elliott
+// All rights reserved.
+
+#pragma once
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h> /* needed by the check_size macros below */
+
+#ifndef __cplusplus
+
+#define nleading_unset_bits(x) \
+__builtin_choose_expr((sizeof(x) == 8), \
+    __builtin_clzl(x), \
+    __builtin_clz(x))
+
+#else
+
+/*
+   g++ doesn't have __builtin_choose_expr().
+   Switch between types using template metaprogramming...
+*/
+namespace __ring
+{
+    template<bool c, typename T, typename F>
+    struct conditional { typedef T type; };
+    template<typename T, typename F>
+    struct conditional<false, T, F> { typedef F type; };
+
+    struct clz32
+    {
+        template <class T>
+        static inline int clz(T x)
+        {
+            return __builtin_clz(x);
+        }
+    };
+
+    struct clz64
+    {
+        template <class T>
+        static inline int clz(T x)
+        {
+            return __builtin_clzl(x);
+        }
+    };
+}
+#define nleading_unset_bits(x) \
+    __ring::conditional<(sizeof(x) == 8), __ring::clz64, __ring::clz32>::type::clz(x)
+
+#endif
+
+typedef enum
+{
+    SLOT_FREE = 0, /* must be zero */
+    SLOT_READY
+} slot_state_t;
+
+struct ring
+{
+    size_t nmemb;
+    size_t memb_sz;
+    volatile size_t nfree;
+
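+    /* monotonically increasing write/read cursors; a cursor maps to
+       slot (cursor % nmemb) */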
+    size_t widx;
+    size_t ridx;
+
+    char* slots;
+    char* buf;
+
+    char user_managed_buffers;
+};
+
+/*
+   Initialize a ring buffer struct.
+   [in] r: Pointer to ring buffer instance.
+   [in] min_count: Minimum number of elements in the ring buffer.
+                   (value is rounded UP to the nearest power of two)
+   [in] size: Size of a ring buffer element.
+
+   Return: 0 on success. -1 on error.
+ */
+static int init_ring(struct ring* r, size_t min_count, size_t size)
+{
+    size_t count;
+
+    if (!r || min_count == 0 || size == 0)
+        return -1;
+
+    /* round min_count up to the nearest power of two */
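+    /* e.g., with a 64-bit size_t and min_count = 5:
+       nleading_unset_bits(4) = 61, so the most-significant bit (1<<63)
+       is shifted right by 60, giving count = 8 */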
+    count = (min_count > 2) ?
+        (~((~(size_t)0)>>1)) >> (nleading_unset_bits(min_count - 1) - 1) :
+        min_count;
+
+    /* overflow! too big! */
+    if (count == 0)
+        return -1;
+
+    r->nmemb = count;
+    r->memb_sz = size;
+    r->nfree = count;
+    r->widx = 0;
+    r->ridx = 0;
+    /* calloc() initializes slots to SLOT_FREE */
+    r->slots = (char*)calloc(count, sizeof(char));
+    r->buf = (char*)malloc(count*size);
+    if (!r->slots || !r->buf)
+    {
+        free(r->slots);
+        free(r->buf);
+        return -1;
+    }
+
+    r->user_managed_buffers = 0;
+
+    return 0;
+}
+
+/*
+   Variant that allows the user to specify their own (possibly static) buffers.
+   Assumptions:
+   1) slot_buf and data_buf have been allocated and are of sufficient size.
+   2) 'count' is a power of two.
+*/
+static void __init_ring(struct ring* r, size_t count, size_t size,
+    char* slot_buf, void* data_buf)
+{
+    r->nmemb = count;
+    r->memb_sz = size;
+    r->nfree = count;
+    r->widx = 0;
+    r->ridx = 0;
+    r->slots = slot_buf;
+    r->buf = (char*)data_buf;
+
+    /* init slots to SLOT_FREE */
+    memset(r->slots, 0, count);
+
+    r->user_managed_buffers = 1;
+}
+
+/*
+   Free ring buffer resources (no-op for user-managed buffers).
+ */
+static void free_ring(struct ring* r)
+{
+    if (!r)
+        return;
+    if (r->user_managed_buffers)
+        return;
+
+    free(r->slots);
+    free(r->buf);
+}
+
+static inline int is_ring_empty(struct ring* r)
+{
+    return (r->nfree == r->nmemb);
+}
+
+static inline int is_ring_full(struct ring* r)
+{
+    return (r->nfree == 0);
+}
+
+#if 0
+/* multi-writer variants for adding elements to the ring buf */
+static inline void* __begin_write_ring(struct ring* r)
+{
+    ssize_t nfree;
+    size_t idx;
+    void* dst;
+
+    if(*((volatile ssize_t*)(&r->nfree)) <= 0)
+        return NULL;
+
+    nfree = __sync_fetch_and_sub(&r->nfree, 1);
+    if (nfree <= 0)
+    {
+        __sync_fetch_and_add(&r->nfree, 1);
+        return NULL;
+    }
+
+    /* roll over idx */
+    idx = __sync_fetch_and_add(&r->widx, 1) % r->nmemb;
+    dst = r->buf + idx*r->memb_sz;
+
+    return (void*)dst;
+}
+
+static inline void __end_ring_write(struct ring* r, void* addr)
+{
+    size_t idx = ((char*)addr - r->buf) / r->memb_sz;
+    r->slots[idx] = SLOT_READY;
+    __sync_synchronize();
+}
+#endif
+
+
+/* single-writer variants */
+
+static inline void* __begin_write_ring(struct ring* r)
+{
+    size_t idx;
+    void* dst;
+
+    if(r->nfree <= 0)
+        return NULL;
+
+    /* roll over idx */
+    idx = r->widx++ % r->nmemb;
+    dst = r->buf + idx*r->memb_sz;
+
+    return (void*)dst;
+}
+
+static inline void __end_ring_write(struct ring* r, void* addr)
+{
+    size_t idx = ((char*)addr - r->buf) / r->memb_sz;
+    r->slots[idx] = SLOT_READY;
+    __sync_fetch_and_sub(&r->nfree, 1); /* memory barrier */
+}
+
+
+/* single-reader variants */
+
+static inline void* __begin_read_ring(struct ring* r)
+{
+    size_t idx;
+
+    if (r->nfree == r->nmemb)
+        return NULL;
+
+    idx = r->ridx % r->nmemb;
+    if (*(volatile char*)&(r->slots[idx]) == SLOT_READY)
+        return r->buf + idx * r->memb_sz;
+    else
+        return NULL;
+}
+
+static inline void __end_read_ring(struct ring* r, void* addr)
+{
+    size_t idx = ((char*)addr - r->buf) / r->memb_sz;
+    r->slots[idx] = SLOT_FREE;
+    r->ridx++;
+    __sync_fetch_and_add(&r->nfree, 1); /* memory barrier */
+}
+
+
+#ifndef NDEBUG
+#define check_size(r, ptr) \
+    do { assert(sizeof(*(ptr)) == (r)->memb_sz); } while(0)
+#define check_size_vec(r, sz) \
+    do { assert((sz) == (r)->memb_sz); } while(0)
+#else
+#define check_size(r, ptr)
+#define check_size_vec(r, sz)
+#endif
+
+/*
+   The following macros are used for enqueuing data to and dequeuing data
+   from the provided ring buffer.
+
+   BEWARE!!!! Reads and writes are done by simple assignment (by the '='
+   operator) for the sake of performance (vs. using memcpy). Types are
+   deduced from the second parameter to each macro. You must be consistent
+   with your types. For example, mixing int32_t with int64_t will result
+   in corrupted ring buffers.
+
+   ----> Be very careful when passing literals!!! <----
+
+   TODO: Make C++-template versions for safer typing.
+*/
+
+/*
+   Macro for enqueuing an element to the ring buffer.
+   [in] r: Pointer to struct ring
+   [in] src: Value to write (not a pointer to the value!)
+ */
+#define write_ring(r, src) \
+do{ \
+    struct ring* __r = (r); \
+    typeof((src))* __dst; \
+    check_size(__r, __dst); \
+    do { __dst = (typeof(__dst)) __begin_write_ring(__r); } \
+    while (__dst == NULL); \
+    *__dst = (src); \
+    __end_ring_write(__r, __dst); \
+}while(0)
+
+/*
+   Macro for dequeuing an element from the ring buffer.
+   [in] r: Pointer to struct ring
+   [in/out] dst_ptr: Pointer to where the read element is to be written.
+ */
+#define read_ring(r, dst_ptr) \
+do{ \
+    struct ring* __r = (r); \
+    typeof((dst_ptr)) __src; \
+    check_size(__r, __src); \
+    do { __src = (typeof(__src)) __begin_read_ring(__r); } \
+    while (__src == NULL); \
+    *(dst_ptr) = *__src; \
+    __end_read_ring(__r, __src); \
+}while(0)
+
+
+/* Write/Read routines for plain vector/array types. These use memcpy. */
+
+#define write_vec_ring(r, src_vec, sz) \
+do{ \
+    struct ring* __r = (r); \
+    void* __dst; \
+    check_size_vec(__r, sz); \
+    do { __dst = __begin_write_ring(__r); } while (__dst == NULL); \
+    memcpy(__dst, src_vec, sz); \
+    __end_ring_write(__r, __dst); \
+}while(0)
+
+#define read_vec_ring(r, dst_vec, sz) \
+do{ \
+    struct ring* __r = (r); \
+    void* __src; \
+    check_size_vec(__r, sz); \
+    do { __src = __begin_read_ring(__r); } while (__src == NULL); \
+    memcpy(dst_vec, __src, sz); \
+    __end_read_ring(__r, __src); \
+}while(0)
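
For reference, a minimal standalone sketch of the single-producer/single-consumer API above (assumes <stdint.h> for uint32_t; write_ring/read_ring spin-wait, as defined by the macros):

    #include <stdint.h>
    #include "ring.h"

    int main(void)
    {
        struct ring r;
        uint32_t v = 0;

        /* request at least 6 elements of 4 bytes; capacity is rounded up to 8 */
        if (init_ring(&r, 6, sizeof(uint32_t)) != 0)
            return 1;

        write_ring(&r, (uint32_t)42); /* spins while the ring is full */
        read_ring(&r, &v);            /* spins while the ring is empty */

        free_ring(&r);
        return (v == 42) ? 0 : 1;
    }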
diff --git a/src/pgm.cpp b/src/pgm.cpp
index c841fbc..8057c36 100644
--- a/src/pgm.cpp
+++ b/src/pgm.cpp
@@ -37,6 +37,8 @@ typedef ticketlock_t pgm_lock_t;
 typedef cv_t pgm_cv_t;
 #endif
 
+#include "ring.h"
+
 using namespace std;
 using namespace boost;
 using namespace boost::interprocess;
@@ -141,21 +143,34 @@ struct pgm_edge
 
     // the remaining fields are used by data-passing edges
 
-    // fd_out and fd_in may be the same
-    // if different ends of FIFOs are
-    // opened by different processes.
-    int fd_out;
-    int fd_in;
+    union
+    {
+        // fields for standard data-passing IPCs
+        struct
+        {
+            // fd_out and fd_in may be the same
+            // if different ends of FIFOs are
+            // opened by different processes.
+            int fd_out;
+            int fd_in;
+
+            // counter for determining location of the message
+            // header contained within received data.
+            size_t next_tag;
+        };
+        // fields for ring buffer IPC
+        struct
+        {
+            struct ring ringbuf;
+            volatile pgm_command_t ring_cmd;
+        };
+    };
 
     // buffer for sending data
     struct pgm_memory_hdr* buf_out;
 
     // buffer for receiving data
     struct pgm_memory_hdr* buf_in;
-
-    // counter for determining location of the message
-    // header contained within received data.
-    size_t next_tag;
 };
 
 static inline bool is_signal_driven(const struct pgm_edge_attr* attr)
@@ -267,6 +282,126 @@ static ssize_t dummy_edge_write(struct pgm_edge* e, const void* buf, size_t nbyt
     return 0;
 }
 
+/************* RING IPC ROUTINES *****************/
+static int ring_init(pgm_graph* g,
+    pgm_node* producer, pgm_node* consumer,
+    pgm_edge* edge)
+{
+    int ret = -1;
+
+    /* nr_produce/nr_consume interpreted as bytes passed between
+       producer and consumer, per invocation. Impl. requires that
+       nr_produce == nr_consume. */
+    if (edge->attr.nr_produce != edge->attr.nr_consume)
+        goto out;
+
+    ret = init_ring(&edge->ringbuf, edge->attr.nmemb, edge->attr.nr_produce);
+
+out:
+    return ret;
+}
+
+static int ring_open_consumer(pgm_graph* g,
+    pgm_node* producer, pgm_node* consumer,
+    pgm_edge* edge)
+{
+    edge->buf_in = __pgm_malloc_edge_buf(g, edge, false);
+    return 0;
+}
+
+static int ring_open_producer(pgm_graph* g,
+    pgm_node* producer, pgm_node* consumer,
+    pgm_edge* edge)
+{
+    edge->buf_out = __pgm_malloc_edge_buf(g, edge, true);
+    return 0;
+}
+
+static int ring_close_consumer(pgm_edge* edge)
+{
+    free(edge->buf_in);
+    edge->buf_in = 0;
+    return 0;
+}
+
+static int ring_close_producer(pgm_edge* edge)
+{
+    free(edge->buf_out);
+    edge->buf_out = 0;
+    return 0;
+}
+
+static int ring_destroy(pgm_graph* g,
+    pgm_node* producer, pgm_node* consumer,
+    pgm_edge* edge)
+{
+    free_ring(&edge->ringbuf);
+    return 0;
+}
+
+static ssize_t ring_read(struct pgm_edge* e, void* buf, size_t nbytes)
+{
+    assert(nbytes == e->attr.nr_produce);
+    assert(nbytes == e->attr.nr_consume);
+
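+    /* use typed assignment for the common POD sizes; fall back to the
+       memcpy-based variant for anything else */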
+    switch(nbytes)
+    {
+        case 8:
+            read_ring(&e->ringbuf, (uint64_t*)buf);
+            break;
+        case 4:
+            read_ring(&e->ringbuf, (uint32_t*)buf);
+            break;
+        case 2:
+            read_ring(&e->ringbuf, (uint16_t*)buf);
+            break;
+        case 1:
+            read_ring(&e->ringbuf, (uint8_t*)buf);
+            break;
+        default:
+            read_vec_ring(&e->ringbuf, buf, nbytes);
+    }
+    return nbytes; /* assume always successful */
+}
+
+static ssize_t ring_write(struct pgm_edge* e, const void* buf, size_t nbytes)
+{
+    assert(nbytes == e->attr.nr_produce);
+    assert(nbytes == e->attr.nr_consume);
+
+    switch(nbytes)
+    {
+        case 8:
+            write_ring(&e->ringbuf, *(uint64_t*)buf);
+            break;
+        case 4:
+            write_ring(&e->ringbuf, *(uint32_t*)buf);
+            break;
+        case 2:
+            write_ring(&e->ringbuf, *(uint16_t*)buf);
+            break;
+        case 1:
+            write_ring(&e->ringbuf, *(uint8_t*)buf);
+            break;
+        default:
+            write_vec_ring(&e->ringbuf, buf, nbytes);
+    }
+    return nbytes; /* assume always successful */
+}
+
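+/* dispatch table hooking the ring IPC routines into the generic edge layer */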
+static const struct pgm_edge_ops pgm_ring_edge_ops =
+{
+    .init = ring_init,
+    .open_consumer = ring_open_consumer,
+    .open_producer = ring_open_producer,
+    .close_consumer = ring_close_consumer,
+    .close_producer = ring_close_producer,
+    .destroy = ring_destroy,
+    .read = ring_read,
+    .write = ring_write,
+};
+
+
 /************* FIFO IPC ROUTINES *****************/
 static std::string fifo_name(pgm_graph* g,
     pgm_node* producer, pgm_node* consumer,
@@ -858,7 +993,7 @@ static inline int is_producer_buf(void* userptr)
     if(!ptr)
         return 0;
-    if(ptr->assigned_edge.graph == BAD_EDGE.graph &&
-        ptr->assigned_edge.edge == BAD_EDGE.edge)
+    if(ptr->assigned_edge.graph == BAD_EDGE.graph &&
+        ptr->assigned_edge.edge == BAD_EDGE.edge)
         return 0;
     return (ptr->producer_flag == 1);
 }
@@ -869,7 +1004,7 @@ static inline int is_consumer_buf(void* userptr)
     if(!ptr)
         return 0;
-    if(ptr->assigned_edge.graph == BAD_EDGE.graph &&
-        ptr->assigned_edge.edge == BAD_EDGE.edge)
+    if(ptr->assigned_edge.graph == BAD_EDGE.graph &&
+        ptr->assigned_edge.edge == BAD_EDGE.edge)
         return 0;
     return (ptr->producer_flag == 0);
 }
@@ -880,7 +1015,7 @@ static inline int is_buf_assigned(void* userptr, edge_t* e = NULL)
     if(!ptr)
         return 0;
-    if(ptr->assigned_edge.graph == BAD_EDGE.graph &&
-        ptr->assigned_edge.edge == BAD_EDGE.edge)
+    if(ptr->assigned_edge.graph == BAD_EDGE.graph &&
+        ptr->assigned_edge.edge == BAD_EDGE.edge)
         return 0;
     if(e)
         *e = ptr->assigned_edge;
@@ -1081,7 +1216,7 @@ void* __pgm_swap_edge_buf(edge_t edge, void* new_uptr, bool swap_producer)
         goto out;
     }
-    if(hdr->assigned_edge.graph != BAD_EDGE.graph &&
-        hdr->assigned_edge.edge != BAD_EDGE.edge)
+    if(hdr->assigned_edge.graph != BAD_EDGE.graph &&
+        hdr->assigned_edge.edge != BAD_EDGE.edge)
     {
         E("%p is already in use by an edge.\n", new_uptr);
         goto out;
@@ -1093,7 +1228,7 @@ void* __pgm_swap_edge_buf(edge_t edge, void* new_uptr, bool swap_producer)
     if(hdr->usersize != old_hdr->usersize)
     {
-        E("Buffer %p is the wrong size. (is %lu, expected %lu)\n",
-            new_uptr, hdr->usersize, old_hdr->usersize);
+        E("Buffer %p is the wrong size. (is %lu, expected %lu)\n",
+            new_uptr, hdr->usersize, old_hdr->usersize);
         goto out;
     }
 
@@ -1151,7 +1286,7 @@ int pgm_swap_edge_bufs(void* a, void* b)
     if(hdra->usersize != hdrb->usersize)
     {
-        E("Buffers are not the same size: %p:%lu vs %p:%lu\n",
-            a, hdra->usersize, b, hdrb->usersize);
+        E("Buffers are not the same size: %p:%lu vs %p:%lu\n",
+            a, hdra->usersize, b, hdrb->usersize);
         goto out;
     }
 
@@ -1731,6 +1866,19 @@ int pgm_init_edge(edge_t* edge,
         E("Produce amnt. must equal consume amnt. for POSIX msg queues.\n");
         goto out;
     }
+    if(attr->type & __PGM_EDGE_RING)
+    {
+        if(attr->nr_produce != attr->nr_consume)
+        {
+            E("Produce amnt. must equal consume amnt. for ring buffers.\n");
+            goto out;
+        }
+        if(attr->nmemb == 0)
+        {
+            E("nmemb cannot be zero.\n");
+            goto out;
+        }
+    }
 
     if(attr->nr_threshold < attr->nr_consume)
         goto out;
@@ -1784,14 +1932,18 @@ int pgm_init_edge(edge_t* edge,
     e->consumer = consumer.node;
     e->attr = *attr;
 
-    if(attr->type & __PGM_EDGE_FIFO)
+    if (attr->type & __PGM_EDGE_CV)
+        e->ops = &pgm_cv_edge_ops;
+    else if(attr->type & __PGM_EDGE_FIFO)
         e->ops = &pgm_fifo_edge_ops;
     else if(attr->type & __PGM_EDGE_MQ)
         e->ops = &pgm_mq_edge_ops;
+    else if(attr->type & __PGM_EDGE_RING)
+        e->ops = &pgm_ring_edge_ops;
     else if(attr->type & __PGM_EDGE_SOCK_STREAM)
         e->ops = &pgm_sock_stream_edge_ops;
     else
-        e->ops = &pgm_cv_edge_ops;
+        goto out_unlock;
 
     ret = e->ops->init(g, np, nc, e);
 
@@ -2738,7 +2890,7 @@ static eWaitStatus pgm_wait_for_tokens(struct pgm_graph* g, struct pgm_node* n)
     pgm_lock(&n->lock, flags);
     do
     {
-        // recheck the condition
+        // recheck the condition
         nr_ready = pgm_nr_ready_edges(g, n);
         if(nr_ready == n->nr_in_signaled)
             break;
@@ -2760,9 +2912,9 @@ static void pgm_consume_tokens(struct pgm_graph* g, struct pgm_node* n)
 {
     for(int i = 0; i < n->nr_in; ++i)
     {
-        struct pgm_edge* e = &g->edges[n->in[i]];
-        if(is_signal_driven(e))
-            __sync_fetch_and_sub(&e->nr_pending, e->attr.nr_consume);
+        struct pgm_edge* e = &g->edges[n->in[i]];
+        if(is_signal_driven(e))
+            __sync_fetch_and_sub(&e->nr_pending, e->attr.nr_consume);
     }
 }
 
@@ -2790,7 +2942,7 @@ typedef uint32_t pgm_fd_mask_t;
 
 static const unsigned char PGM_NORMAL = 0x01;
 
-static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL)
+static int pgm_send_std_data(struct pgm_edge* e, pgm_command_t tag)
 {
     // only the tag is sent if this is a terminate message
 
@@ -2834,6 +2986,23 @@ static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL)
     return ret;
 }
 
+static int pgm_send_ring_data(struct pgm_edge* e, pgm_command_t tag)
+{
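+    /* normal tokens travel through the ring itself; a terminate is only
+       flagged on the edge and is observed by the consumer once the ring
+       has drained */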
+    if(!(tag & PGM_TERMINATE))
+        e->ops->write(e, pgm_get_user_ptr(e->buf_out), e->attr.nr_produce);
+    else
+        e->ring_cmd = tag;
+    return 0;
+}
+
+static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL)
+{
+    if(!(e->attr.type & __PGM_EDGE_RING))
+        return pgm_send_std_data(e, tag);
+    else
+        return pgm_send_ring_data(e, tag);
+}
+
 static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait,
     struct pgm_graph* g, struct pgm_node* n)
 {
@@ -2880,7 +3049,7 @@ static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait,
                 ++scanned;
             }
         }
-        ++num_looped;
+        ++num_looped;
     }
 
     return wait_status;
@@ -2946,6 +3115,16 @@ wait_for_data: // jump here if we would block on read
         if(!is_data_passing(e))
             continue;
 
+        if(e->attr.type & __PGM_EDGE_RING)
+        {
+            /* short-cut for the simple ring buffer IPC */
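+            /* keep consuming queued elements until the ring is empty,
+               even after a terminate has been flagged */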
+            if(!((e->ring_cmd & PGM_TERMINATE) && is_ring_empty(&e->ringbuf)))
+                e->ops->read(e, dest_ptrs[i], e->attr.nr_consume);
+            else
+                n->nr_terminate_msgs++;
+            continue;
+        }
+
 read_more: // jump to here if we need to read more bytes into our buffer
 
         remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in));
@@ -3094,14 +3273,14 @@ int pgm_wait(node_t node)
     }
 
     if(token_status == WaitExhaustedAndTerminate &&
-        data_status == WaitExhaustedAndTerminate)
+        data_status == WaitExhaustedAndTerminate)
     {
         ret = PGM_TERMINATE;
     }
     else
     {
-        ret = (token_status == WaitTimeout || token_status == WaitError ||
-            data_status == WaitTimeout || data_status == WaitError) ?
-            -1 : 0;
+        ret = (token_status == WaitTimeout || token_status == WaitError ||
+            data_status == WaitTimeout || data_status == WaitError) ?
+            -1 : 0;
     }
 
diff --git a/tools/ringtest.cpp b/tools/ringtest.cpp
new file mode 100644
index 0000000..d6850ed
--- /dev/null
+++ b/tools/ringtest.cpp
@@ -0,0 +1,153 @@
+// Copyright (c) 2014, Glenn Elliott
+// All rights reserved.
+
+/* A program for testing the basic ring-based edge. */
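+/* Graph: n0 --(pgm_ring_edge)--> n1. The source thread streams uint32_t
+   sequence numbers to the sink thread; each side prints its running sum
+   at termination, so matching sums indicate no lost or corrupted data. */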
+
+#include <iostream>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "pgm.h"
+
+int errors = 0;
+pthread_barrier_t init_barrier;
+
+__thread char __errstr[80] = {0};
+
+#define CheckError(e) \
+do { int __ret = (e); \
+if(__ret < 0) { \
+    errors++; \
+    char* errstr = strerror_r(errno, __errstr, sizeof(__errstr)); \
+    fprintf(stderr, "%lu: Error %d (%s (%d)) @ %s:%s:%d\n", \
+        pthread_self(), __ret, errstr, errno, __FILE__, __FUNCTION__, __LINE__); \
+}}while(0)
+
+int TOTAL_ITERATIONS = 10000000;
+
+void* thread(void* _graph_t)
+{
+    char tabbuf[] = "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t";
+    int iterations = 0;
+    int ret = 0;
+
+    const graph_t& graph = *((graph_t*)_graph_t);
+    node_t node;
+
+    CheckError(pgm_claim_any_node(graph, &node));
+    tabbuf[node.node] = '\0';
+
+    int out_degree = pgm_get_degree_out(node);
+    int in_degree = pgm_get_degree_in(node);
+    edge_t* out_edges = (edge_t*)calloc(out_degree, sizeof(edge_t));
+    edge_t* in_edges = (edge_t*)calloc(in_degree, sizeof(edge_t));
+    /* populate the edge handles before looking up their buffers */
+    CheckError(pgm_get_edges_out(node, out_edges, out_degree));
+    CheckError(pgm_get_edges_in(node, in_edges, in_degree));
+
+    bool is_src = (in_degree == 0);
+    uint32_t* buf;
+    if(is_src)
+        buf = (uint32_t*)pgm_get_edge_buf_p(out_edges[0]);
+    else
+        buf = (uint32_t*)pgm_get_edge_buf_c(in_edges[0]);
+
+    uint64_t sum = 0;
+
+    pthread_barrier_wait(&init_barrier);
+
+    if(!errors)
+    {
+        do {
+            if(!is_src)
+            {
+                ret = pgm_wait(node);
+            }
+            else
+            {
+                if(iterations > TOTAL_ITERATIONS)
+                    ret = PGM_TERMINATE;
+            }
+
+            if(ret != PGM_TERMINATE)
+            {
+                CheckError(ret);
+
+                if(is_src)
+                {
+//                    usleep(500*1000);
+//                    fprintf(stdout, "%s%d fires\n", tabbuf, node.node);
+                    *buf = (uint32_t)iterations;
+                    sum += iterations;
+                }
+                else
+                {
+//                    fprintf(stdout, "%s%d fires. read:%d\n", tabbuf, node.node, *buf);
+                    sum += *buf;
+
+                    // slow down the consumer a little bit to induce backlog in token buffer
+                    if(rand()%5 == 0)
+                        sched_yield();
+                }
+
+                CheckError(pgm_complete(node));
+                iterations++;
+            }
+            else
+            {
+                fprintf(stdout, "%s%d terminates: sum: %lu\n", tabbuf, node.node, sum);
+
+                if(is_src)
+                    CheckError(pgm_terminate(node));
+            }
+
+        } while(ret != PGM_TERMINATE);
+    }
+
+    pthread_barrier_wait(&init_barrier);
+
+    CheckError(pgm_release_node(node));
+
+    free(out_edges);
+    free(in_edges);
+
+    pthread_exit(0);
+}
+
+int main(void)
+{
+    graph_t g;
+    node_t n0, n1;
+    edge_t e0_1;
+
+    pthread_t t0, t1;
+
+    edge_attr_t ring_attr;
+    memset(&ring_attr, 0, sizeof(ring_attr));
+    ring_attr.type = pgm_ring_edge;
+    ring_attr.nr_produce = sizeof(uint32_t);
+    ring_attr.nr_consume = sizeof(uint32_t);
+    ring_attr.nr_threshold = sizeof(uint32_t);
+    ring_attr.nmemb = 32;
+
+    CheckError(pgm_init("/tmp/graphs", 1));
+    CheckError(pgm_init_graph(&g, "demo"));
+
+    CheckError(pgm_init_node(&n0, g, "n0"));
+    CheckError(pgm_init_node(&n1, g, "n1"));
+
+    CheckError(pgm_init_edge(&e0_1, n0, n1, "e0_1", &ring_attr));
+
+    /* both worker threads must reach the barrier */
+    pthread_barrier_init(&init_barrier, 0, 2);
+    pthread_create(&t0, 0, thread, &g);
+    pthread_create(&t1, 0, thread, &g);
+
+    pthread_join(t0, 0);
+    pthread_join(t1, 0);
+
+    CheckError(pgm_destroy_graph(g));
+
+    CheckError(pgm_destroy());
+
+    return 0;
+}