diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-06-02 13:23:07 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-06-02 13:23:07 -0400 |
commit | fb476b1a4112608acfe8c6ba822a490d3df295b7 (patch) | |
tree | ad51760e6c3d29237b92a3d6c167f6f1c8ef364c | |
parent | 8df9e5e43e5e9b498922e59d897ea7e8c8137396 (diff) |
API Addition: Added pgm_init_process_local()
Added new PGM init API pgm_init_process_local(). This
initializes the PGM^RT backend to support only process-local
graphs. The PGM^RT runtime does not need to create any
directories or files when initialized into process-local mode.
However, the user cannot use FIFO-based edges in this mode.
-rw-r--r-- | include/pgm.h | 11 | ||||
-rw-r--r-- | src/pgm.cpp | 164 | ||||
-rw-r--r-- | tools/ringtest.cpp | 2 |
3 files changed, 115 insertions, 62 deletions
diff --git a/include/pgm.h b/include/pgm.h index d570faf..0db7e01 100644 --- a/include/pgm.h +++ b/include/pgm.h | |||
@@ -126,6 +126,17 @@ static const edge_attr_t default_edge = { | |||
126 | }; | 126 | }; |
127 | 127 | ||
128 | /* | 128 | /* |
129 | Initialize the PGM runtime in the application for cases where | ||
130 | graph nodes are always (1) process-local (threads within the same | ||
131 | process) and (2) FIFO-based edges are never used. | ||
132 | |||
133 | Use pgm_init() if nodes are cross-process, or FIFO edges | ||
134 | are needed. If just FIFO edges are needed and nodes are not | ||
135 | cross-process, pass "use_shared_mem=0" to pgm_init(). | ||
136 | */ | ||
137 | int pgm_init_process_local(void); | ||
138 | |||
139 | /* | ||
129 | Initialize the PGM runtime in the application. | 140 | Initialize the PGM runtime in the application. |
130 | [in] dir: Directory where PGM data is stored. | 141 | [in] dir: Directory where PGM data is stored. |
131 | [in] create: | 142 | [in] create: |
diff --git a/src/pgm.cpp b/src/pgm.cpp index 8057c36..a9816ea 100644 --- a/src/pgm.cpp +++ b/src/pgm.cpp | |||
@@ -46,9 +46,7 @@ using namespace boost::filesystem; | |||
46 | 46 | ||
47 | 47 | ||
48 | // TODO LIST: | 48 | // TODO LIST: |
49 | // * In-memory buffers that can be passed from producer to consumer | 49 | // * Ring buffers in shared memory. |
50 | // without any copies. | ||
51 | // BONUS: Shared memory support. | ||
52 | 50 | ||
53 | #ifndef PGM_CONFIG | 51 | #ifndef PGM_CONFIG |
54 | #error "pgm/include/config.h not included!" | 52 | #error "pgm/include/config.h not included!" |
@@ -420,19 +418,28 @@ static int fifo_create(pgm_graph* g, | |||
420 | pgm_edge* edge) | 418 | pgm_edge* edge) |
421 | { | 419 | { |
422 | int ret = -1; | 420 | int ret = -1; |
423 | string fifoName(fifo_name(g, producer, consumer, edge)); | ||
424 | 421 | ||
425 | path fifoPath(gGraphPath); | 422 | if(gGraphPath.string().empty()) |
426 | fifoPath /= fifoName; | 423 | { |
424 | F("Graph directory not set. Did you call pgm_init_process_local() " | ||
425 | "insted of pgm_init()?\n"); | ||
426 | } | ||
427 | else | ||
428 | { | ||
429 | string fifoName(fifo_name(g, producer, consumer, edge)); | ||
427 | 430 | ||
428 | // Remove any old FIFO that may exist. | 431 | path fifoPath(gGraphPath); |
429 | remove(fifoPath); | 432 | fifoPath /= fifoName; |
430 | 433 | ||
431 | // TODO: See what boost can do here. | 434 | // Remove any old FIFO that may exist. |
432 | ret = mkfifo(fifoPath.string().c_str(), S_IRUSR | S_IWUSR); | 435 | remove(fifoPath); |
433 | if(0 != ret) | 436 | |
434 | { | 437 | // TODO: See what boost can do here. |
435 | F("Failed to make FIFO %s\n", fifoPath.string().c_str()); | 438 | ret = mkfifo(fifoPath.string().c_str(), S_IRUSR | S_IWUSR); |
439 | if(0 != ret) | ||
440 | { | ||
441 | F("Failed to make FIFO %s\n", fifoPath.string().c_str()); | ||
442 | } | ||
436 | } | 443 | } |
437 | return ret; | 444 | return ret; |
438 | } | 445 | } |
@@ -441,18 +448,28 @@ static int fifo_open_consumer(pgm_graph* g, | |||
441 | pgm_node* producer, pgm_node* consumer, | 448 | pgm_node* producer, pgm_node* consumer, |
442 | pgm_edge* edge) | 449 | pgm_edge* edge) |
443 | { | 450 | { |
444 | path fifoPath(gGraphPath); | 451 | int ret = -1; |
445 | fifoPath /= fifo_name(g, producer, consumer, edge); | 452 | if(gGraphPath.string().empty()) |
446 | edge->fd_in = open(fifoPath.string().c_str(), O_RDONLY | O_NONBLOCK); | ||
447 | if(edge->fd_in == -1) | ||
448 | { | 453 | { |
449 | F("Could not open inbound edge %s/%s (FIFO)\n", g->name, edge->name); | 454 | F("Graph directory not set. Did you call pgm_init_process_local() " |
450 | return -1; | 455 | "insted of pgm_init()?\n"); |
451 | } | 456 | } |
457 | else | ||
458 | { | ||
459 | path fifoPath(gGraphPath); | ||
460 | fifoPath /= fifo_name(g, producer, consumer, edge); | ||
461 | edge->fd_in = open(fifoPath.string().c_str(), O_RDONLY | O_NONBLOCK); | ||
452 | 462 | ||
453 | edge->buf_in = __pgm_malloc_edge_buf(g, edge, false); | 463 | if(edge->fd_in != -1) |
454 | 464 | { | |
455 | return 0; | 465 | edge->buf_in = __pgm_malloc_edge_buf(g, edge, false); |
466 | } | ||
467 | else | ||
468 | { | ||
469 | F("Could not open inbound edge %s/%s (FIFO)\n", g->name, edge->name); | ||
470 | } | ||
471 | } | ||
472 | return ret; | ||
456 | } | 473 | } |
457 | 474 | ||
458 | static int fifo_open_producer(pgm_graph* g, | 475 | static int fifo_open_producer(pgm_graph* g, |
@@ -460,41 +477,54 @@ static int fifo_open_producer(pgm_graph* g, | |||
460 | pgm_edge* edge) | 477 | pgm_edge* edge) |
461 | { | 478 | { |
462 | int ret = -1; | 479 | int ret = -1; |
463 | path fifoPath(gGraphPath); | ||
464 | fifoPath /= fifo_name(g, producer, consumer, edge); | ||
465 | 480 | ||
466 | const int timeout = 60; | 481 | if(gGraphPath.string().empty()) |
467 | const int start_time = time(0); | ||
468 | __sync_synchronize(); | ||
469 | do | ||
470 | { | 482 | { |
471 | edge->fd_out = open(fifoPath.string().c_str(), O_WRONLY | O_NONBLOCK); | 483 | F("Graph directory not set. Did you call pgm_init_process_local() " |
472 | if(edge->fd_out == -1) | 484 | "insted of pgm_init()?\n"); |
485 | } | ||
486 | else | ||
487 | { | ||
488 | path fifoPath(gGraphPath); | ||
489 | fifoPath /= fifo_name(g, producer, consumer, edge); | ||
490 | |||
491 | const int timeout = 60; | ||
492 | const int start_time = time(0); | ||
493 | __sync_synchronize(); | ||
494 | do | ||
473 | { | 495 | { |
474 | if(errno != ENXIO) | 496 | edge->fd_out = open(fifoPath.string().c_str(), |
475 | { | 497 | O_WRONLY | O_NONBLOCK); |
476 | F("Could not open outbound edge %s/%s (FIFO)\n", g->name, edge->name); | 498 | if(edge->fd_out == -1) |
477 | break; | ||
478 | } | ||
479 | else | ||
480 | { | 499 | { |
481 | if(time(0) - start_time > timeout) | 500 | if(errno != ENXIO) |
482 | { | 501 | { |
483 | F("Could not open outbound edge %s/%s (FIFO)\n", g->name, edge->name); | 502 | F("Could not open outbound edge %s/%s (FIFO)\n", |
503 | g->name, edge->name); | ||
484 | break; | 504 | break; |
485 | } | 505 | } |
486 | usleep(1000); // wait for a millisecond | 506 | else |
507 | { | ||
508 | if(time(0) - start_time > timeout) | ||
509 | { | ||
510 | F("Could not open outbound edge %s/%s (FIFO)\n", | ||
511 | g->name, edge->name); | ||
512 | break; | ||
513 | } | ||
514 | usleep(1000); // wait for a millisecond | ||
515 | } | ||
487 | } | 516 | } |
488 | } | 517 | else |
489 | else | 518 | { |
519 | ret = 0; | ||
520 | } | ||
521 | }while(ret == -1); | ||
522 | |||
523 | if(!ret) | ||
490 | { | 524 | { |
491 | ret = 0; | 525 | edge->buf_out = __pgm_malloc_edge_buf(g, edge, true); |
492 | } | 526 | } |
493 | }while(ret == -1); | 527 | } |
494 | |||
495 | if(!ret) | ||
496 | edge->buf_out = __pgm_malloc_edge_buf(g, edge, true); | ||
497 | |||
498 | return ret; | 528 | return ret; |
499 | } | 529 | } |
500 | 530 | ||
@@ -527,26 +557,32 @@ static int fifo_destroy(pgm_graph* g, | |||
527 | pgm_edge* edge) | 557 | pgm_edge* edge) |
528 | { | 558 | { |
529 | int ret = -1; | 559 | int ret = -1; |
530 | string fifoName(fifo_name(g, producer, consumer, edge)); | ||
531 | 560 | ||
532 | path fifoPath(gGraphPath); | 561 | if(gGraphPath.string().empty()) |
533 | fifoPath /= fifoName; | ||
534 | |||
535 | if(edge->buf_in != 0 || edge->buf_out != 0) | ||
536 | { | 562 | { |
537 | W("Edge has not been closed: (producer:%s, consumer:%s)!\n", | 563 | F("Graph directory not set. Did you call pgm_init_process_local() " |
538 | (edge->buf_out != 0) ? "open" : "closed", | 564 | "insted of pgm_init()?\n"); |
539 | (edge->buf_in != 0) ? "open" : "closed"); | ||
540 | } | 565 | } |
566 | else | ||
567 | { | ||
568 | string fifoName(fifo_name(g, producer, consumer, edge)); | ||
569 | path fifoPath(gGraphPath); | ||
570 | fifoPath /= fifoName; | ||
541 | 571 | ||
542 | if(!exists(fifoPath)) | 572 | if(edge->buf_in != 0 || edge->buf_out != 0) |
543 | goto out; | 573 | { |
544 | if(!remove(fifoPath)) | 574 | W("Edge has not been closed: (producer:%s, consumer:%s)!\n", |
545 | goto out; | 575 | (edge->buf_out != 0) ? "open" : "closed", |
576 | (edge->buf_in != 0) ? "open" : "closed"); | ||
577 | } | ||
546 | 578 | ||
547 | ret = 0; | 579 | if(exists(fifoPath) && |
580 | remove(fifoPath)) | ||
581 | { | ||
582 | ret = 0; | ||
583 | } | ||
584 | } | ||
548 | 585 | ||
549 | out: | ||
550 | return ret; | 586 | return ret; |
551 | } | 587 | } |
552 | 588 | ||
@@ -1569,6 +1605,12 @@ static int prepare_graph_private_mem(void) | |||
1569 | return ret; | 1605 | return ret; |
1570 | } | 1606 | } |
1571 | 1607 | ||
1608 | int pgm_init_process_local(void) | ||
1609 | { | ||
1610 | int ret = prepare_graph_private_mem(); | ||
1611 | return ret; | ||
1612 | } | ||
1613 | |||
1572 | int pgm_init(const char* dir, int create, int use_shared_mem) | 1614 | int pgm_init(const char* dir, int create, int use_shared_mem) |
1573 | { | 1615 | { |
1574 | int ret = -1; | 1616 | int ret = -1; |
diff --git a/tools/ringtest.cpp b/tools/ringtest.cpp index d6850ed..3f2bbb5 100644 --- a/tools/ringtest.cpp +++ b/tools/ringtest.cpp | |||
@@ -130,7 +130,7 @@ int main(void) | |||
130 | ring_attr.nr_threshold = sizeof(uint32_t); | 130 | ring_attr.nr_threshold = sizeof(uint32_t); |
131 | ring_attr.nmemb = 32; | 131 | ring_attr.nmemb = 32; |
132 | 132 | ||
133 | CheckError(pgm_init("/tmp/graphs", 1)); | 133 | CheckError(pgm_init_process_local()); |
134 | CheckError(pgm_init_graph(&g, "demo")); | 134 | CheckError(pgm_init_graph(&g, "demo")); |
135 | 135 | ||
136 | CheckError(pgm_init_node(&n0, g, "n0")); | 136 | CheckError(pgm_init_node(&n0, g, "n0")); |