author     Glenn Elliott <gelliott@cs.unc.edu>    2014-09-22 22:03:52 -0400
committer  Glenn Elliott <gelliott@cs.unc.edu>    2014-09-22 22:03:52 -0400
commit     0808eb2a0e5b72903a8484728f94d49d8dbbad3d (patch)
tree       912af115c9cfa55424ffe84d4f978134af86fbf0
parent     5c2b513ea0f52986719c4cb185a3a3505e8376d4 (diff)

Optimize dp fetch from database

-rw-r--r--  rtss14/createtables.sql   10
-rwxr-xr-x  rtss14/database.py       243
-rwxr-xr-x  rtss14/rtss14.py          26
3 files changed, 115 insertions(+), 164 deletions(-)
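In short, this commit replaces delete-on-fetch dispatch of design points with a claim-by-flag scheme: dp_pending gains a taken column (plus indexes on (id, taken) and ts_util), workers claim rows with SELECT ... WHERE taken=0 ... FOR UPDATE and then mark them taken=1, and bulk loading deduplicates through a temporary staging table instead of per-row already_pending/already_processed checks. Below is a minimal sketch of the claim pattern, assuming a MySQLdb-style connection like the one database.py uses; claim_pending is a hypothetical helper, not a function from this patch, and connection setup plus retry handling are omitted:

import MySQLdb as db

def claim_pending(conn, ndp):
    # Claim up to ndp un-taken design points in one InnoDB transaction.
    # SELECT ... FOR UPDATE row-locks the chosen rows so concurrent workers
    # cannot grab the same ids; setting taken=1 replaces the old per-row DELETE.
    c = conn.cursor(db.cursors.DictCursor)
    c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
    ids = [row['id'] for row in c.fetchall()]
    if ids:
        c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s'] * len(ids)),
                  tuple(ids))
    conn.commit()  # commit releases the row locks
    c.close()
    return ids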
diff --git a/rtss14/createtables.sql b/rtss14/createtables.sql
index a072aa4..a76a579 100644
--- a/rtss14/createtables.sql
+++ b/rtss14/createtables.sql
@@ -41,6 +41,9 @@ INSERT INTO distrs(name) VALUES('c-const-heavy');
 CREATE TABLE dp_pending(
     id INTEGER NOT NULL AUTO_INCREMENT,
 
+    -- flag design point as taken
+    taken INTEGER,
+
     -- task set util cap
     ts_util REAL,
 
@@ -76,7 +79,9 @@ CREATE TABLE dp_pending(
     ncopy_engines INTEGER,
     chunk_size INTEGER,
 
-    PRIMARY KEY (id)
+    PRIMARY KEY (id),
+    INDEX (id, taken),
+    INDEX (ts_util)
     -- every permutation marks a unique configuration
     -- PRIMARY KEY(ts_util,
     --    ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master,
@@ -146,7 +151,8 @@ CREATE TABLE sched_results(
     avg_bandwidth REAL,
 
     FOREIGN KEY(dp) REFERENCES dp_ptested(id),
-    PRIMARY KEY(dp, ts_util)
+    PRIMARY KEY(dp, ts_util),
+    INDEX (ts_util)
 ) ENGINE=InnoDB;
 
 CREATE TABLE scaled_sched_results(
diff --git a/rtss14/database.py b/rtss14/database.py
index cd5535d..c0b099a 100755
--- a/rtss14/database.py
+++ b/rtss14/database.py
@@ -70,7 +70,7 @@ def init_dp_col_names(conn):
     dp_col_type_strs = {}
     for field in cols:
         name = field[0].encode('ascii', 'ignore')
-        if(name == 'id'):
+        if(name == 'id' or name == 'taken'):
             continue
         dp_col_names.append(name)
         form = '%s'
@@ -249,40 +249,6 @@ def clear_tables(db_name):
     c.close()
     ################
 
-#def get_results(conn, dp, fields = '*', table = 'sched_results', extra = None):
-#    where = ['%s=?' % k for k in dp.iterkeys()]
-#
-#    ################
-#    c = conn.cursor(db.cursors.DictCursor)
-#    begin_sync(conn)
-#    if extra:
-#        query = "SELECT %s FROM %s WHERE %s %s" % (", ".join(fields), table, " AND ".join(where), extra)
-#    else:
-#        query = "SELECT %s FROM %s WHERE %s" % (", ".join(fields), table, " AND ".join(where))
-#
-#    c.execute(query, tuple(dp.values()))
-#    data = c.fetchall()
-#
-#    end_sync(conn)
-#    ##############
-#
-#    return data
-
-#def count_results(conn, dp, table = 'sched_results'):
-#    c = conn.cursor(db.cursors.DictCursor)
-#    where = ['%s=?' % k for k in dp.iterkeys()]
-#
-#    query = "SELECT COUNT(*) FROM %s WHERE %s" % (table, " AND ".join(where))
-#
-#    ################
-#    begin_sync(conn)
-#    c.execute(query, tuple(dp.values()))
-#    data = c.fetchone()
-#    end_sync(conn)
-#    ##############
-#
-#    return data[0]
-
 def __get_dp_id(c, dp, add_if_missing = False):
     global dp_col_names
     global dp_col_type_strs
@@ -305,71 +271,85 @@ def __get_dp_id(c, dp, add_if_missing = False):
     row = c.fetchone()
     return row['id']
 
-def __already_pending(dp, conn):
-    seeking = dp_to_db(dp)
-    c = conn.cursor()
-    c.execute('SELECT COUNT(*) FROM dp_pending WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
-        tuple(seeking.values()))
-    row = c.fetchone()
-    processed = bool(row[0])
-    c.close()
-    return processed
+#def __already_pending(dp, conn):
+#    seeking = dp_to_db(dp)
+#    c = conn.cursor()
+#    c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0 AND %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
+#        tuple(seeking.values()))
+#    row = c.fetchone()
+#    processed = bool(row[0])
+#    c.close()
+#    return processed
 
-def already_pending(dp, conn = None, db_name = None):
-    if conn:
-        return __already_pending(dp, conn)
-    else:
-        conn = connect_db(db_name)
-        with conn:
-            return __already_pending(dp, conn)
+#def already_pending(dp, conn = None, db_name = None):
+#    if conn:
+#        return __already_pending(dp, conn)
+#    else:
+#        conn = connect_db(db_name)
+#        with conn:
+#            return __already_pending(dp, conn)
 
-def __already_processed(dp, conn):
-    seeking = dp_to_db(dp)
-    seeking.pop('ts_util', None)
-    c = conn.cursor()
-    c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id '
-        'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
-        ((dp.sys_util,) + tuple(seeking.values())))
-    row = c.fetchone()
-    processed = bool(row[0])
-    c.close()
-    return processed
+#def __already_processed(dp, conn):
+#    seeking = dp_to_db(dp)
+#    seeking.pop('ts_util', None)
+#    c = conn.cursor()
+#    c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id '
+#        'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
+#        ((dp.sys_util,) + tuple(seeking.values())))
+#    row = c.fetchone()
+#    processed = bool(row[0])
+#    c.close()
+#    return processed
 
-def already_processed(dp, conn = None, db_name = None):
-    if conn:
-        return __already_processed(dp, conn)
-    else:
-        conn = connect_db(db_name)
-        with conn:
-            return __already_processed(dp, conn)
+#def already_processed(dp, conn = None, db_name = None):
+#    if conn:
+#        return __already_processed(dp, conn)
+#    else:
+#        conn = connect_db(db_name)
+#        with conn:
+#            return __already_processed(dp, conn)
 
 def store_design_points(db_name, dps, clean):
     global dp_col_names
     global dp_col_type_strs
 
+    dps = [dp_to_db(dp) for dp in dps]
+    nstored = 0
+
     conn = connect_db(db_name)
-    npending = 0
     with conn:
         c = conn.cursor()
-
-        # convert to dicts with db names
-        if not clean:
-            dps = [dp_to_db(dp) for dp in dps if not (already_processed(dp, conn = conn) or already_pending(dp, conn = conn))]
+        if clean:
+            c.executemany('INSERT INTO dp_pending VALUES(NULL,0,%s)' % ','.join(['%s']*len(dp_col_names)),
+                [tuple(map(lambda x: d[x], dp_col_names)) for d in dps])
+            nstored = len(dps)
         else:
-            dps = [dp_to_db(dp) for dp in dps]
+            col_names = copy.deepcopy(dp_col_names)
+            col_names.pop(0) # remove ts_util
+            # store entire dp set in a temporary staging table
+            c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending')
+            c.executemany('INSERT INTO tmp_dp VALUES(NULL,0,%s)' % ','.join(['%s']*len(dp_col_names)),
+                [tuple(map(lambda x: d[x], dp_col_names)) for d in dps])
+            # mask out the dps that have already been processed
+            c.execute('UPDATE tmp_dp as S JOIN sched_results as R on S.ts_util=R.ts_util JOIN dp_ptested as T on R.dp=T.id '
+                'SET S.taken=1 WHERE %s' % ' AND '.join(map(lambda x: 'S.%s=T.%s' % (x,x), col_names)))
+            # mask out the dps that are already pending
+            c.execute('UPDATE tmp_dp as S JOIN dp_pending as P on %s SET S.taken=1' % ' AND '.join(map(lambda x: 'S.%s=P.%s' % (x,x), dp_col_names)))
+            # copy any remaining dps into the pending table
+            c.execute('INSERT INTO dp_pending(taken,%s) SELECT taken,%s FROM tmp_dp WHERE tmp_dp.taken=0' % (','.join(dp_col_names), ','.join(dp_col_names)))
+            nstored = c.rowcount
+            c.execute('DROP TEMPORARY TABLE tmp_dp')
+        c.close()
+    return nstored
 
-        ################
-        # convert dicts into correctly-ordered list and insert
-        c.executemany('INSERT INTO dp_pending VALUES(NULL,%s)' % ','.join(['%s']*len(dp_col_names)),
-            [tuple(map(lambda x: d[x], dp_col_names)) for d in dps])
-        # complete
-        c.execute('SELECT COUNT(*) FROM dp_pending')
+def num_pending_design_points(db_name):
+    conn = connect_db(db_name)
+    with conn:
+        c = conn.cursor()
+        c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0')
         npending = c.fetchone()[0]
-        ##############
-
         c.close()
-    return len(dps), npending
-
+    return npending
 
 def get_design_points(db_name, ndp = 1):
     global dp_col_names
@@ -381,44 +361,31 @@ def get_design_points(db_name, ndp = 1):
     failcount = 0
     while True:
         try:
-            retry = True
-            while retry is True:
-                retry = False
-
-                if conn is not None:
-                    conn.close()
-                    del conn
-                    conn = None
-
-                conn = connect_db(db_name)
-                with conn:
-                    ################
-                    c = conn.cursor(db.cursors.DictCursor)
-                    begin_sync(conn, c)
-#                    c.execute('LOCK TABLES dp_pending WRITE')
-                    c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,))
-                    dps = [db_to_dp(d) for d in c.fetchall()]
-                    nfetched = len(dps) if dps else 0
-                    if nfetched > 0:
-                        temp = [dp_to_db(d) for d in dps]
-                        for i in xrange(nfetched):
-                            c.execute('DELETE FROM dp_pending '
-                                'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)),
-                                tuple(map(lambda x: temp[i][x], dp_col_names)))
-                            if c.rowcount == 1:
-                                fetched.append(dps[i])
-                            elif c.rowcount == 0:
-                                print 'raced for design point. dropping.'
-                            else:
-                                print 'deleted too many rows... dropping.'
-                    if len(fetched) == 0:
-                        retry = True
-#                    c.execute('UNLOCK TABLES')
-                    end_sync(conn)
-                    c.close()
-                    ##############
-            # success!
-            break
+            if conn is not None:
+                conn.close()
+                del conn
+                conn = None
+
+            conn = connect_db(db_name)
+            with conn:
+                ################
+                c = conn.cursor(db.cursors.DictCursor)
+
+                begin_sync(conn, c)
+                c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
+                dp_ids = [d['id'] for d in c.fetchall()]
+                nfetched = len(dp_ids) if dp_ids else 0
+                if nfetched > 0:
+                    c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids))
+                end_sync(conn)
+
+                if nfetched > 0:
+                    c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids))
+                    fetched = [db_to_dp(d) for d in c.fetchall()]
+
+                c.close()
+                break
+                ##############
         except db.OperationalError, e:
             print e
             # make sure the db connection is closed
@@ -518,24 +485,16 @@ def store_sched_results(db_name, data, ndp = 0):
 
         if ndp > 0:
             begin_sync(conn, c)
-#            c.execute('LOCK TABLES dp_pending WRITE')
-            c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,))
-            dps = [db_to_dp(d) for d in c.fetchall()]
-            nfetched = len(dps) if dps else 0
+            c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
+            dp_ids = [d['id'] for d in c.fetchall()]
+            nfetched = len(dp_ids) if dp_ids else 0
             if nfetched > 0:
-                temp = [dp_to_db(d) for d in dps]
-                for i in xrange(nfetched):
-                    c.execute('DELETE FROM dp_pending '
-                        'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)),
-                        tuple(map(lambda x: temp[i][x], dp_col_names)))
-                    if c.rowcount == 1:
-                        fetched.append(dps[i])
-                    elif c.rowcount == 0:
-                        print 'store_sched_results: raced for design point. dropping.'
-                    else:
-                        print 'store_sched_results: deleted too many rows... dropping.'
-#            c.execute('UNLOCK TABLES')
+                c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids))
             end_sync(conn)
+
+            if nfetched > 0:
+                c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids))
+                fetched = [db_to_dp(d) for d in c.fetchall()]
         c.close()
         # success!
         break
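The resume path of store_design_points above avoids the old per-design-point already_pending/already_processed queries by deduplicating in bulk through a temporary table. A rough sketch of that staging pattern follows, assuming the same MySQLdb-style connection used elsewhere in database.py and inserting only two of the real columns (the actual code builds the full column list from dp_col_names and also masks rows that already appear in sched_results via dp_ptested); stage_and_insert is a hypothetical helper, not part of this patch:

def stage_and_insert(conn, rows):
    # rows: list of (ts_util, ncpu) tuples -- a deliberately cut-down stand-in
    # for the full design-point column set handled by store_design_points().
    c = conn.cursor()
    c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending')
    c.executemany('INSERT INTO tmp_dp(taken, ts_util, ncpu) VALUES(0, %s, %s)', rows)
    # Mark staged rows whose key already exists in dp_pending ...
    c.execute('UPDATE tmp_dp AS S JOIN dp_pending AS P '
              'ON S.ts_util=P.ts_util AND S.ncpu=P.ncpu SET S.taken=1')
    # ... then copy only the unmarked remainder into the real pending table.
    c.execute('INSERT INTO dp_pending(taken, ts_util, ncpu) '
              'SELECT taken, ts_util, ncpu FROM tmp_dp WHERE taken=0')
    nstored = c.rowcount
    c.execute('DROP TEMPORARY TABLE tmp_dp')
    conn.commit()
    c.close()
    return nstored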
diff --git a/rtss14/rtss14.py b/rtss14/rtss14.py
index f049951..ee3564a 100755
--- a/rtss14/rtss14.py
+++ b/rtss14/rtss14.py
@@ -785,7 +785,6 @@ def get_dp_space(cpus):
     exp.ncpus = [12]
     exp.nclusters = [1, 2, 12]
     exp.ngpus = [8]
-#    exp.ngclusters = [1, 2, 4, 8]
     exp.ngclusters = [1, 2, 4, 8]
     exp.release_master = [False]
     exp.polluters = [False, True]
@@ -794,7 +793,6 @@ def get_dp_space(cpus):
 
     # gpusync config variables
     exp.rho = [0, 1, 2, 3]
-#    exp.rho = [1, 3]
     exp.dgl = [True, False]
     exp.p2p = [True, False]
     exp.ncopy_engines = [2]
@@ -802,8 +800,8 @@ def get_dp_space(cpus):
 
     # task parameters
 #    step_size = 0.1
-#    step_size = 0.5
-    step_size = 0.25
+    step_size = 0.5
+#    step_size = 0.25
     start_pt = step_size
 #    start_pt = 10.0
     exp.sys_util = [float(v) for v in arange(start_pt, cpus+step_size, step_size)]
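As a quick sanity check on the sweep above, the coarser step halves the number of utilization caps generated per configuration. Illustrative only; cpus = 12 is an assumption taken from exp.ncpus = [12]:

from numpy import arange

# Assumed values: cpus = 12 (cf. exp.ncpus = [12]) and the step_size = 0.5
# selected above.
step_size = 0.5
start_pt = step_size
cpus = 12
sys_util = [float(v) for v in arange(start_pt, cpus + step_size, step_size)]
# -> [0.5, 1.0, ..., 12.0]: 24 task-set utilization caps,
#    versus 48 with the previous step_size of 0.25.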
@@ -891,11 +889,10 @@ def main():
 
     # load design points incrementally as to not overload memory
     gen = DesignPointGenerator(exp, is_valid = valid)
-    dp_chunk_sz = 10000 # number of design points to load at a time
+    dp_chunk_sz = 50000 # number of design points to load at a time
     ndp = 0
     if not args.pretend:
         nstored = 0
-        npending = 0
         while True:
             dp_chunk = []
             for _ in range(dp_chunk_sz):
@@ -907,26 +904,15 @@ def main():
                 break
             ndp += len(dp_chunk)
             random.shuffle(dp_chunk)
-            tmpnstored, npending = db.store_design_points(args.database, dp_chunk, clean = not args.resume)
+            tmpnstored = db.store_design_points(args.database, dp_chunk, clean = not args.resume)
             nstored += tmpnstored
-            print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (nstored, ndp, ndp - nstored, npending)
+            npending = db.num_pending_design_points(args.database)
+            print "Loaded %d of %d design points. (%d were completed, %d now pending)" % (nstored, ndp, ndp - npending, npending)
     else:
         for _ in gen:
             ndp += 1
         print "%d design points planned" % ndp
 
-#    design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)]
-#    ndp = len(design_points)
-#    if not args.pretend:
-#        random.shuffle(design_points)
-#        if not args.resume:
-#            db.clear_tables(args.database)
-#        nstored, npending = db.store_design_points(args.database, design_points, clean = not args.resume)
-#        print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (
-#            nstored, ndp, ndp - nstored, npending)
-#    else:
-#        print "%d design points planned" % ndp
-
     if args.pretend or args.initonly:
         exit(0)
 