|           |                                                              |                           |
|-----------|--------------------------------------------------------------|---------------------------|
| author    | Glenn Elliott <gelliott@cs.unc.edu>                          | 2014-10-03 11:15:51 -0400 |
| committer | Glenn Elliott <gelliott@cs.unc.edu>                          | 2014-10-03 11:15:51 -0400 |
| commit    | a703f86dc5465b05b8ffb46bda35c2c63c1be2bd (patch)             |                           |
| tree      | c70b5492db925008ed5f914b30d24b1b5ef364e3 /rtss14/database.py |                           |
| parent    | dcca2f41cca5abaca28fa9b322c718035e0d12be (diff)              |                           |
save results in configurable chunk-size transactions
Diffstat (limited to 'rtss14/database.py')
| -rwxr-xr-x | rtss14/database.py | 95 |
1 file changed, 69 insertions, 26 deletions
```diff
diff --git a/rtss14/database.py b/rtss14/database.py
index 8290128..ec540ee 100755
--- a/rtss14/database.py
+++ b/rtss14/database.py
@@ -21,6 +21,10 @@ dp_col_names = None # list of keys to extract from sched results to write to db
 dp_col_type_strs = None
 ######################
 
+def chunker(l, n):
+    for i in xrange(0, len(l), n):
+        yield l[i:i+n]
+
 def get_job_id():
     job_idx_str = os.environ.get('LSB_JOBINDEX')
     job_pid_str = os.environ.get('LSB_JOBID')
```
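The new `chunker()` helper is a plain generator: it walks the input list in steps of `n` and yields consecutive slices, so every chunk holds at most `n` items and only the last can be shorter. A minimal usage sketch (Python 2, matching the file's dialect; the sample list is illustrative):

```python
# chunker() as added above: yield consecutive slices of at most n items.
def chunker(l, n):
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

# Any list works; range(7) is just a small example.
for chunk in chunker(range(7), 3):
    print chunk
# prints: [0, 1, 2]  then  [3, 4, 5]  then  [6]
```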
```diff
@@ -449,23 +453,63 @@ def __store_eff_sched_results(c, dp_id, stats):
           'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)',
           (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth))
 
-def __store_sched_result(c, dp_id, ts_util, stats):
-    c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
-        (dp_id, ts_util,
-        stats.avg_sched,
-        stats.ntested,
-        stats.nsched,
-        stats.avg_tard,
-        stats.avg_bandwidth) )
-
-def store_sched_result(conn, c, dp_id, result):
+#def __store_sched_result(c, dp_id, ts_util, stats):
+#    c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
+#        (dp_id, ts_util,
+#        stats.avg_sched,
+#        stats.ntested,
+#        stats.nsched,
+#        stats.avg_tard,
+#        stats.avg_bandwidth) )
+#
+#def store_sched_result(conn, c, dp_id, result):
+#    done = False
+#    while not done:
+#        try:
+#            begin_sync(conn, c)
+#            __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats)
+#            if result.eff_sched_stats is not None:
+#                __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
+#            end_sync(conn)
+#            done = True
+#
+#        except db.OperationalError, e:
+#            errcode = e[0]
+#            if errcode == 1213:
+#                # deadlock - retry
+#                print '(store_sched_result) Error is transaction deadlock. Will retry.'
+#                print '(store_sched_result) Last query: %s' % c._last_executed
+#                rollback(conn)
+#            else:
+#                print e
+#                print '(store_sched_result) Last query: %s' % c._last_executed
+#                rollback(conn)
+#                raise e
+#
+#        except BaseException, e:
+#            print e
+#            print '(store_sched_result) Last query: %s' % c._last_executed
+#            rollback(conn)
+#            raise e
+
+def store_sched_result_chunk(conn, c, chunk):
     done = False
     while not done:
         try:
             begin_sync(conn, c)
-            __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats)
-            if result.eff_sched_stats is not None:
-                __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
+            # insert normal sched results
+            c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
+                [(dp_id,
+                  result.dp.sys_util,
+                  result.sched_stats.avg_sched,
+                  result.sched_stats.ntested,
+                  result.sched_stats.nsched,
+                  result.sched_stats.avg_tard,
+                  result.sched_stats.avg_bandwidth) for dp_id, result in chunk])
+            # insert scaled sched results
+            for dp_id, result in chunk:
+                if result.eff_sched_stats is not None:
+                    __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
             end_sync(conn)
             done = True
 
```
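`store_sched_result_chunk()` replaces the per-result transactions of the old `store_sched_result()` (preserved above as comments) with one transaction per chunk: `executemany()` sends all of the chunk's `sched_results` rows in a single batched INSERT, and the enclosing loop retries the whole chunk when MySQL reports a deadlock (error code 1213). A stripped-down sketch of the same pattern, assuming a MySQLdb connection and using `commit()`/`rollback()` directly in place of this file's `begin_sync()`/`end_sync()`/`rollback()` helpers:

```python
import MySQLdb as db

def store_chunk_with_retry(conn, c, rows):
    # Illustrative stand-in for store_sched_result_chunk(): 'rows' is a
    # list of parameter tuples for sched_results. The whole chunk is one
    # transaction, so a deadlock rollback discards and retries the entire
    # chunk rather than a single row.
    while True:
        try:
            c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
                          rows)
            conn.commit()
            return
        except db.OperationalError, e:
            conn.rollback()
            if e.args[0] != 1213:  # 1213 = ER_LOCK_DEADLOCK
                raise
```

Batching trades retry cost for throughput: a deadlock now repeats up to a chunk's worth of inserts, but the common case needs one commit per chunk instead of one per design point.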
```diff
@@ -473,23 +517,22 @@ def store_sched_result(conn, c, dp_id, result):
             errcode = e[0]
             if errcode == 1213:
                 # deadlock - retry
-                print '(store_sched_result) Error is transaction deadlock. Will retry.'
-                print '(store_sched_result) Last query: %s' % c._last_executed
+                print '(store_sched_result_chunk) Error is transaction deadlock. Will retry.'
+                print '(store_sched_result_chunk) Last query: %s' % c._last_executed
                 rollback(conn)
             else:
                 print e
-                print '(store_sched_result) Last query: %s' % c._last_executed
+                print '(store_sched_result_chunk) Last query: %s' % c._last_executed
                 rollback(conn)
                 raise e
 
         except BaseException, e:
             print e
-            print '(store_sched_result) Last query: %s' % c._last_executed
+            print '(store_sched_result_chunk) Last query: %s' % c._last_executed
             rollback(conn)
             raise e
 
 def store_sched_results(db_name, data, ndp = 0):
-    global dp_col_names
     fetched = []
 
     conn = connect_db(db_name)
@@ -499,15 +542,15 @@ def store_sched_results(db_name, data, ndp = 0):
     # must be merged with prior results in the 'scaled_sched_results' table
     c = conn.cursor(db.cursors.DictCursor)
 
-    col_names = [n for n in dp_col_names if n != 'id']
-
     try:
-        # get IDs for all design points.
-        # sort to reduce (eliminate?) chance of deadlock.
-        id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in data], key=lambda x: x[0])
-
-        for dp_id, result in id_and_data:
-            store_sched_result(conn, c, dp_id, result)
+        # X design point results per chunk
+        chunk_size = 50
+        chunks = list(chunker(data, chunk_size))
+        for chk in chunks:
+            # get IDs for all design points
+            # sort to reduce (eliminate?) chance of deadlock
+            id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in chk], key=lambda x: x[0])
+            store_sched_result_chunk(conn, c, id_and_data)
 
     except db.OperationalError, e:
         print '(store_sched_results) Caught database exception.'
```
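The driver now writes results in fixed-size chunks, sorting each chunk by design-point ID before storing it to reduce the chance of deadlock between concurrent jobs. Note that despite the commit message, `chunk_size = 50` is hardcoded. A hypothetical variant (not in this commit) that actually surfaces the knob as a parameter, reusing this file's `connect_db()`, `chunker()`, `lookup_dp_id()`, and `store_sched_result_chunk()`:

```python
# Hypothetical signature (not in this commit): expose chunk_size as a
# keyword argument with the current default, so callers can tune how many
# design-point results share one transaction.
def store_sched_results(db_name, data, ndp = 0, chunk_size = 50):
    conn = connect_db(db_name)
    c = conn.cursor(db.cursors.DictCursor)
    for chk in chunker(data, chunk_size):
        # sort each chunk by dp_id to keep lock acquisition order consistent
        id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in chk],
                             key=lambda x: x[0])
        store_sched_result_chunk(conn, c, id_and_data)
```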
