diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-09-22 22:03:52 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-09-22 22:03:52 -0400 |
commit | 0808eb2a0e5b72903a8484728f94d49d8dbbad3d (patch) | |
tree | 912af115c9cfa55424ffe84d4f978134af86fbf0 | |
parent | 5c2b513ea0f52986719c4cb185a3a3505e8376d4 (diff) |
Optimize dp fetch from database
-rw-r--r-- | rtss14/createtables.sql | 10 | ||||
-rwxr-xr-x | rtss14/database.py | 243 | ||||
-rwxr-xr-x | rtss14/rtss14.py | 26 |
3 files changed, 115 insertions, 164 deletions
diff --git a/rtss14/createtables.sql b/rtss14/createtables.sql index a072aa4..a76a579 100644 --- a/rtss14/createtables.sql +++ b/rtss14/createtables.sql | |||
@@ -41,6 +41,9 @@ INSERT INTO distrs(name) VALUES('c-const-heavy'); | |||
41 | CREATE TABLE dp_pending( | 41 | CREATE TABLE dp_pending( |
42 | id INTEGER NOT NULL AUTO_INCREMENT, | 42 | id INTEGER NOT NULL AUTO_INCREMENT, |
43 | 43 | ||
44 | -- flag design point as taken | ||
45 | taken INTEGER, | ||
46 | |||
44 | -- task set util cap | 47 | -- task set util cap |
45 | ts_util REAL, | 48 | ts_util REAL, |
46 | 49 | ||
@@ -76,7 +79,9 @@ CREATE TABLE dp_pending( | |||
76 | ncopy_engines INTEGER, | 79 | ncopy_engines INTEGER, |
77 | chunk_size INTEGER, | 80 | chunk_size INTEGER, |
78 | 81 | ||
79 | PRIMARY KEY (id) | 82 | PRIMARY KEY (id), |
83 | INDEX (id, taken), | ||
84 | INDEX (ts_util) | ||
80 | -- every permutation marks a unique configuration | 85 | -- every permutation marks a unique configuration |
81 | -- PRIMARY KEY(ts_util, | 86 | -- PRIMARY KEY(ts_util, |
82 | -- ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master, | 87 | -- ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master, |
@@ -146,7 +151,8 @@ CREATE TABLE sched_results( | |||
146 | avg_bandwidth REAL, | 151 | avg_bandwidth REAL, |
147 | 152 | ||
148 | FOREIGN KEY(dp) REFERENCES dp_ptested(id), | 153 | FOREIGN KEY(dp) REFERENCES dp_ptested(id), |
149 | PRIMARY KEY(dp, ts_util) | 154 | PRIMARY KEY(dp, ts_util), |
155 | INDEX (ts_util) | ||
150 | ) ENGINE=InnoDB; | 156 | ) ENGINE=InnoDB; |
151 | 157 | ||
152 | CREATE TABLE scaled_sched_results( | 158 | CREATE TABLE scaled_sched_results( |
diff --git a/rtss14/database.py b/rtss14/database.py index cd5535d..c0b099a 100755 --- a/rtss14/database.py +++ b/rtss14/database.py | |||
@@ -70,7 +70,7 @@ def init_dp_col_names(conn): | |||
70 | dp_col_type_strs = {} | 70 | dp_col_type_strs = {} |
71 | for field in cols: | 71 | for field in cols: |
72 | name = field[0].encode('ascii', 'ignore') | 72 | name = field[0].encode('ascii', 'ignore') |
73 | if(name == 'id'): | 73 | if(name == 'id' or name == 'taken'): |
74 | continue | 74 | continue |
75 | dp_col_names.append(name) | 75 | dp_col_names.append(name) |
76 | form = '%s' | 76 | form = '%s' |
@@ -249,40 +249,6 @@ def clear_tables(db_name): | |||
249 | c.close() | 249 | c.close() |
250 | ################ | 250 | ################ |
251 | 251 | ||
252 | #def get_results(conn, dp, fields = '*', table = 'sched_results', extra = None): | ||
253 | # where = ['%s=?' % k for k in dp.iterkeys()] | ||
254 | # | ||
255 | # ################ | ||
256 | # c = conn.cursor(db.cursors.DictCursor) | ||
257 | # begin_sync(conn) | ||
258 | # if extra: | ||
259 | # query = "SELECT %s FROM %s WHERE %s %s" % (", ".join(fields), table, " AND ".join(where), extra) | ||
260 | # else: | ||
261 | # query = "SELECT %s FROM %s WHERE %s" % (", ".join(fields), table, " AND ".join(where)) | ||
262 | # | ||
263 | # c.execute(query, tuple(dp.values())) | ||
264 | # data = c.fetchall() | ||
265 | # | ||
266 | # end_sync(conn) | ||
267 | # ############## | ||
268 | # | ||
269 | # return data | ||
270 | |||
271 | #def count_results(conn, dp, table = 'sched_results'): | ||
272 | # c = conn.cursor(db.cursors.DictCursor) | ||
273 | # where = ['%s=?' % k for k in dp.iterkeys()] | ||
274 | # | ||
275 | # query = "SELECT COUNT(*) FROM %s WHERE %s" % (table, " AND ".join(where)) | ||
276 | # | ||
277 | # ################ | ||
278 | # begin_sync(conn) | ||
279 | # c.execute(query, tuple(dp.values())) | ||
280 | # data = c.fetchone() | ||
281 | # end_sync(conn) | ||
282 | # ############## | ||
283 | # | ||
284 | # return data[0] | ||
285 | |||
286 | def __get_dp_id(c, dp, add_if_missing = False): | 252 | def __get_dp_id(c, dp, add_if_missing = False): |
287 | global dp_col_names | 253 | global dp_col_names |
288 | global dp_col_type_strs | 254 | global dp_col_type_strs |
@@ -305,71 +271,85 @@ def __get_dp_id(c, dp, add_if_missing = False): | |||
305 | row = c.fetchone() | 271 | row = c.fetchone() |
306 | return row['id'] | 272 | return row['id'] |
307 | 273 | ||
308 | def __already_pending(dp, conn): | 274 | #def __already_pending(dp, conn): |
309 | seeking = dp_to_db(dp) | 275 | # seeking = dp_to_db(dp) |
310 | c = conn.cursor() | 276 | # c = conn.cursor() |
311 | c.execute('SELECT COUNT(*) FROM dp_pending WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), | 277 | # c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0 AND %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), |
312 | tuple(seeking.values())) | 278 | # tuple(seeking.values())) |
313 | row = c.fetchone() | 279 | # row = c.fetchone() |
314 | processed = bool(row[0]) | 280 | # processed = bool(row[0]) |
315 | c.close() | 281 | # c.close() |
316 | return processed | 282 | # return processed |
317 | 283 | ||
318 | def already_pending(dp, conn = None, db_name = None): | 284 | #def already_pending(dp, conn = None, db_name = None): |
319 | if conn: | 285 | # if conn: |
320 | return __already_pending(dp, conn) | 286 | # return __already_pending(dp, conn) |
321 | else: | 287 | # else: |
322 | conn = connect_db(db_name) | 288 | # conn = connect_db(db_name) |
323 | with conn: | 289 | # with conn: |
324 | return __already_pending(dp, conn) | 290 | # return __already_pending(dp, conn) |
325 | 291 | ||
326 | def __already_processed(dp, conn): | 292 | #def __already_processed(dp, conn): |
327 | seeking = dp_to_db(dp) | 293 | # seeking = dp_to_db(dp) |
328 | seeking.pop('ts_util', None) | 294 | # seeking.pop('ts_util', None) |
329 | c = conn.cursor() | 295 | # c = conn.cursor() |
330 | c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id ' | 296 | # c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id ' |
331 | 'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), | 297 | # 'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), |
332 | ((dp.sys_util,) + tuple(seeking.values()))) | 298 | # ((dp.sys_util,) + tuple(seeking.values()))) |
333 | row = c.fetchone() | 299 | # row = c.fetchone() |
334 | processed = bool(row[0]) | 300 | # processed = bool(row[0]) |
335 | c.close() | 301 | # c.close() |
336 | return processed | 302 | # return processed |
337 | 303 | ||
338 | def already_processed(dp, conn = None, db_name = None): | 304 | #def already_processed(dp, conn = None, db_name = None): |
339 | if conn: | 305 | # if conn: |
340 | return __already_processed(dp, conn) | 306 | # return __already_processed(dp, conn) |
341 | else: | 307 | # else: |
342 | conn = connect_db(db_name) | 308 | # conn = connect_db(db_name) |
343 | with conn: | 309 | # with conn: |
344 | return __already_processed(dp, conn) | 310 | # return __already_processed(dp, conn) |
345 | 311 | ||
346 | def store_design_points(db_name, dps, clean): | 312 | def store_design_points(db_name, dps, clean): |
347 | global dp_col_names | 313 | global dp_col_names |
348 | global dp_col_type_strs | 314 | global dp_col_type_strs |
349 | 315 | ||
316 | dps = [dp_to_db(dp) for dp in dps] | ||
317 | nstored = 0 | ||
318 | |||
350 | conn = connect_db(db_name) | 319 | conn = connect_db(db_name) |
351 | npending = 0 | ||
352 | with conn: | 320 | with conn: |
353 | c = conn.cursor() | 321 | c = conn.cursor() |
354 | 322 | if clean: | |
355 | # convert to dicts with db names | 323 | c.executemany('INSERT INTO dp_pending VALUES(NULL,0,%s)' % ','.join(['%s']*len(dp_col_names)), |
356 | if not clean: | 324 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) |
357 | dps = [dp_to_db(dp) for dp in dps if not (already_processed(dp, conn = conn) or already_pending(dp, conn = conn))] | 325 | nstored = len(dps) |
358 | else: | 326 | else: |
359 | dps = [dp_to_db(dp) for dp in dps] | 327 | col_names = copy.deepcopy(dp_col_names) |
328 | col_names.pop(0) # remove ts_util | ||
329 | # store entire dp set in a temporary staging table | ||
330 | c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending') | ||
331 | c.executemany('INSERT INTO tmp_dp VALUES(NULL,0,%s)' % ','.join(['%s']*len(dp_col_names)), | ||
332 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) | ||
333 | # mask out the dps that have already been processed | ||
334 | c.execute('UPDATE tmp_dp as S JOIN sched_results as R on S.ts_util=R.ts_util JOIN dp_ptested as T on R.dp=T.id ' | ||
335 | 'SET S.taken=1 WHERE %s' % ' AND '.join(map(lambda x: 'S.%s=T.%s' % (x,x), col_names))) | ||
336 | # mask out the dps that are already pending | ||
337 | c.execute('UPDATE tmp_dp as S JOIN dp_pending as P on %s SET S.taken=1' % ' AND '.join(map(lambda x: 'S.%s=P.%s' % (x,x), dp_col_names))) | ||
338 | # copy any remaining dps into the pending table | ||
339 | c.execute('INSERT INTO dp_pending(taken,%s) SELECT taken,%s FROM tmp_dp WHERE tmp_dp.taken=0' % (','.join(dp_col_names), ','.join(dp_col_names))) | ||
340 | nstored = c.rowcount | ||
341 | c.execute('DROP TEMPORARY TABLE tmp_dp') | ||
342 | c.close() | ||
343 | return nstored | ||
360 | 344 | ||
361 | ################ | 345 | def num_pending_design_points(db_name): |
362 | # convert dicts into correctly-ordered list and insert | 346 | conn = connect_db(db_name) |
363 | c.executemany('INSERT INTO dp_pending VALUES(NULL,%s)' % ','.join(['%s']*len(dp_col_names)), | 347 | with conn: |
364 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) | 348 | c = conn.cursor() |
365 | # complete | 349 | c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0') |
366 | c.execute('SELECT COUNT(*) FROM dp_pending') | ||
367 | npending = c.fetchone()[0] | 350 | npending = c.fetchone()[0] |
368 | ############## | ||
369 | |||
370 | c.close() | 351 | c.close() |
371 | return len(dps), npending | 352 | return npending |
372 | |||
373 | 353 | ||
374 | def get_design_points(db_name, ndp = 1): | 354 | def get_design_points(db_name, ndp = 1): |
375 | global dp_col_names | 355 | global dp_col_names |
@@ -381,44 +361,31 @@ def get_design_points(db_name, ndp = 1): | |||
381 | failcount = 0 | 361 | failcount = 0 |
382 | while True: | 362 | while True: |
383 | try: | 363 | try: |
384 | retry = True | 364 | if conn is not None: |
385 | while retry is True: | 365 | conn.close() |
386 | retry = False | 366 | del conn |
387 | 367 | conn = None | |
388 | if conn is not None: | 368 | |
389 | conn.close() | 369 | conn = connect_db(db_name) |
390 | del conn | 370 | with conn: |
391 | conn = None | 371 | ################ |
392 | 372 | c = conn.cursor(db.cursors.DictCursor) | |
393 | conn = connect_db(db_name) | 373 | |
394 | with conn: | 374 | begin_sync(conn, c) |
395 | ################ | 375 | c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,)) |
396 | c = conn.cursor(db.cursors.DictCursor) | 376 | dp_ids = [d['id'] for d in c.fetchall()] |
397 | begin_sync(conn, c) | 377 | nfetched = len(dp_ids) if dp_ids else 0 |
398 | # c.execute('LOCK TABLES dp_pending WRITE') | 378 | if nfetched > 0: |
399 | c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,)) | 379 | c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids)) |
400 | dps = [db_to_dp(d) for d in c.fetchall()] | 380 | end_sync(conn) |
401 | nfetched = len(dps) if dps else 0 | 381 | |
402 | if nfetched > 0: | 382 | if nfetched > 0: |
403 | temp = [dp_to_db(d) for d in dps] | 383 | c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids)) |
404 | for i in xrange(nfetched): | 384 | fetched = [db_to_dp(d) for d in c.fetchall()] |
405 | c.execute('DELETE FROM dp_pending ' | 385 | |
406 | 'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)), | 386 | c.close() |
407 | tuple(map(lambda x: temp[i][x], dp_col_names))) | 387 | break |
408 | if c.rowcount == 1: | 388 | ############## |
409 | fetched.append(dps[i]) | ||
410 | elif c.rowcount == 0: | ||
411 | print 'raced for design point. dropping.' | ||
412 | else: | ||
413 | print 'deleted too many rows... dropping.' | ||
414 | if len(fetched) == 0: | ||
415 | retry = True | ||
416 | # c.execute('UNLOCK TABLES') | ||
417 | end_sync(conn) | ||
418 | c.close() | ||
419 | ############## | ||
420 | # success! | ||
421 | break | ||
422 | except db.OperationalError, e: | 389 | except db.OperationalError, e: |
423 | print e | 390 | print e |
424 | # make sure the db connection is closed | 391 | # make sure the db connection is closed |
@@ -518,24 +485,16 @@ def store_sched_results(db_name, data, ndp = 0): | |||
518 | 485 | ||
519 | if ndp > 0: | 486 | if ndp > 0: |
520 | begin_sync(conn, c) | 487 | begin_sync(conn, c) |
521 | # c.execute('LOCK TABLES dp_pending WRITE') | 488 | c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,)) |
522 | c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,)) | 489 | dp_ids = [d['id'] for d in c.fetchall()] |
523 | dps = [db_to_dp(d) for d in c.fetchall()] | 490 | nfetched = len(dp_ids) if dp_ids else 0 |
524 | nfetched = len(dps) if dps else 0 | ||
525 | if nfetched > 0: | 491 | if nfetched > 0: |
526 | temp = [dp_to_db(d) for d in dps] | 492 | c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids)) |
527 | for i in xrange(nfetched): | ||
528 | c.execute('DELETE FROM dp_pending ' | ||
529 | 'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)), | ||
530 | tuple(map(lambda x: temp[i][x], dp_col_names))) | ||
531 | if c.rowcount == 1: | ||
532 | fetched.append(dps[i]) | ||
533 | elif c.rowcount == 0: | ||
534 | print 'store_sched_results: raced for design point. dropping.' | ||
535 | else: | ||
536 | print 'store_sched_results: deleted too many rows... dropping.' | ||
537 | # c.execute('UNLOCK TABLES') | ||
538 | end_sync(conn) | 493 | end_sync(conn) |
494 | |||
495 | if nfetched > 0: | ||
496 | c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids)) | ||
497 | fetched = [db_to_dp(d) for d in c.fetchall()] | ||
539 | c.close() | 498 | c.close() |
540 | # success! | 499 | # success! |
541 | break | 500 | break |
diff --git a/rtss14/rtss14.py b/rtss14/rtss14.py index f049951..ee3564a 100755 --- a/rtss14/rtss14.py +++ b/rtss14/rtss14.py | |||
@@ -785,7 +785,6 @@ def get_dp_space(cpus): | |||
785 | exp.ncpus = [12] | 785 | exp.ncpus = [12] |
786 | exp.nclusters = [1, 2, 12] | 786 | exp.nclusters = [1, 2, 12] |
787 | exp.ngpus = [8] | 787 | exp.ngpus = [8] |
788 | # exp.ngclusters = [1, 2, 4, 8] | ||
789 | exp.ngclusters = [1, 2, 4, 8] | 788 | exp.ngclusters = [1, 2, 4, 8] |
790 | exp.release_master = [False] | 789 | exp.release_master = [False] |
791 | exp.polluters = [False, True] | 790 | exp.polluters = [False, True] |
@@ -794,7 +793,6 @@ def get_dp_space(cpus): | |||
794 | 793 | ||
795 | # gpusync config variables | 794 | # gpusync config variables |
796 | exp.rho = [0, 1, 2, 3] | 795 | exp.rho = [0, 1, 2, 3] |
797 | # exp.rho = [1, 3] | ||
798 | exp.dgl = [True, False] | 796 | exp.dgl = [True, False] |
799 | exp.p2p = [True, False] | 797 | exp.p2p = [True, False] |
800 | exp.ncopy_engines = [2] | 798 | exp.ncopy_engines = [2] |
@@ -802,8 +800,8 @@ def get_dp_space(cpus): | |||
802 | 800 | ||
803 | # task parameters | 801 | # task parameters |
804 | # step_size = 0.1 | 802 | # step_size = 0.1 |
805 | # step_size = 0.5 | 803 | step_size = 0.5 |
806 | step_size = 0.25 | 804 | # step_size = 0.25 |
807 | start_pt = step_size | 805 | start_pt = step_size |
808 | # start_pt = 10.0 | 806 | # start_pt = 10.0 |
809 | exp.sys_util = [float(v) for v in arange(start_pt, cpus+step_size, step_size)] | 807 | exp.sys_util = [float(v) for v in arange(start_pt, cpus+step_size, step_size)] |
@@ -891,11 +889,10 @@ def main(): | |||
891 | 889 | ||
892 | # load design points incrementally as to not overload memory | 890 | # load design points incrementally as to not overload memory |
893 | gen = DesignPointGenerator(exp, is_valid = valid) | 891 | gen = DesignPointGenerator(exp, is_valid = valid) |
894 | dp_chunk_sz = 10000 # number of design points to load at a time | 892 | dp_chunk_sz = 50000 # number of design points to load at a time |
895 | ndp = 0 | 893 | ndp = 0 |
896 | if not args.pretend: | 894 | if not args.pretend: |
897 | nstored = 0 | 895 | nstored = 0 |
898 | npending = 0 | ||
899 | while True: | 896 | while True: |
900 | dp_chunk = [] | 897 | dp_chunk = [] |
901 | for _ in range(dp_chunk_sz): | 898 | for _ in range(dp_chunk_sz): |
@@ -907,26 +904,15 @@ def main(): | |||
907 | break | 904 | break |
908 | ndp += len(dp_chunk) | 905 | ndp += len(dp_chunk) |
909 | random.shuffle(dp_chunk) | 906 | random.shuffle(dp_chunk) |
910 | tmpnstored, npending = db.store_design_points(args.database, dp_chunk, clean = not args.resume) | 907 | tmpnstored = db.store_design_points(args.database, dp_chunk, clean = not args.resume) |
911 | nstored += tmpnstored | 908 | nstored += tmpnstored |
912 | print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (nstored, ndp, ndp - nstored, npending) | 909 | npending = db.num_pending_design_points(args.database) |
910 | print "Loaded %d of %d design points. (%d were completed, %d now pending)" % (nstored, ndp, ndp - npending, npending) | ||
913 | else: | 911 | else: |
914 | for _ in gen: | 912 | for _ in gen: |
915 | ndp += 1 | 913 | ndp += 1 |
916 | print "%d design points planned" % ndp | 914 | print "%d design points planned" % ndp |
917 | 915 | ||
918 | # design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)] | ||
919 | # ndp = len(design_points) | ||
920 | # if not args.pretend: | ||
921 | # random.shuffle(design_points) | ||
922 | # if not args.resume: | ||
923 | # db.clear_tables(args.database) | ||
924 | # nstored, npending = db.store_design_points(args.database, design_points, clean = not args.resume) | ||
925 | # print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % ( | ||
926 | # nstored, ndp, ndp - nstored, npending) | ||
927 | # else: | ||
928 | # print "%d design points planned" % ndp | ||
929 | |||
930 | if args.pretend or args.initonly: | 916 | if args.pretend or args.initonly: |
931 | exit(0) | 917 | exit(0) |
932 | 918 | ||