aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-06-02 13:23:07 -0400
committerGlenn Elliott <gelliott@cs.unc.edu>2014-06-02 13:23:07 -0400
commitfb476b1a4112608acfe8c6ba822a490d3df295b7 (patch)
treead51760e6c3d29237b92a3d6c167f6f1c8ef364c
parent8df9e5e43e5e9b498922e59d897ea7e8c8137396 (diff)
API Addition: Added pgm_init_process_local()
Added new PGM init API pgm_init_process_local(). This initializes the PGM^RT backend to support only process-local graphs. The PGM^RT runtime does not need to create any directories or files when initialized into process-local mode. However, the user cannot use FIFO-based edges in this mode.
-rw-r--r--include/pgm.h11
-rw-r--r--src/pgm.cpp164
-rw-r--r--tools/ringtest.cpp2
3 files changed, 115 insertions, 62 deletions
diff --git a/include/pgm.h b/include/pgm.h
index d570faf..0db7e01 100644
--- a/include/pgm.h
+++ b/include/pgm.h
@@ -126,6 +126,17 @@ static const edge_attr_t default_edge = {
126}; 126};
127 127
128/* 128/*
129 Initialize the PGM runtime in the application for cases where
130 graph nodes are always (1) process-local (threads within the same
131 process) and (2) FIFO-based edges are never used.
132
133 Use pgm_init() if nodes are cross-process, or FIFO edges
134 are needed. If just FIFO edges are needed and nodes are not
135 cross-process, pass "use_shared_mem=0" to pgm_init().
136*/
137int pgm_init_process_local(void);
138
139/*
129 Initialize the PGM runtime in the application. 140 Initialize the PGM runtime in the application.
130 [in] dir: Directory where PGM data is stored. 141 [in] dir: Directory where PGM data is stored.
131 [in] create: 142 [in] create:
diff --git a/src/pgm.cpp b/src/pgm.cpp
index 8057c36..a9816ea 100644
--- a/src/pgm.cpp
+++ b/src/pgm.cpp
@@ -46,9 +46,7 @@ using namespace boost::filesystem;
46 46
47 47
48// TODO LIST: 48// TODO LIST:
49// * In-memory buffers that can be passed from producer to consumer 49// * Ring buffers in shared memory.
50// without any copies.
51// BONUS: Shared memory support.
52 50
53#ifndef PGM_CONFIG 51#ifndef PGM_CONFIG
54#error "pgm/include/config.h not included!" 52#error "pgm/include/config.h not included!"
@@ -420,19 +418,28 @@ static int fifo_create(pgm_graph* g,
420 pgm_edge* edge) 418 pgm_edge* edge)
421{ 419{
422 int ret = -1; 420 int ret = -1;
423 string fifoName(fifo_name(g, producer, consumer, edge));
424 421
425 path fifoPath(gGraphPath); 422 if(gGraphPath.string().empty())
426 fifoPath /= fifoName; 423 {
424 F("Graph directory not set. Did you call pgm_init_process_local() "
425 "insted of pgm_init()?\n");
426 }
427 else
428 {
429 string fifoName(fifo_name(g, producer, consumer, edge));
427 430
428 // Remove any old FIFO that may exist. 431 path fifoPath(gGraphPath);
429 remove(fifoPath); 432 fifoPath /= fifoName;
430 433
431 // TODO: See what boost can do here. 434 // Remove any old FIFO that may exist.
432 ret = mkfifo(fifoPath.string().c_str(), S_IRUSR | S_IWUSR); 435 remove(fifoPath);
433 if(0 != ret) 436
434 { 437 // TODO: See what boost can do here.
435 F("Failed to make FIFO %s\n", fifoPath.string().c_str()); 438 ret = mkfifo(fifoPath.string().c_str(), S_IRUSR | S_IWUSR);
439 if(0 != ret)
440 {
441 F("Failed to make FIFO %s\n", fifoPath.string().c_str());
442 }
436 } 443 }
437 return ret; 444 return ret;
438} 445}
@@ -441,18 +448,28 @@ static int fifo_open_consumer(pgm_graph* g,
441 pgm_node* producer, pgm_node* consumer, 448 pgm_node* producer, pgm_node* consumer,
442 pgm_edge* edge) 449 pgm_edge* edge)
443{ 450{
444 path fifoPath(gGraphPath); 451 int ret = -1;
445 fifoPath /= fifo_name(g, producer, consumer, edge); 452 if(gGraphPath.string().empty())
446 edge->fd_in = open(fifoPath.string().c_str(), O_RDONLY | O_NONBLOCK);
447 if(edge->fd_in == -1)
448 { 453 {
449 F("Could not open inbound edge %s/%s (FIFO)\n", g->name, edge->name); 454 F("Graph directory not set. Did you call pgm_init_process_local() "
450 return -1; 455 "insted of pgm_init()?\n");
451 } 456 }
457 else
458 {
459 path fifoPath(gGraphPath);
460 fifoPath /= fifo_name(g, producer, consumer, edge);
461 edge->fd_in = open(fifoPath.string().c_str(), O_RDONLY | O_NONBLOCK);
452 462
453 edge->buf_in = __pgm_malloc_edge_buf(g, edge, false); 463 if(edge->fd_in != -1)
454 464 {
455 return 0; 465 edge->buf_in = __pgm_malloc_edge_buf(g, edge, false);
466 }
467 else
468 {
469 F("Could not open inbound edge %s/%s (FIFO)\n", g->name, edge->name);
470 }
471 }
472 return ret;
456} 473}
457 474
458static int fifo_open_producer(pgm_graph* g, 475static int fifo_open_producer(pgm_graph* g,
@@ -460,41 +477,54 @@ static int fifo_open_producer(pgm_graph* g,
460 pgm_edge* edge) 477 pgm_edge* edge)
461{ 478{
462 int ret = -1; 479 int ret = -1;
463 path fifoPath(gGraphPath);
464 fifoPath /= fifo_name(g, producer, consumer, edge);
465 480
466 const int timeout = 60; 481 if(gGraphPath.string().empty())
467 const int start_time = time(0);
468 __sync_synchronize();
469 do
470 { 482 {
471 edge->fd_out = open(fifoPath.string().c_str(), O_WRONLY | O_NONBLOCK); 483 F("Graph directory not set. Did you call pgm_init_process_local() "
472 if(edge->fd_out == -1) 484 "insted of pgm_init()?\n");
485 }
486 else
487 {
488 path fifoPath(gGraphPath);
489 fifoPath /= fifo_name(g, producer, consumer, edge);
490
491 const int timeout = 60;
492 const int start_time = time(0);
493 __sync_synchronize();
494 do
473 { 495 {
474 if(errno != ENXIO) 496 edge->fd_out = open(fifoPath.string().c_str(),
475 { 497 O_WRONLY | O_NONBLOCK);
476 F("Could not open outbound edge %s/%s (FIFO)\n", g->name, edge->name); 498 if(edge->fd_out == -1)
477 break;
478 }
479 else
480 { 499 {
481 if(time(0) - start_time > timeout) 500 if(errno != ENXIO)
482 { 501 {
483 F("Could not open outbound edge %s/%s (FIFO)\n", g->name, edge->name); 502 F("Could not open outbound edge %s/%s (FIFO)\n",
503 g->name, edge->name);
484 break; 504 break;
485 } 505 }
486 usleep(1000); // wait for a millisecond 506 else
507 {
508 if(time(0) - start_time > timeout)
509 {
510 F("Could not open outbound edge %s/%s (FIFO)\n",
511 g->name, edge->name);
512 break;
513 }
514 usleep(1000); // wait for a millisecond
515 }
487 } 516 }
488 } 517 else
489 else 518 {
519 ret = 0;
520 }
521 }while(ret == -1);
522
523 if(!ret)
490 { 524 {
491 ret = 0; 525 edge->buf_out = __pgm_malloc_edge_buf(g, edge, true);
492 } 526 }
493 }while(ret == -1); 527 }
494
495 if(!ret)
496 edge->buf_out = __pgm_malloc_edge_buf(g, edge, true);
497
498 return ret; 528 return ret;
499} 529}
500 530
@@ -527,26 +557,32 @@ static int fifo_destroy(pgm_graph* g,
527 pgm_edge* edge) 557 pgm_edge* edge)
528{ 558{
529 int ret = -1; 559 int ret = -1;
530 string fifoName(fifo_name(g, producer, consumer, edge));
531 560
532 path fifoPath(gGraphPath); 561 if(gGraphPath.string().empty())
533 fifoPath /= fifoName;
534
535 if(edge->buf_in != 0 || edge->buf_out != 0)
536 { 562 {
537 W("Edge has not been closed: (producer:%s, consumer:%s)!\n", 563 F("Graph directory not set. Did you call pgm_init_process_local() "
538 (edge->buf_out != 0) ? "open" : "closed", 564 "insted of pgm_init()?\n");
539 (edge->buf_in != 0) ? "open" : "closed");
540 } 565 }
566 else
567 {
568 string fifoName(fifo_name(g, producer, consumer, edge));
569 path fifoPath(gGraphPath);
570 fifoPath /= fifoName;
541 571
542 if(!exists(fifoPath)) 572 if(edge->buf_in != 0 || edge->buf_out != 0)
543 goto out; 573 {
544 if(!remove(fifoPath)) 574 W("Edge has not been closed: (producer:%s, consumer:%s)!\n",
545 goto out; 575 (edge->buf_out != 0) ? "open" : "closed",
576 (edge->buf_in != 0) ? "open" : "closed");
577 }
546 578
547 ret = 0; 579 if(exists(fifoPath) &&
580 remove(fifoPath))
581 {
582 ret = 0;
583 }
584 }
548 585
549out:
550 return ret; 586 return ret;
551} 587}
552 588
@@ -1569,6 +1605,12 @@ static int prepare_graph_private_mem(void)
1569 return ret; 1605 return ret;
1570} 1606}
1571 1607
1608int pgm_init_process_local(void)
1609{
1610 int ret = prepare_graph_private_mem();
1611 return ret;
1612}
1613
1572int pgm_init(const char* dir, int create, int use_shared_mem) 1614int pgm_init(const char* dir, int create, int use_shared_mem)
1573{ 1615{
1574 int ret = -1; 1616 int ret = -1;
diff --git a/tools/ringtest.cpp b/tools/ringtest.cpp
index d6850ed..3f2bbb5 100644
--- a/tools/ringtest.cpp
+++ b/tools/ringtest.cpp
@@ -130,7 +130,7 @@ int main(void)
130 ring_attr.nr_threshold = sizeof(uint32_t); 130 ring_attr.nr_threshold = sizeof(uint32_t);
131 ring_attr.nmemb = 32; 131 ring_attr.nmemb = 32;
132 132
133 CheckError(pgm_init("/tmp/graphs", 1)); 133 CheckError(pgm_init_process_local());
134 CheckError(pgm_init_graph(&g, "demo")); 134 CheckError(pgm_init_graph(&g, "demo"));
135 135
136 CheckError(pgm_init_node(&n0, g, "n0")); 136 CheckError(pgm_init_node(&n0, g, "n0"));