aboutsummaryrefslogtreecommitdiffstats
path: root/rtss14/database.py
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-10-03 11:15:51 -0400
committerGlenn Elliott <gelliott@cs.unc.edu>2014-10-03 11:15:51 -0400
commita703f86dc5465b05b8ffb46bda35c2c63c1be2bd (patch)
treec70b5492db925008ed5f914b30d24b1b5ef364e3 /rtss14/database.py
parentdcca2f41cca5abaca28fa9b322c718035e0d12be (diff)
save results in configable chunk size transactions
Diffstat (limited to 'rtss14/database.py')
-rwxr-xr-xrtss14/database.py95
1 files changed, 69 insertions, 26 deletions
diff --git a/rtss14/database.py b/rtss14/database.py
index 8290128..ec540ee 100755
--- a/rtss14/database.py
+++ b/rtss14/database.py
@@ -21,6 +21,10 @@ dp_col_names = None # list of keys to extract from sched results to write to db
21dp_col_type_strs = None 21dp_col_type_strs = None
22###################### 22######################
23 23
24def chunker(l, n):
25 for i in xrange(0, len(l), n):
26 yield l[i:i+n]
27
24def get_job_id(): 28def get_job_id():
25 job_idx_str = os.environ.get('LSB_JOBINDEX') 29 job_idx_str = os.environ.get('LSB_JOBINDEX')
26 job_pid_str = os.environ.get('LSB_JOBID') 30 job_pid_str = os.environ.get('LSB_JOBID')
@@ -449,23 +453,63 @@ def __store_eff_sched_results(c, dp_id, stats):
449 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', 453 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)',
450 (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) 454 (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth))
451 455
452def __store_sched_result(c, dp_id, ts_util, stats): 456#def __store_sched_result(c, dp_id, ts_util, stats):
453 c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', 457# c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
454 (dp_id, ts_util, 458# (dp_id, ts_util,
455 stats.avg_sched, 459# stats.avg_sched,
456 stats.ntested, 460# stats.ntested,
457 stats.nsched, 461# stats.nsched,
458 stats.avg_tard, 462# stats.avg_tard,
459 stats.avg_bandwidth) ) 463# stats.avg_bandwidth) )
460 464#
461def store_sched_result(conn, c, dp_id, result): 465#def store_sched_result(conn, c, dp_id, result):
466# done = False
467# while not done:
468# try:
469# begin_sync(conn, c)
470# __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats)
471# if result.eff_sched_stats is not None:
472# __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
473# end_sync(conn)
474# done = True
475#
476# except db.OperationalError, e:
477# errcode = e[0]
478# if errcode == 1213:
479# # deadlock - retry
480# print '(store_sched_result) Error is transaction deadlock. Will retry.'
481# print '(store_sched_result) Last query: %s' % c._last_executed
482# rollback(conn)
483# else:
484# print e
485# print '(store_sched_result) Last query: %s' % c._last_executed
486# rollback(conn)
487# raise e
488#
489# except BaseException, e:
490# print e
491# print '(store_sched_result) Last query: %s' % c._last_executed
492# rollback(conn)
493# raise e
494
495def store_sched_result_chunk(conn, c, chunk):
462 done = False 496 done = False
463 while not done: 497 while not done:
464 try: 498 try:
465 begin_sync(conn, c) 499 begin_sync(conn, c)
466 __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats) 500 # insert normal sched results
467 if result.eff_sched_stats is not None: 501 c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
468 __store_eff_sched_results(c, dp_id, result.eff_sched_stats) 502 [(dp_id,
503 result.dp.sys_util,
504 result.sched_stats.avg_sched,
505 result.sched_stats.ntested,
506 result.sched_stats.nsched,
507 result.sched_stats.avg_tard,
508 result.sched_stats.avg_bandwidth) for dp_id, result in chunk])
509 # insert scaled sched results
510 for dp_id, result in chunk:
511 if result.eff_sched_stats is not None:
512 __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
469 end_sync(conn) 513 end_sync(conn)
470 done = True 514 done = True
471 515
@@ -473,23 +517,22 @@ def store_sched_result(conn, c, dp_id, result):
473 errcode = e[0] 517 errcode = e[0]
474 if errcode == 1213: 518 if errcode == 1213:
475 # deadlock - retry 519 # deadlock - retry
476 print '(store_sched_result) Error is transaction deadlock. Will retry.' 520 print '(store_sched_result_chunk) Error is transaction deadlock. Will retry.'
477 print '(store_sched_result) Last query: %s' % c._last_executed 521 print '(store_sched_result_chunk) Last query: %s' % c._last_executed
478 rollback(conn) 522 rollback(conn)
479 else: 523 else:
480 print e 524 print e
481 print '(store_sched_result) Last query: %s' % c._last_executed 525 print '(store_sched_result_chunk) Last query: %s' % c._last_executed
482 rollback(conn) 526 rollback(conn)
483 raise e 527 raise e
484 528
485 except BaseException, e: 529 except BaseException, e:
486 print e 530 print e
487 print '(store_sched_result) Last query: %s' % c._last_executed 531 print '(store_sched_result_chunk) Last query: %s' % c._last_executed
488 rollback(conn) 532 rollback(conn)
489 raise e 533 raise e
490 534
491def store_sched_results(db_name, data, ndp = 0): 535def store_sched_results(db_name, data, ndp = 0):
492 global dp_col_names
493 fetched = [] 536 fetched = []
494 537
495 conn = connect_db(db_name) 538 conn = connect_db(db_name)
@@ -499,15 +542,15 @@ def store_sched_results(db_name, data, ndp = 0):
499 # must be merged with prior results in the 'scaled_sched_results' table 542 # must be merged with prior results in the 'scaled_sched_results' table
500 c = conn.cursor(db.cursors.DictCursor) 543 c = conn.cursor(db.cursors.DictCursor)
501 544
502 col_names = [n for n in dp_col_names if n != 'id']
503
504 try: 545 try:
505 # get IDs for all design points. 546 # X design point results per chunk
506 # sort to reduce (eliminate?) chance of deadlock. 547 chunk_size = 50
507 id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in data], key=lambda x: x[0]) 548 chunks = list(chunker(data, chunk_size))
508 549 for chk in chunks:
509 for dp_id, result in id_and_data: 550 # get IDs for all design points
510 store_sched_result(conn, c, dp_id, result) 551 # sort to reduce (eliminate?) chance of deadlock
552 id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in chk], key=lambda x: x[0])
553 store_sched_result_chunk(conn, c, id_and_data)
511 554
512 except db.OperationalError, e: 555 except db.OperationalError, e:
513 print '(store_sched_results) Caught database exception.' 556 print '(store_sched_results) Caught database exception.'