diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-10-03 11:15:51 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-10-03 11:15:51 -0400 |
commit | a703f86dc5465b05b8ffb46bda35c2c63c1be2bd (patch) | |
tree | c70b5492db925008ed5f914b30d24b1b5ef364e3 | |
parent | dcca2f41cca5abaca28fa9b322c718035e0d12be (diff) |
save results in configurable chunk size transactions
-rwxr-xr-x | rtss14/database.py | 95 |
1 file changed, 69 insertions, 26 deletions
diff --git a/rtss14/database.py b/rtss14/database.py index 8290128..ec540ee 100755 --- a/rtss14/database.py +++ b/rtss14/database.py | |||
@@ -21,6 +21,10 @@ dp_col_names = None # list of keys to extract from sched results to write to db | |||
21 | dp_col_type_strs = None | 21 | dp_col_type_strs = None |
22 | ###################### | 22 | ###################### |
23 | 23 | ||
24 | def chunker(l, n): | ||
25 | for i in xrange(0, len(l), n): | ||
26 | yield l[i:i+n] | ||
27 | |||
24 | def get_job_id(): | 28 | def get_job_id(): |
25 | job_idx_str = os.environ.get('LSB_JOBINDEX') | 29 | job_idx_str = os.environ.get('LSB_JOBINDEX') |
26 | job_pid_str = os.environ.get('LSB_JOBID') | 30 | job_pid_str = os.environ.get('LSB_JOBID') |
@@ -449,23 +453,63 @@ def __store_eff_sched_results(c, dp_id, stats): | |||
449 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', | 453 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', |
450 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) | 454 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) |
451 | 455 | ||
452 | def __store_sched_result(c, dp_id, ts_util, stats): | 456 | #def __store_sched_result(c, dp_id, ts_util, stats): |
453 | c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', | 457 | # c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', |
454 | (dp_id, ts_util, | 458 | # (dp_id, ts_util, |
455 | stats.avg_sched, | 459 | # stats.avg_sched, |
456 | stats.ntested, | 460 | # stats.ntested, |
457 | stats.nsched, | 461 | # stats.nsched, |
458 | stats.avg_tard, | 462 | # stats.avg_tard, |
459 | stats.avg_bandwidth) ) | 463 | # stats.avg_bandwidth) ) |
460 | 464 | # | |
461 | def store_sched_result(conn, c, dp_id, result): | 465 | #def store_sched_result(conn, c, dp_id, result): |
466 | # done = False | ||
467 | # while not done: | ||
468 | # try: | ||
469 | # begin_sync(conn, c) | ||
470 | # __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats) | ||
471 | # if result.eff_sched_stats is not None: | ||
472 | # __store_eff_sched_results(c, dp_id, result.eff_sched_stats) | ||
473 | # end_sync(conn) | ||
474 | # done = True | ||
475 | # | ||
476 | # except db.OperationalError, e: | ||
477 | # errcode = e[0] | ||
478 | # if errcode == 1213: | ||
479 | # # deadlock - retry | ||
480 | # print '(store_sched_result) Error is transaction deadlock. Will retry.' | ||
481 | # print '(store_sched_result) Last query: %s' % c._last_executed | ||
482 | # rollback(conn) | ||
483 | # else: | ||
484 | # print e | ||
485 | # print '(store_sched_result) Last query: %s' % c._last_executed | ||
486 | # rollback(conn) | ||
487 | # raise e | ||
488 | # | ||
489 | # except BaseException, e: | ||
490 | # print e | ||
491 | # print '(store_sched_result) Last query: %s' % c._last_executed | ||
492 | # rollback(conn) | ||
493 | # raise e | ||
494 | |||
495 | def store_sched_result_chunk(conn, c, chunk): | ||
462 | done = False | 496 | done = False |
463 | while not done: | 497 | while not done: |
464 | try: | 498 | try: |
465 | begin_sync(conn, c) | 499 | begin_sync(conn, c) |
466 | __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats) | 500 | # insert normal sched results |
467 | if result.eff_sched_stats is not None: | 501 | c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', |
468 | __store_eff_sched_results(c, dp_id, result.eff_sched_stats) | 502 | [(dp_id, |
503 | result.dp.sys_util, | ||
504 | result.sched_stats.avg_sched, | ||
505 | result.sched_stats.ntested, | ||
506 | result.sched_stats.nsched, | ||
507 | result.sched_stats.avg_tard, | ||
508 | result.sched_stats.avg_bandwidth) for dp_id, result in chunk]) | ||
509 | # insert scaled sched results | ||
510 | for dp_id, result in chunk: | ||
511 | if result.eff_sched_stats is not None: | ||
512 | __store_eff_sched_results(c, dp_id, result.eff_sched_stats) | ||
469 | end_sync(conn) | 513 | end_sync(conn) |
470 | done = True | 514 | done = True |
471 | 515 | ||
@@ -473,23 +517,22 @@ def store_sched_result(conn, c, dp_id, result): | |||
473 | errcode = e[0] | 517 | errcode = e[0] |
474 | if errcode == 1213: | 518 | if errcode == 1213: |
475 | # deadlock - retry | 519 | # deadlock - retry |
476 | print '(store_sched_result) Error is transaction deadlock. Will retry.' | 520 | print '(store_sched_result_chunk) Error is transaction deadlock. Will retry.' |
477 | print '(store_sched_result) Last query: %s' % c._last_executed | 521 | print '(store_sched_result_chunk) Last query: %s' % c._last_executed |
478 | rollback(conn) | 522 | rollback(conn) |
479 | else: | 523 | else: |
480 | print e | 524 | print e |
481 | print '(store_sched_result) Last query: %s' % c._last_executed | 525 | print '(store_sched_result_chunk) Last query: %s' % c._last_executed |
482 | rollback(conn) | 526 | rollback(conn) |
483 | raise e | 527 | raise e |
484 | 528 | ||
485 | except BaseException, e: | 529 | except BaseException, e: |
486 | print e | 530 | print e |
487 | print '(store_sched_result) Last query: %s' % c._last_executed | 531 | print '(store_sched_result_chunk) Last query: %s' % c._last_executed |
488 | rollback(conn) | 532 | rollback(conn) |
489 | raise e | 533 | raise e |
490 | 534 | ||
491 | def store_sched_results(db_name, data, ndp = 0): | 535 | def store_sched_results(db_name, data, ndp = 0): |
492 | global dp_col_names | ||
493 | fetched = [] | 536 | fetched = [] |
494 | 537 | ||
495 | conn = connect_db(db_name) | 538 | conn = connect_db(db_name) |
@@ -499,15 +542,15 @@ def store_sched_results(db_name, data, ndp = 0): | |||
499 | # must be merged with prior results in the 'scaled_sched_results' table | 542 | # must be merged with prior results in the 'scaled_sched_results' table |
500 | c = conn.cursor(db.cursors.DictCursor) | 543 | c = conn.cursor(db.cursors.DictCursor) |
501 | 544 | ||
502 | col_names = [n for n in dp_col_names if n != 'id'] | ||
503 | |||
504 | try: | 545 | try: |
505 | # get IDs for all design points. | 546 | # X design point results per chunk |
506 | # sort to reduce (eliminate?) chance of deadlock. | 547 | chunk_size = 50 |
507 | id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in data], key=lambda x: x[0]) | 548 | chunks = list(chunker(data, chunk_size)) |
508 | 549 | for chk in chunks: | |
509 | for dp_id, result in id_and_data: | 550 | # get IDs for all design points |
510 | store_sched_result(conn, c, dp_id, result) | 551 | # sort to reduce (eliminate?) chance of deadlock |
552 | id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in chk], key=lambda x: x[0]) | ||
553 | store_sched_result_chunk(conn, c, id_and_data) | ||
511 | 554 | ||
512 | except db.OperationalError, e: | 555 | except db.OperationalError, e: |
513 | print '(store_sched_results) Caught database exception.' | 556 | print '(store_sched_results) Caught database exception.' |