diff options
author | Nathan Otterness <otternes@cs.unc.edu> | 2016-04-12 11:19:20 -0400 |
---|---|---|
committer | Nathan Otterness <otternes@cs.unc.edu> | 2016-04-12 11:19:20 -0400 |
commit | bb59578c09ed5decd69f6e99975d1f3234ccc306 (patch) | |
tree | 445a02e301613850a221cdeb63febb635176d429 | |
parent | 53fd6432b2db18cf772900a031fe8e6fc916e8c3 (diff) |
Enable termination in the wait free buffer
This change adds a field to the shared wait free buffer analogous to the
ring_cmd field in the ring buffer IPC mechanism. It is set to
PGM_TERMINATE when the consumer has no more data to send.
-rw-r--r-- | include/wait_free.h | 5 | ||||
-rw-r--r-- | src/pgm.cpp | 32 | ||||
-rw-r--r-- | tools/wait_free_test.cpp | 10 |
3 files changed, 32 insertions, 15 deletions
diff --git a/include/wait_free.h b/include/wait_free.h index 38e2442..1a366b2 100644 --- a/include/wait_free.h +++ b/include/wait_free.h | |||
@@ -7,6 +7,10 @@ | |||
7 | 7 | ||
8 | // Implements a wait-free buffer where each buffer is an 8 byte uint64_t. | 8 | // Implements a wait-free buffer where each buffer is an 8 byte uint64_t. |
9 | struct wait_free { | 9 | struct wait_free { |
10 | // This fulfills the same purpose as edge->ring_cmd in pgm.cpp. It must only | ||
11 | // ever be 0 or PGM_TERMINATE. However, it must reside in the same colored | ||
12 | // shared memory as the ring buffer. | ||
13 | pgm_command_t command; | ||
10 | std::atomic_uchar latest; | 14 | std::atomic_uchar latest; |
11 | std::atomic_uchar reading; | 15 | std::atomic_uchar reading; |
12 | int wdcnt; | 16 | int wdcnt; |
@@ -20,6 +24,7 @@ static inline int InitWaitFree(struct wait_free *b) { | |||
20 | b->latest = 0; | 24 | b->latest = 0; |
21 | b->wdcnt = -1; | 25 | b->wdcnt = -1; |
22 | b->reading = 0; | 26 | b->reading = 0; |
27 | b->command = 0; | ||
23 | return 0; | 28 | return 0; |
24 | } | 29 | } |
25 | 30 | ||
diff --git a/src/pgm.cpp b/src/pgm.cpp index 3a2c463..93c8533 100644 --- a/src/pgm.cpp +++ b/src/pgm.cpp | |||
@@ -3561,12 +3561,20 @@ static int pgm_send_ring_data(struct pgm_edge* e, pgm_command_t tag) | |||
3561 | return 0; | 3561 | return 0; |
3562 | } | 3562 | } |
3563 | 3563 | ||
3564 | static int pgm_send_waitfree_data(struct pgm_edge *e, pgm_command_t tag) { | ||
3565 | if (tag & PGM_TERMINATE) { | ||
3566 | e->wait_free_buffer->command = tag; | ||
3567 | } else { | ||
3568 | e->ops->write(e, pgm_get_user_ptr(e->buf_out), e->attr.nr_produce); | ||
3569 | } | ||
3570 | return 0; | ||
3571 | } | ||
3572 | |||
3564 | static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL) | 3573 | static int pgm_send_data(struct pgm_edge* e, pgm_command_t tag = PGM_NORMAL) |
3565 | { | 3574 | { |
3566 | if(!(e->attr.type & __PGM_EDGE_RING)) | 3575 | if (e->attr.type & __PGM_EDGE_RING) return pgm_send_ring_data(e, tag); |
3567 | return pgm_send_std_data(e, tag); | 3576 | if (e->attr.type & __PGM_EDGE_WAITFREE) return pgm_send_waitfree_data(e, tag); |
3568 | else | 3577 | return pgm_send_std_data(e, tag); |
3569 | return pgm_send_ring_data(e, tag); | ||
3570 | } | 3578 | } |
3571 | 3579 | ||
3572 | static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait, | 3580 | static eWaitStatus pgm_wait_for_data(pgm_fd_mask_t* to_wait, |
@@ -3707,6 +3715,20 @@ wait_for_data: // jump here if we would block on read | |||
3707 | continue; | 3715 | continue; |
3708 | } | 3716 | } |
3709 | 3717 | ||
3718 | if (e->attr.type & __PGM_EDGE_WAITFREE) { | ||
3719 | // Shortcut for the wait-free buffer IPC, similar to the previous block | ||
3720 | // for ring buffers. (The only difference is that the wait free buffer | ||
3721 | // has no notion of being full or empty) | ||
3722 | if (e->wait_free_buffer->command & PGM_TERMINATE) { | ||
3723 | n->nr_terminate_msgs++; | ||
3724 | } | ||
3725 | // This is INTENTIONALLY re-read if a terminate is received; to make sure | ||
3726 | // that the final data is read. Since it will only re-read the final data | ||
3727 | // this can't result in anything being read that wasn't written. | ||
3728 | e->ops->read(e, dest_ptrs[i], e->attr.nr_consume); | ||
3729 | continue; | ||
3730 | } | ||
3731 | |||
3710 | read_more: // jump to here if we need to read more bytes into our buffer | 3732 | read_more: // jump to here if we need to read more bytes into our buffer |
3711 | 3733 | ||
3712 | remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in)); | 3734 | remaining = e->attr.nr_consume - (dest_ptrs[i] - (char*)pgm_get_user_ptr(e->buf_in)); |
@@ -3830,7 +3852,7 @@ read_more: // jump to here if we need to read more bytes into our buffer | |||
3830 | out: | 3852 | out: |
3831 | // signal terminte if everyone has checked in | 3853 | // signal terminte if everyone has checked in |
3832 | if(n->nr_terminate_msgs && | 3854 | if(n->nr_terminate_msgs && |
3833 | (n->nr_terminate_msgs == (n->nr_in_data - n->nr_in_data_backedges))) | 3855 | (n->nr_terminate_msgs >= (n->nr_in_data - n->nr_in_data_backedges))) |
3834 | wait_status = WaitExhaustedAndTerminate; | 3856 | wait_status = WaitExhaustedAndTerminate; |
3835 | 3857 | ||
3836 | return wait_status; | 3858 | return wait_status; |
diff --git a/tools/wait_free_test.cpp b/tools/wait_free_test.cpp index f4e3376..f62f952 100644 --- a/tools/wait_free_test.cpp +++ b/tools/wait_free_test.cpp | |||
@@ -107,16 +107,6 @@ int main(void) { | |||
107 | waitfree_attr.nr_consume = sizeof(uint32_t); | 107 | waitfree_attr.nr_consume = sizeof(uint32_t); |
108 | waitfree_attr.nr_threshold = sizeof(uint32_t); | 108 | waitfree_attr.nr_threshold = sizeof(uint32_t); |
109 | 109 | ||
110 | /* | ||
111 | edge_attr_t ring_attr; | ||
112 | memset(&ring_attr, 0, sizeof(ring_attr)); | ||
113 | ring_attr.type = pgm_ring_edge; | ||
114 | ring_attr.nr_produce = sizeof(uint32_t); | ||
115 | ring_attr.nr_consume = sizeof(uint32_t); | ||
116 | ring_attr.nr_threshold = sizeof(uint32_t); | ||
117 | ring_attr.nmemb = 32; | ||
118 | */ | ||
119 | |||
120 | CheckError(pgm_init_process_local()); | 110 | CheckError(pgm_init_process_local()); |
121 | CheckError(pgm_init_graph(&g, "demo")); | 111 | CheckError(pgm_init_graph(&g, "demo")); |
122 | 112 | ||