#!/usr/bin/env python
import os
import time
import random
import copy
import itertools
import socket
import json

import MySQLdb as db

from schedcat.util.storage import storage
from generator import DesignPointGenerator

#### Some globals ####
timeout = 15*60   # 15 minute timeout on database locks
max_fail = 30     # maximum number of times to reset a bad db connection

distr_mapper = None      # maps database distribution ids to strings (and strings back to ids)
dp_col_names = None      # list of keys to extract from sched results to write to db
dp_col_type_strs = None  # per-column format strings used when building queries
######################

def chunker(l, n):
    # yield successive n-sized slices of list l
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

def get_job_id():
    # derive a unique job id from the LSF batch environment, falling back to the pid
    job_idx_str = os.environ.get('LSB_JOBINDEX')
    job_pid_str = os.environ.get('LSB_JOBID')
    job_idx = int(job_idx_str) if job_idx_str is not None else 0
    job_pid = int(job_pid_str) if job_pid_str is not None else os.getpid()
    # job_pid changes the least, so make that the MSBs
    job_id = (job_pid << 32) | job_idx
    # add two to avoid '0' and '1'
    job_id += 2
    return job_id

def backoff(t):
    # sleep for a random duration in [0, t) seconds
    time_to_sleep = random.random() * t
    time.sleep(time_to_sleep)

def init_distr_mapper(conn):
    # build a bidirectional map between distribution ids and names from the distrs table
    global distr_mapper
    if distr_mapper is not None and len(distr_mapper.keys()) > 0:
        return
    distr_mapper = {}
    c = conn.cursor(db.cursors.DictCursor)
    c.execute('SELECT id,name FROM distrs')
    data = c.fetchall()
    for row in data:
        distr_mapper[int(row['id'])] = str(row['name'])
        distr_mapper[str(row['name'])] = int(row['id'])
    c.close()

def init_dp_col_names(conn):
    # cache the design-point column names of dp_pending (minus the bookkeeping columns)
    global dp_col_names
    global dp_col_type_strs
    if dp_col_names is not None and len(dp_col_names) > 0:
        return
    dp_col_names = []
    dp_col_type_strs = {}
    c = conn.cursor()
    c.execute('SHOW COLUMNS FROM dp_pending')
    cols = c.fetchall()
    for field in cols:
        name = field[0].encode('ascii', 'ignore')
        if name == 'id' or name == 'taken':
            continue
        dp_col_names.append(name)
        form = '%s'
        dp_col_type_strs[name] = form
    c.close()

def db_type(var):
    # map a Python value to a SQL column type
    if type(var) is int or type(var) is bool:
        type_str = "INTEGER"
    elif type(var) is str:
        type_str = "TEXT"
    elif type(var) is float:
        type_str = "REAL"
    else:
        type_str = "JSON"
    return type_str

def to_db_val(var):
    # map a Python value to a database-safe value
    if type(var) is int:
        return int(var)
    elif type(var) is bool:
        return 1 if var == True else 0
    elif type(var) is str:
        return var
    elif type(var) is float:
        return var
    else:
        return json.dumps(var)
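# Illustrative mapping examples for the helpers above (the values are
# hypothetical, not taken from any particular experiment):
#   to_db_val(True)      -> 1            (booleans are stored as 0/1 INTEGERs)
#   to_db_val(0.75)      -> 0.75         (floats pass through as REALs)
#   to_db_val([1, 2, 3]) -> '[1, 2, 3]'  (anything else is stored as a JSON string)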
def dp_to_db(dp):
    global distr_mapper
    data = {}
    # TODO: do these conversions using a loop and field descr.
    data['ts_util'] = dp.sys_util
    data['ncpu'] = dp.ncpus
    data['cpu_cluster_size'] = dp.ncpus / dp.nclusters
    data['ngpu'] = dp.ngpus
    data['gpu_cluster_size'] = dp.ngpus / dp.ngclusters
    data['is_release_master'] = dp.release_master
    data['is_worst_case'] = 1 if dp.ovh_type == 'max' else 0
    data['is_polluters'] = dp.polluters
    data['wss_size'] = dp.wss
    data['util_dist'] = distr_mapper[dp.task_util]
    data['period_dist'] = distr_mapper[dp.period]
    data['data_dist'] = distr_mapper[dp.data]
    data['state_dist'] = distr_mapper[dp.state]
    data['kernel_dist'] = distr_mapper[dp.kern]
    data['cpu_dist'] = distr_mapper[dp.cpu_csx]
    data['gpu_population'] = dp.gpu_population
    data['rho'] = dp.rho
    data['is_dgl'] = dp.dgl
    data['is_p2p'] = dp.p2p
    data['ncopy_engines'] = dp.ncopy_engines
    data['chunk_size'] = dp.chunk_size

    # make sure data is db-safe.
    keys = [k for k in data.iterkeys()]
    for k in keys:
        data[k] = to_db_val(data[k])
    return data

def db_to_dp(data):
    global distr_mapper
    # TODO: do these conversions using a loop and field descr.
    exp = storage()
    exp.host = ['bonham']
    exp.ncpus = [int(data['ncpu'])]
    exp.nclusters = [exp.ncpus[0]/int(data['cpu_cluster_size'])]
    exp.ngpus = [int(data['ngpu'])]
    exp.ngclusters = [exp.ngpus[0]/int(data['gpu_cluster_size'])]
    exp.release_master = [int(data['is_release_master'])]
    exp.polluters = [int(data['is_polluters']) != 0]
    exp.ovh_type = ['max' if int(data['is_worst_case']) == 1 else 'mean']
    exp.rho = [int(data['rho'])]
    exp.dgl = [int(data['is_dgl']) != 0]
    exp.p2p = [int(data['is_p2p']) != 0]
    exp.ncopy_engines = [int(data['ncopy_engines'])]
    exp.chunk_size = [int(data['chunk_size'])]
    exp.sys_util = [float(data['ts_util'])]
    exp.task_util = [distr_mapper[int(data['util_dist'])]]
    exp.period = [distr_mapper[int(data['period_dist'])]]
    exp.wss = [int(data['wss_size'])]
    exp.gpu_population = [float(data['gpu_population'])]
    exp.kern = [distr_mapper[int(data['kernel_dist'])]]
    exp.cpu_csx = [distr_mapper[int(data['cpu_dist'])]]
    exp.data = [distr_mapper[int(data['data_dist'])]]
    exp.state = [distr_mapper[int(data['state_dist'])]]

    dps = DesignPointGenerator(exp)
    dp = dps.next()

    # eliminate unicode
    keys = dp.iterkeys()
    for k in keys:
        if type(dp[k]) is unicode:
            dp[k] = dp[k].encode('ascii', 'ignore')
    return dp

def dp_to_db_vals(dp):
    vals = {}
    for key, value in dp.iteritems():
        vals[key] = to_db_val(value)
    return vals

def begin_sync(conn, c):
    global timeout
    start = time.time()
    while True:
        try:
            conn.autocommit(False)
            c.execute('START TRANSACTION')
            break
        except db.OperationalError, e:
            print e
            if c is not None:
                print c._last_executed
            elapsed = time.time() - start
            if elapsed > timeout:
                raise e
            else:
                backoff(20)
    return c

def end_sync(conn):
    success = conn.commit()
    conn.autocommit(True)
    return success

def rollback(conn):
    conn.rollback()
    conn.autocommit(True)

def connect_db(db_name):
    global distr_mapper
    global dp_col_names
    if socket.gethostname() == 'bonham':
        conn = db.connect('localhost', 'gelliott', 'G1ennDB', db_name)
    else:
        conn = db.connect('bonham.cs.unc.edu', 'gelliott', 'G1ennDB', db_name, port=23306)
    if distr_mapper is None:
        init_distr_mapper(conn)
    if dp_col_names is None:
        init_dp_col_names(conn)
    return conn
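# The helpers above are used throughout this module in a common
# retry-on-deadlock pattern. The function below is an illustrative sketch of
# that pattern only (it is not called anywhere); `do_work` is a hypothetical
# callable supplied by the caller. The real call sites further down
# (__get_design_points, store_sched_result_chunk) follow the same shape.
def _example_retry_transaction(conn, c, do_work):
    done = False
    while not done:
        try:
            begin_sync(conn, c)
            do_work(c)
            end_sync(conn)
            done = True
        except db.OperationalError, e:
            if e[0] == 1213 or e[0] == 1205:
                # deadlock or lock wait timeout: roll back and retry
                rollback(conn)
            else:
                rollback(conn)
                raise e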
def clear_tables(db_name):
    # tables to clear
    tables = ['sched_results', 'scaled_sched_results', 'dp_ptested', 'dp_pending']
    conn = connect_db(db_name)
    with conn:
        c = conn.cursor()
        ################
        begin_sync(conn, c)
        for t in tables:
            c.execute('DELETE FROM %s' % t)
        end_sync(conn)
        ################
        c.close()

def __repair_design_points(conn):
    global dp_col_names
    c = conn.cursor()
    c.execute('UPDATE dp_pending AS P '
              'JOIN dp_ptested AS T ON %s '
              'LEFT JOIN sched_results as R ON P.ts_util=R.ts_util AND T.id=R.dp '
              'SET P.taken=0 '
              'WHERE R.dp IS NULL AND R.ts_util IS NULL' %
              ' AND '.join(map(lambda x: 'P.%s=T.%s' % (x, x),
                               [d for d in dp_col_names if d != 'ts_util'])))
    nrepaired = c.rowcount
    c.close()
    return nrepaired

def repair_design_points(db_name):
    conn = connect_db(db_name)
    nrepaired = 0
    with conn:
        nrepaired = __repair_design_points(conn)
    return nrepaired

repaired = False
def store_design_points(db_name, dps, clean):
    global dp_col_names
    global dp_col_type_strs
    global repaired
    conn = connect_db(db_name)
    dps = [dp_to_db(dp) for dp in dps]
    nstored = 0
    with conn:
        if not clean and not repaired:
            nrepaired = __repair_design_points(conn)
            repaired = True
        if dps is None or len(dps) == 0:
            return 0
        c = conn.cursor()
        if clean:
            c.executemany('INSERT INTO dp_pending VALUES(NULL,0,%s)' %
                          ','.join(['%s']*len(dp_col_names)),
                          [tuple(map(lambda x: d[x], dp_col_names)) for d in dps])
            nstored = len(dps)
        else:
            col_names = [n for n in dp_col_names if n != 'ts_util']
            # TODO: Insert directly into dp_pending instead of tmp_dp_pending
            # store entire dp set in a temporary staging table
            c.execute('CREATE TEMPORARY TABLE tmp_dp_pending LIKE dp_pending')
            # filter out dps that are already pending and not taken.
            # entries taken in dp_pending may still be mirrored here.
            c.executemany('INSERT INTO tmp_dp_pending(id,taken,%s) '
                          'SELECT NULL,0,%s '
                          'FROM dual '
                          'WHERE NOT EXISTS ( '
                          'SELECT 1 '
                          'FROM dp_pending '
                          'WHERE %s )' % (
                              ','.join(dp_col_names),
                              ','.join(['%s']*len(dp_col_names)),
                              ' AND '.join(map(lambda x: '%s=%%s' % x, dp_col_names))),
                          [tuple(map(lambda x: d[x], dp_col_names)*2) for d in dps])
            c.execute('INSERT INTO dp_pending(id,taken,%s) SELECT NULL,0,%s FROM tmp_dp_pending' %
                      (','.join(dp_col_names), ','.join(dp_col_names)))
            nstored = c.rowcount
            c.execute('DROP TEMPORARY TABLE tmp_dp_pending')
        c.close()
    return nstored

def num_pending_design_points(db_name):
    conn = connect_db(db_name)
    with conn:
        c = conn.cursor()
        c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0')
        npending = c.fetchone()[0]
        c.close()
    return npending

def __get_design_points(conn, ndp):
    global dp_col_names
    fetched = []
    c = conn.cursor(db.cursors.DictCursor)
    ####################
    done = False
    while not done:
        try:
            begin_sync(conn, c)
            c.execute('SELECT id,%s FROM dp_pending WHERE taken=0 LIMIT %%s FOR UPDATE' %
                      ','.join(dp_col_names), (ndp,))
            rows = c.fetchall()
            nfetched = c.rowcount
            if nfetched > 0:
                dp_ids = [r['id'] for r in rows]
                c.execute('UPDATE dp_pending SET taken=1 WHERE id IN (%s)' %
                          ','.join(['%s']*len(dp_ids)), tuple(dp_ids))
            end_sync(conn)
            fetched = [db_to_dp(r) for r in rows]
            done = True
        except db.OperationalError, e:
            errcode = e[0]
            if errcode == 1213:
                # deadlock - retry
                print '(__get_design_points) Error is transaction deadlock. Will retry.'
                print '(__get_design_points) Last query: %s' % c._last_executed
                rollback(conn)
            else:
                print e
                print '(__get_design_points) Last query: %s' % c._last_executed
                rollback(conn)
                c.close()
                # rethrow
                raise e
        except BaseException, e:
            print e
            print '(__get_design_points) Last query: %s' % c._last_executed
            rollback(conn)
            c.close()
            raise e
    c.close()
    return fetched

def get_design_points(db_name, ndp=1):
    fetched = []
    conn = connect_db(db_name)
    with conn:
        fetched = __get_design_points(conn, ndp)
    return fetched

def lookup_dp_id(c, dp):
    global dp_col_names
    global dp_col_type_strs
    seeking = dp_to_db(dp)
    seeking.pop('ts_util', None)
    col_names = [n for n in dp_col_names if n != 'ts_util']
    dp_id = None
    while dp_id is None:
        try:
            # atomically add new dp to dp_ptested. fails if dp already in table
            c.execute('INSERT INTO dp_ptested(id,%s) '
                      'SELECT NULL,%s '
                      'FROM dual '
                      'WHERE NOT EXISTS ( '
                      'SELECT 1 '
                      'FROM dp_ptested '
                      'WHERE %s )' % (
                          ','.join(col_names),
                          ','.join(['%s']*(len(col_names))),
                          ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]),
                                           seeking.iterkeys()))),
                      tuple([seeking[n] for n in col_names] + seeking.values()))
            if c.rowcount != 0:
                # new id
                c.execute('SELECT LAST_INSERT_ID()')
                row = c.fetchone()
                assert row is not None
                dp_id = row.values()[0]
            else:
                # id already exists. look it up.
                c.execute('SELECT id '
                          'FROM dp_ptested '
                          'WHERE %s' %
                          (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]),
                                            seeking.iterkeys()))),
                          tuple(seeking.values()))
                row = c.fetchone()
                assert row is not None
                dp_id = row['id']
        except db.OperationalError, e:
            errcode = e[0]
            if errcode == 1213 or errcode == 1205:
                # deadlock - retry
                print '(lookup_dp_id) Error is transaction deadlock or lock timeout. Will retry.'
                print '(lookup_dp_id) Last query: %s' % c._last_executed
            else:
                print e
                print '(lookup_dp_id) Last query: %s' % c._last_executed
                raise e
    return dp_id
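# For reference, the insert-if-absent statement built in lookup_dp_id() expands
# to SQL of roughly the following shape (column names are illustrative; the
# real list comes from dp_pending via init_dp_col_names()):
#
#   INSERT INTO dp_ptested(id, ncpu, ngpu, ...)
#   SELECT NULL, %s, %s, ...
#   FROM dual
#   WHERE NOT EXISTS (SELECT 1 FROM dp_ptested WHERE ncpu=%s AND ngpu=%s AND ...)
#
# so a row is inserted only if no identical design point is already present.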
def __store_eff_sched_results(c, dp_id, stats):
    for factor, eff_curve in stats.iteritems():
        for eff_ts_util, sched in eff_curve.iteritems():
            # do prior results exist?
            c.execute('SELECT ntested, nsched, avg_sched, avg_tard, avg_bandwidth '
                      'FROM scaled_sched_results '
                      'WHERE dp=%s AND eff_ts_util=%s AND scale_factor=%s FOR UPDATE',
                      (dp_id, eff_ts_util, factor))
            row = c.fetchone()
            if row:
                rntested = row['ntested']
                rnsched = row['nsched']
                # recompute the averages
                sched.avg_sched = (sched.avg_sched*sched.ntested + row['avg_sched']*rntested)/(sched.ntested + rntested)
                if sched.nsched + rnsched > 0:
                    sched.avg_tard = (sched.avg_tard*sched.nsched + row['avg_tard']*rnsched)/(sched.nsched + rnsched)
                    sched.avg_bandwidth = (sched.avg_bandwidth*sched.nsched + row['avg_bandwidth']*rnsched)/(sched.nsched + rnsched)
                else:
                    sched.avg_tard = 0.0
                    sched.avg_bandwidth = 0.0
                sched.ntested += rntested
                sched.nsched += rnsched
                c.execute('UPDATE scaled_sched_results '
                          'SET ntested=%s, nsched=%s, avg_sched=%s, avg_tard=%s, avg_bandwidth=%s '
                          'WHERE dp=%s AND eff_ts_util=%s AND scale_factor=%s',
                          (sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard,
                           sched.avg_bandwidth, dp_id, eff_ts_util, factor))
            else:
                c.execute('INSERT INTO scaled_sched_results '
                          'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)',
                          (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched,
                           sched.avg_sched, sched.avg_tard, sched.avg_bandwidth))
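# Worked example of the merge above (numbers hypothetical): if the new batch has
# avg_sched=0.5 over ntested=10 task sets, and the row already in the table has
# avg_sched=0.9 over ntested=30, the merged average is
#   (0.5*10 + 0.9*30) / (10 + 30) = 32/40 = 0.8
# and ntested becomes 40. avg_tard and avg_bandwidth are merged the same way,
# but weighted by nsched (the number of schedulable task sets) instead.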
#def __store_sched_result(c, dp_id, ts_util, stats):
#    c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
#              (dp_id, ts_util,
#               stats.avg_sched,
#               stats.ntested,
#               stats.nsched,
#               stats.avg_tard,
#               stats.avg_bandwidth) )
#
#def store_sched_result(conn, c, dp_id, result):
#    done = False
#    while not done:
#        try:
#            begin_sync(conn, c)
#            __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats)
#            if result.eff_sched_stats is not None:
#                __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
#            end_sync(conn)
#            done = True
#
#        except db.OperationalError, e:
#            errcode = e[0]
#            if errcode == 1213:
#                # deadlock - retry
#                print '(store_sched_result) Error is transaction deadlock. Will retry.'
#                print '(store_sched_result) Last query: %s' % c._last_executed
#                rollback(conn)
#            else:
#                print e
#                print '(store_sched_result) Last query: %s' % c._last_executed
#                rollback(conn)
#                raise e
#
#        except BaseException, e:
#            print e
#            print '(store_sched_result) Last query: %s' % c._last_executed
#            rollback(conn)
#            raise e

def store_sched_result_chunk(conn, c, chunk):
    done = False
    while not done:
        try:
            begin_sync(conn, c)
            # insert normal sched results
            c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
                          [(dp_id, result.dp.sys_util,
                            result.sched_stats.avg_sched,
                            result.sched_stats.ntested,
                            result.sched_stats.nsched,
                            result.sched_stats.avg_tard,
                            result.sched_stats.avg_bandwidth) for dp_id, result in chunk])
            # insert scaled sched results
            for dp_id, result in chunk:
                if result.eff_sched_stats is not None:
                    __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
            end_sync(conn)
            done = True
        except db.OperationalError, e:
            errcode = e[0]
            if errcode == 1213 or errcode == 1205:
                # deadlock - retry
                print '(store_sched_result_chunk) Error is transaction deadlock or lock timeout. Will retry.'
                print '(store_sched_result_chunk) Last query: %s' % c._last_executed
                rollback(conn)
            else:
                print e
                print '(store_sched_result_chunk) Last query: %s' % c._last_executed
                rollback(conn)
                raise e
        except BaseException, e:
            print e
            print '(store_sched_result_chunk) Last query: %s' % c._last_executed
            rollback(conn)
            raise e

def store_sched_results(db_name, data, ndp=0):
    fetched = []
    conn = connect_db(db_name)
    with conn:
        # data is a list of storage(), with fields 'dp', 'sched_stats', and 'eff_sched_stats'.
        # 'eff_sched_stats' is a dictionary keyed on scale-factor. stats in 'eff_sched_stats'
        # must be merged with prior results in the 'scaled_sched_results' table
        c = conn.cursor(db.cursors.DictCursor)
        try:
            # X design point results per chunk
            chunk_size = 50
            chunks = list(chunker(data, chunk_size))
            for chk in chunks:
                # get IDs for all design points
                # sort to reduce (eliminate?) chance of deadlock
                id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in chk],
                                     key=lambda x: x[0])
                store_sched_result_chunk(conn, c, id_and_data)
        except db.OperationalError, e:
            print '(store_sched_results) Caught database exception.'
            c.close()
            raise e
        except BaseException, e:
            print '(store_sched_results) Caught unknown exception.'
            c.close()
            raise e
        c.close()
        if ndp > 0:
            fetched = __get_design_points(conn, ndp)
    return fetched
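# Minimal usage sketch (illustrative only): the 'example_db' database name below
# is a hypothetical placeholder; in practice an experiment driver imports this
# module, claims pending design points, runs its schedulability experiments, and
# writes results back with store_sched_results().
if __name__ == '__main__':
    example_db = 'example_db'
    print '%d design points pending in %s' % (num_pending_design_points(example_db), example_db)
    # claim up to five design points; each is returned as a schedcat storage()
    # object reconstructed by db_to_dp()
    for dp in get_design_points(example_db, ndp=5):
        print dp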