aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNathan Otterness <otternes@cs.unc.edu>2016-04-12 11:19:20 -0400
committerNathan Otterness <otternes@cs.unc.edu>2016-04-12 11:19:20 -0400
commitbb59578c09ed5decd69f6e99975d1f3234ccc306 (patch)
tree445a02e301613850a221cdeb63febb635176d429
parent53fd6432b2db18cf772900a031fe8e6fc916e8c3 (diff)
Enable termination in the wait free buffer
This change adds a field to the shared wait free buffer analogous to the ring_cmd field in the ring buffer IPC mechanism. It is set to PGM_TERMINATE when the consumer has no more data to send.
-rw-r--r--include/wait_free.h5
-rw-r--r--src/pgm.cpp32
-rw-r--r--tools/wait_free_test.cpp10
3 files changed, 32 insertions, 15 deletions
diff --git a/include/wait_free.h b/include/wait_free.h
index 38e2442..1a366b2 100644
--- a/include/wait_free.h
+++ b/include/wait_free.h
@@ -7,6 +7,10 @@
7 7
8// Implements a wait-free buffer where each buffer is an 8 byte uint64_t. 8// Implements a wait-free buffer where each buffer is an 8 byte uint64_t.
9struct wait_free { 9struct wait_free {
10 // This fulfills the same purpose as edge->ring_cmd in pgm.cpp. It must only
11 // ever be 0 or PGM_TERMINATE. However, it must reside in the same colored
12 // shared memory as the ring buffer.
13 pgm_command_t command;
10 std::atomic_uchar latest; 14 std::atomic_uchar latest;
11 std::atomic_uchar reading; 15 std::atomic_uchar reading;
12 int wdcnt; 16 int wdcnt;
@@ -20,6 +24,7 @@ static inline int InitWaitFree(struct wait_free *b) {
20 b->latest = 0; 24 b->latest = 0;
21 b->wdcnt = -1; 25 b->wdcnt = -1;
22 b->reading = 0; 26 b->reading = 0;
27 b->command = 0;
23 return 0; 28 return 0;
24} 29}
25 30
diff --git a/src/pgm.cpp b/src/pgm.cpp
index 3a2c463..93c8533 100644
--- a/src/pgm.cpp
+++ b/src/pgm.cpp
@@ -3561,12 +3561,20 @@ static int pgm_send_ring_data(struct pgm_edge* e, pgm_command_t tag)
3561 return 0; 3561 return 0;
3562} 3562}
3563 3563
3564static int pgm_send_waitfree_data(struct pgm_edge *e, pgm_command_t tag) {
3565 if (tag & PGM_TERMINATE) {
3566 e->wait_free_buffer->command = tag;
3567 } else {
3568 e->ops->write(e, pgm_get_user_ptr(e->buf_out), e->attr.nr_produce);
3569 }
3570 return 0;
3571}
3572
3564static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL) 3573static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL)
3565{ 3574{
3566 if(!(e->attr.type & __PGM_EDGE_RING)) 3575 if (e->attr.type & __PGM_EDGE_RING) return pgm_send_ring_data(e, tag);
3567 return pgm_send_std_data(e, tag); 3576 if (e->attr.type & __PGM_EDGE_WAITFREE) return pgm_send_waitfree_data(e, tag);
3568 else 3577 return pgm_send_std_data(e, tag);
3569 return pgm_send_ring_data(e, tag);
3570} 3578}
3571 3579
3572static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait, 3580static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait,
@@ -3707,6 +3715,20 @@ wait_for_data: // jump here if we would block on read
3707 continue; 3715 continue;
3708 } 3716 }
3709 3717
3718 if (e->attr.type & __PGM_EDGE_WAITFREE) {
3719 // Shortcut for the wait-free buffer IPC, similar to the previous block
3720 // for ring buffers. (The only difference is that the wait free buffer
3721 // has no notion of being full or empty)
3722 if (e->wait_free_buffer->command & PGM_TERMINATE) {
3723 n->nr_terminate_msgs++;
3724 }
3725 // This is INTENTIONALLY re-read if a terminate is received; to make sure
3726 // that the final data is read. Since it will only re-read the final data
3727 // this can't result in anything being read that wasn't written.
3728 e->ops->read(e, dest_ptrs[i], e->attr.nr_consume);
3729 continue;
3730 }
3731
3710read_more: // jump to here if we need to read more bytes into our buffer 3732read_more: // jump to here if we need to read more bytes into our buffer
3711 3733
3712 remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in)); 3734 remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in));
@@ -3830,7 +3852,7 @@ read_more: // jump to here if we need to read more bytes into our buffer
3830out: 3852out:
3831 // signal terminte if everyone has checked in 3853 // signal terminte if everyone has checked in
3832 if(n->nr_terminate_msgs && 3854 if(n->nr_terminate_msgs &&
3833 (n->nr_terminate_msgs == (n->nr_in_data - n->nr_in_data_backedges))) 3855 (n->nr_terminate_msgs >= (n->nr_in_data - n->nr_in_data_backedges)))
3834 wait_status = WaitExhaustedAndTerminate; 3856 wait_status = WaitExhaustedAndTerminate;
3835 3857
3836 return wait_status; 3858 return wait_status;
diff --git a/tools/wait_free_test.cpp b/tools/wait_free_test.cpp
index f4e3376..f62f952 100644
--- a/tools/wait_free_test.cpp
+++ b/tools/wait_free_test.cpp
@@ -107,16 +107,6 @@ int main(void) {
107 waitfree_attr.nr_consume = sizeof(uint32_t); 107 waitfree_attr.nr_consume = sizeof(uint32_t);
108 waitfree_attr.nr_threshold = sizeof(uint32_t); 108 waitfree_attr.nr_threshold = sizeof(uint32_t);
109 109
110 /*
111 edge_attr_t ring_attr;
112 memset(&ring_attr, 0, sizeof(ring_attr));
113 ring_attr.type = pgm_ring_edge;
114 ring_attr.nr_produce = sizeof(uint32_t);
115 ring_attr.nr_consume = sizeof(uint32_t);
116 ring_attr.nr_threshold = sizeof(uint32_t);
117 ring_attr.nmemb = 32;
118 */
119
120 CheckError(pgm_init_process_local()); 110 CheckError(pgm_init_process_local());
121 CheckError(pgm_init_graph(&g, "demo")); 111 CheckError(pgm_init_graph(&g, "demo"));
122 112