author    Glenn Elliott <gelliott@cs.unc.edu>  2014-09-21 18:35:53 -0400
committer Glenn Elliott <gelliott@cs.unc.edu>  2014-09-21 18:35:53 -0400
commit    0d0de82d3994f3e737925e1e6a3e4d403375e529 (patch)
tree      f6d883c29fc61bf089a7232328841fc990306317
parent    d3efc4e3241fdc1d2ec8524a424a29895dc86a22 (diff)
Move to MySQL
-rw-r--r--  rtss14/createtables.sql |  50
-rwxr-xr-x  rtss14/database.py      | 172
-rwxr-xr-x  rtss14/rtss14.py        | 104
3 files changed, 190 insertions(+), 136 deletions(-)
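
Note: this commit moves the experiment database layer from sqlite3 to MySQL (via the
MySQLdb module). The mechanical changes visible in the diffs below are: '%s' parameter
placeholders instead of sqlite's '?', dictionary cursors instead of sqlite3.Row, explicit
START TRANSACTION / commit / autocommit handling instead of BEGIN EXCLUSIVE TRANSACTION,
AUTO_INCREMENT ids and ENGINE=InnoDB in the schema, and SELECT ... FOR UPDATE row locking
when workers claim pending design points. A minimal sketch of the new access pattern,
assuming MySQL-python is installed (host, credentials, and the inserted value are
placeholders, not the ones used by database.py):

    import MySQLdb
    import MySQLdb.cursors

    # Hypothetical connection parameters -- substitute a real host/user/password/db.
    conn = MySQLdb.connect('db.example.org', 'user', 'secret', 'rtss14')
    c = conn.cursor(MySQLdb.cursors.DictCursor)   # rows come back as dicts

    # MySQLdb uses '%s' placeholders where sqlite3 used '?'.
    c.execute('SELECT id, name FROM distrs WHERE name=%s', ('u-uni-light',))
    row = c.fetchone()                             # e.g. {'id': 1L, 'name': 'u-uni-light'}

    # Explicit transaction in place of sqlite's BEGIN EXCLUSIVE TRANSACTION.
    conn.autocommit(False)
    c.execute('START TRANSACTION')
    c.execute('INSERT INTO distrs(name) VALUES(%s)', ('u-uni-example',))
    conn.commit()
    conn.autocommit(True)
    c.close()
    conn.close()
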
diff --git a/rtss14/createtables.sql b/rtss14/createtables.sql
index 45490d3..a072aa4 100644
--- a/rtss14/createtables.sql
+++ b/rtss14/createtables.sql
@@ -1,7 +1,8 @@
 CREATE TABLE distrs(
-    id INTEGER PRIMARY KEY NOT NULL,
-    name TEXT
-);
+    id INTEGER NOT NULL AUTO_INCREMENT,
+    name TEXT,
+    PRIMARY KEY (id)
+) ENGINE=InnoDB;
 -- util
 INSERT INTO distrs(name) VALUES('u-uni-light');
 INSERT INTO distrs(name) VALUES('u-uni-medium');
@@ -37,12 +38,9 @@ INSERT INTO distrs(name) VALUES('c-const-light');
 INSERT INTO distrs(name) VALUES('c-const-medium');
 INSERT INTO distrs(name) VALUES('c-const-heavy');
 
---CREATE TABLE dummy(
---    pid INTEGER
---);
---INSERT INTO dummy(pid) VALUES(0);
-
 CREATE TABLE dp_pending(
+    id INTEGER NOT NULL AUTO_INCREMENT,
+
     -- task set util cap
     ts_util REAL,
 
@@ -51,6 +49,7 @@ CREATE TABLE dp_pending(
     cpu_cluster_size INTEGER,
     ngpu INTEGER,
     gpu_cluster_size INTEGER,
+    is_release_master INTEGER,
 
     -- overheads config
     is_worst_case INTEGER,
@@ -77,25 +76,27 @@ CREATE TABLE dp_pending(
     ncopy_engines INTEGER,
     chunk_size INTEGER,
 
+    PRIMARY KEY (id)
     -- every permutation marks a unique configuration
-    PRIMARY KEY(ts_util,
-        ncpu, cpu_cluster_size, ngpu, gpu_cluster_size,
-        is_worst_case, is_polluters, wss_size,
-        util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist,
-        gpu_population,
-        rho, is_dgl, is_p2p, ncopy_engines, chunk_size)
-);
+    -- PRIMARY KEY(ts_util,
+    --     ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master,
+    --     is_worst_case, is_polluters, wss_size,
+    --     util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist,
+    --     gpu_population,
+    --     rho, is_dgl, is_p2p, ncopy_engines, chunk_size)
+) ENGINE=InnoDB;
 
 -- partially tested dps
 CREATE TABLE dp_ptested(
     -- auto-computed unique id for this
-    id INTEGER PRIMARY KEY NOT NULL,
+    id INTEGER NOT NULL AUTO_INCREMENT,
 
     -- platform parameters
     ncpu INTEGER,
     cpu_cluster_size INTEGER,
     ngpu INTEGER,
     gpu_cluster_size INTEGER,
+    is_release_master INTEGER,
 
     -- overheads config
     is_worst_case INTEGER,
@@ -122,13 +123,14 @@ CREATE TABLE dp_ptested(
     ncopy_engines INTEGER,
     chunk_size INTEGER,
 
+    PRIMARY KEY (id)
     -- every permutation marks a unique configuration
-    UNIQUE(ncpu, cpu_cluster_size, ngpu, gpu_cluster_size,
-        is_worst_case, is_polluters, wss_size,
-        util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist,
-        gpu_population,
-        rho, is_dgl, is_p2p, ncopy_engines, chunk_size)
-);
+    -- UNIQUE(ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master,
+    --     is_worst_case, is_polluters, wss_size,
+    --     util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist,
+    --     gpu_population,
+    --     rho, is_dgl, is_p2p, ncopy_engines, chunk_size)
+) ENGINE=InnoDB;
 
 CREATE TABLE sched_results(
     dp INTEGER,
@@ -145,7 +147,7 @@ CREATE TABLE sched_results(
 
     FOREIGN KEY(dp) REFERENCES dp_ptested(id),
     PRIMARY KEY(dp, ts_util)
-);
+) ENGINE=InnoDB;
 
 CREATE TABLE scaled_sched_results(
     dp INTEGER,
@@ -161,4 +163,4 @@ CREATE TABLE scaled_sched_results(
 
     FOREIGN KEY(dp) REFERENCES dp_ptested(id),
     PRIMARY KEY(dp, eff_ts_util, scale_factor)
-);
+) ENGINE=InnoDB;
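
Schema note: sqlite accepted "id INTEGER PRIMARY KEY NOT NULL" and ignored table engines,
whereas the MySQL version above needs AUTO_INCREMENT, a separate PRIMARY KEY (id) clause,
and ENGINE=InnoDB so that FOREIGN KEY constraints and row-level locking actually apply. A
small sketch of creating and exercising one such table from Python (the table name
"example" is hypothetical; database.py itself re-queries for the assigned id rather than
using cursor.lastrowid):

    import MySQLdb

    def create_example_table(conn):
        c = conn.cursor()
        # MySQL-style auto-increment key; InnoDB is needed for FOREIGN KEY support.
        c.execute('CREATE TABLE IF NOT EXISTS example('
                  '  id INTEGER NOT NULL AUTO_INCREMENT,'
                  '  name TEXT,'
                  '  PRIMARY KEY (id)'
                  ') ENGINE=InnoDB')
        c.execute('INSERT INTO example(name) VALUES(%s)', ('demo',))
        print c.lastrowid   # id assigned by AUTO_INCREMENT
        conn.commit()
        c.close()
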
diff --git a/rtss14/database.py b/rtss14/database.py
index af6e966..cd5535d 100755
--- a/rtss14/database.py
+++ b/rtss14/database.py
@@ -5,8 +5,7 @@ import time
 import random
 import copy
 import itertools
-import sqlite3 as lite
-import json
+import MySQLdb as db
 
 from schedcat.util.storage import storage
 from generator import DesignPointGenerator
@@ -17,15 +16,13 @@ timeout = 10*60 # 10 minute timeout on database locks
 max_fail = 30 # maximum number of times to reset a bad db connection
 distr_mapper = None # maps database distribution ids to strings
 dp_col_names = None # list of keys to extract from sched results to write to db
+dp_col_type_strs = None
 ######################
 
 def backoff(t):
     time_to_sleep = random.random() * t
     time.sleep(time_to_sleep)
 
-def init_db(db_name):
-    lite.register_converter("JSON", json.loads)
-
 def init_distr_mapper(conn):
     global distr_mapper
     global timeout
@@ -34,7 +31,7 @@ def init_distr_mapper(conn):
         return
 
     distr_mapper = {}
-    c = conn.cursor()
+    c = conn.cursor(db.cursors.DictCursor)
     start = time.time()
     while True:
         try:
@@ -44,17 +41,19 @@ def init_distr_mapper(conn):
                 distr_mapper[int(row['id'])] = str(row['name'])
                 distr_mapper[str(row['name'])] = int(row['id'])
             break
-        except lite.OperationalError:
+        except db.OperationalError, e:
+            print e
             elapsed = time.time() - start
             if elapsed > timeout:
                 raise
             else:
                 backoff(10)
                 pass
-
+    c.close()
 
 def init_dp_col_names(conn):
     global dp_col_names
+    global dp_col_type_strs
     global timeout
 
     if dp_col_names is not None and len(dp_col_names) > 0:
@@ -65,16 +64,27 @@ def init_dp_col_names(conn):
     while True:
         # can't use pragma within transaction safely. so loop.
         try:
-            c.execute('PRAGMA table_info(dp_pending)')
-            dp_col_names = list(map(lambda x: x[1].encode('ascii', 'ignore'), c.fetchall()))
+            c.execute('SHOW COLUMNS FROM dp_pending')
+            cols = c.fetchall()
+            dp_col_names = []
+            dp_col_type_strs = {}
+            for field in cols:
+                name = field[0].encode('ascii', 'ignore')
+                if(name == 'id'):
+                    continue
+                dp_col_names.append(name)
+                form = '%s'
+                dp_col_type_strs[name] = form
             break
-        except lite.OperationalError:
+        except db.OperationalError, e:
+            print e
             elapsed = time.time() - start
             if elapsed > timeout:
                 raise
             else:
                 backoff(10)
                 pass
+    c.close()
 
 def db_type(var):
     if type(var) is int or type(var) is bool:
@@ -108,6 +118,7 @@ def dp_to_db(dp):
     data['cpu_cluster_size'] = dp.ncpus / dp.nclusters
     data['ngpu'] = dp.ngpus
     data['gpu_cluster_size'] = dp.ngpus / dp.ngclusters
+    data['is_release_master'] = dp.release_master
     data['is_worst_case'] = 1 if dp.ovh_type == 'max' else 0
     data['is_polluters'] = dp.polluters
     data['wss_size'] = dp.wss
@@ -138,6 +149,7 @@ def db_to_dp(data):
     exp.nclusters = [exp.ncpus[0]/int(data['cpu_cluster_size'])]
     exp.ngpus = [int(data['ngpu'])]
     exp.ngclusters = [exp.ngpus[0]/int(data['gpu_cluster_size'])]
+    exp.release_master = [int(data['is_release_master'])]
     exp.polluters = [int(data['is_polluters']) != 0]
     exp.ovh_type = ['max' if int(data['is_worst_case']) == 1 else 'mean']
     exp.rho = [int(data['rho'])]
@@ -169,43 +181,44 @@ def dp_to_db_vals(dp):
         vals[key] = to_db_val(value)
     return vals
 
-def begin_sync(conn):
+def begin_sync(conn, c):
     # use a shorter timout because we'll retry with a new db connection instead
     transaction_timeout = 5*60
     start = time.time()
     while True:
         try:
-            conn.execute('BEGIN EXCLUSIVE TRANSACTION')
-            # Force a write lock to be acquired before we do
-            # anything more. I hate sqlite.
+            conn.autocommit(False)
+            c.execute('START TRANSACTION')
             break
-        except lite.OperationalError:
+        except db.OperationalError, e:
+            print e
             elapsed = time.time() - start
             if elapsed > transaction_timeout:
                 raise
             else:
                 backoff(20)
                 pass
-    return conn
+    return c
 
 def end_sync(conn):
     success = conn.commit()
+    conn.autocommit(True)
     return success
 
 
-def connect_db(db_name, isolation = 'EXCLUSIVE'):
+def connect_db(db_name):
     global distr_mapper
     global dp_col_names
+    global dp_col_type_strs
     global timeout
 
-    init_db(db_name)
-
     start = time.time()
     while True:
         try:
-            conn = lite.connect(db_name, detect_types=lite.PARSE_DECLTYPES|lite.PARSE_COLNAMES, isolation_level=isolation)
+            conn = db.connect('mydb.cs.unc.edu', 'gelliott', 'G1ennDB', db_name);
             break
-        except lite.OperationalError:
+        except db.OperationalError, e:
+            print e
             elapsed = time.time() - start
             # just give up :(
             if elapsed > timeout:
@@ -213,9 +226,6 @@ def connect_db(db_name, isolation = 'EXCLUSIVE'):
         else:
             backoff(10)
             pass
-    conn.row_factory = lite.Row
-    # enable foreign keys
-    conn.execute('PRAGMA FOREIGN_KEYS=ON')
 
     if distr_mapper is None:
         init_distr_mapper(conn)
@@ -232,17 +242,18 @@ def clear_tables(db_name):
     with conn:
         c = conn.cursor()
         ################
-        begin_sync(conn)
+        begin_sync(conn, c)
         for t in tables:
             c.execute('DELETE FROM %s' % t)
         end_sync(conn)
+        c.close()
         ################
 
 #def get_results(conn, dp, fields = '*', table = 'sched_results', extra = None):
 #    where = ['%s=?' % k for k in dp.iterkeys()]
 #
 #    ################
-#    c = conn.cursor()
+#    c = conn.cursor(db.cursors.DictCursor)
 #    begin_sync(conn)
 #    if extra:
 #        query = "SELECT %s FROM %s WHERE %s %s" % (", ".join(fields), table, " AND ".join(where), extra)
@@ -258,7 +269,7 @@ def clear_tables(db_name):
 #    return data
 
 #def count_results(conn, dp, table = 'sched_results'):
-#    c = conn.cursor()
+#    c = conn.cursor(db.cursors.DictCursor)
 #    where = ['%s=?' % k for k in dp.iterkeys()]
 #
 #    query = "SELECT COUNT(*) FROM %s WHERE %s" % (table, " AND ".join(where))
@@ -272,35 +283,36 @@ def clear_tables(db_name):
 #
 #    return data[0]
 
-def __get_dp_id(conn, dp, add_if_missing = False):
+def __get_dp_id(c, dp, add_if_missing = False):
     global dp_col_names
+    global dp_col_type_strs
     seeking = dp_to_db(dp)
     seeking.pop('ts_util', None)
-    query = 'SELECT id FROM dp_ptested WHERE %s' % ' AND '.join(map(lambda c: '%s=?' % c, seeking.iterkeys()))
-    c = conn.cursor()
+    query = 'SELECT id FROM dp_ptested WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys()))
     c.execute(query, tuple(seeking.values()))
-    data = c.fetchone()
-    if data:
-        return data[0]
+    row = c.fetchone()
+    if row:
+        return row['id']
     elif add_if_missing == False:
         return None
     # add the design point
     col_names = copy.deepcopy(dp_col_names)
     col_names.pop(0) # remove the id field
-    c.execute('INSERT INTO dp_ptested VALUES(NULL,%s)' % ','.join(['?']*len(col_names)),
+    c.execute('INSERT INTO dp_ptested VALUES(NULL,%s)' % ','.join(['%s']*len(col_names)),
               tuple(map(lambda x: seeking[x], col_names)))
     # get the assigned id
     c.execute(query, tuple(seeking.values()))
-    data = c.fetchone()
-    return data[0]
+    row = c.fetchone()
+    return row['id']
 
 def __already_pending(dp, conn):
     seeking = dp_to_db(dp)
     c = conn.cursor()
-    c.execute('SELECT COUNT(*) FROM dp_pending WHERE %s' % ' AND '.join(map(lambda x: '%s=?' % x, seeking.iterkeys())),
+    c.execute('SELECT COUNT(*) FROM dp_pending WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
               tuple(seeking.values()))
-    data = c.fetchone()
-    processed = bool(data[0])
+    row = c.fetchone()
+    processed = bool(row[0])
+    c.close()
     return processed
 
 def already_pending(dp, conn = None, db_name = None):
@@ -316,10 +328,11 @@ def __already_processed(dp, conn):
     seeking.pop('ts_util', None)
     c = conn.cursor()
     c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id '
-              'WHERE R.ts_util=? AND %s' % ' AND '.join(map(lambda x: 'K.%s=?' % x, seeking.iterkeys())),
+              'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
               ((dp.sys_util,) + tuple(seeking.values())))
-    data = c.fetchone()
-    processed = bool(data[0])
+    row = c.fetchone()
+    processed = bool(row[0])
+    c.close()
     return processed
 
 def already_processed(dp, conn = None, db_name = None):
@@ -332,6 +345,7 @@ def already_processed(dp, conn = None, db_name = None):
 
 def store_design_points(db_name, dps, clean):
     global dp_col_names
+    global dp_col_type_strs
 
     conn = connect_db(db_name)
     npending = 0
@@ -345,22 +359,21 @@ def store_design_points(db_name, dps, clean):
     dps = [dp_to_db(dp) for dp in dps]
 
     ################
-    begin_sync(conn)
-
     # convert dicts into correctly-ordered list and insert
-    c.executemany('INSERT INTO dp_pending VALUES(%s)' % ','.join(['?']*len(dp_col_names)),
+    c.executemany('INSERT INTO dp_pending VALUES(NULL,%s)' % ','.join(['%s']*len(dp_col_names)),
         [tuple(map(lambda x: d[x], dp_col_names)) for d in dps])
     # complete
     c.execute('SELECT COUNT(*) FROM dp_pending')
     npending = c.fetchone()[0]
-
-    end_sync(conn)
     ##############
+
+    c.close()
     return len(dps), npending
 
 
 def get_design_points(db_name, ndp = 1):
     global dp_col_names
+    global dp_col_type_strs
     global max_fail
     fetched = []
 
@@ -380,16 +393,17 @@ def get_design_points(db_name, ndp = 1):
             conn = connect_db(db_name)
             with conn:
                 ################
-                begin_sync(conn)
-                c = conn.cursor()
-                c.execute('SELECT %s FROM dp_pending LIMIT ?' % (','.join(dp_col_names)), (ndp,))
+                c = conn.cursor(db.cursors.DictCursor)
+                begin_sync(conn, c)
+#                c.execute('LOCK TABLES dp_pending WRITE')
+                c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,))
                 dps = [db_to_dp(d) for d in c.fetchall()]
                 nfetched = len(dps) if dps else 0
                 if nfetched > 0:
                     temp = [dp_to_db(d) for d in dps]
                     for i in xrange(nfetched):
                         c.execute('DELETE FROM dp_pending '
-                                  'WHERE %s' % ' AND '.join(map(lambda c: '%s=?'%c, dp_col_names)),
+                                  'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)),
                                   tuple(map(lambda x: temp[i][x], dp_col_names)))
                         if c.rowcount == 1:
                             fetched.append(dps[i])
@@ -399,11 +413,14 @@ def get_design_points(db_name, ndp = 1):
                             print 'deleted too many rows... dropping.'
                 if len(fetched) == 0:
                     retry = True
+#                c.execute('UNLOCK TABLES')
                 end_sync(conn)
+                c.close()
                 ##############
                 # success!
                 break
-        except lite.OperationalError:
+        except db.OperationalError, e:
+            print e
             # make sure the db connection is closed
             if conn is not None:
                 conn.close()
@@ -416,9 +433,7 @@ def get_design_points(db_name, ndp = 1):
             else:
                 time_to_sleep = (random.random()*20 + 5)*failcount
                 time_to_sleep = min(time_to_sleep, 4*60)
-                # flush lustre cache???
                 print 'retrying get_design_points from scratch. sleep = %.3f' % time_to_sleep
-                os.system('sync')
                 time.sleep(time_to_sleep)
 
     if failcount > 0:
@@ -430,19 +445,16 @@ def get_design_points(db_name, ndp = 1):
         del conn
         conn = None
 
-    os.system('sync')
-
     return fetched
 
 
-def __store_eff_sched_results(conn, dp_id, stats):
-    c = conn.cursor()
+def __store_eff_sched_results(c, dp_id, stats):
     for factor,eff_curve in stats.iteritems():
         for eff_ts_util,sched in eff_curve.iteritems():
             # do prior results exist?
             c.execute('SELECT ntested, nsched, avg_sched, avg_tard, avg_bandwidth '
                       'FROM scaled_sched_results '
-                      'WHERE dp=? AND eff_ts_util=? AND scale_factor=?',
+                      'WHERE dp=%s AND eff_ts_util=%s AND scale_factor=%s FOR UPDATE',
                       (dp_id, eff_ts_util, factor))
             row = c.fetchone()
             if row:
@@ -459,16 +471,17 @@ def __store_eff_sched_results(conn, dp_id, stats):
                 sched.ntested += rntested
                 sched.nsched += rnsched
                 c.execute('UPDATE scaled_sched_results '
-                          'SET ntested=?, nsched=?, avg_sched=?, avg_tard=?, avg_bandwidth=? '
-                          'WHERE dp=? AND eff_ts_util=? AND scale_factor=?',
+                          'SET ntested=%s, nsched=%s, avg_sched=%s, avg_tard=%s, avg_bandwidth=%s '
+                          'WHERE dp=%s AND eff_ts_util=%s AND scale_factor=%s',
                           (sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth, dp_id, eff_ts_util, factor))
             else:
                 c.execute('INSERT INTO scaled_sched_results '
-                          'VALUES(?,?,?,?,?,?,?,?)',
+                          'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)',
                           (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth))
 
 def store_sched_results(db_name, data, ndp = 0):
     global dp_col_names
+    global dp_col_type_strs
     global max_fail
     col_names = None
 
@@ -488,33 +501,32 @@ def store_sched_results(db_name, data, ndp = 0):
             col_names.pop(0) # remove the id field
 
             with conn:
-                # if the db hangs, it's always here on the begin transaction...
-                begin_sync(conn)
+                c = conn.cursor(db.cursors.DictCursor)
 
                 # get IDs for all design points
-                dp_ids = [__get_dp_id(conn, d.dp, add_if_missing = True) for d in data]
+                dp_ids = [__get_dp_id(c, d.dp, add_if_missing = True) for d in data]
                 # insert the normal sched data in one go
-                conn.executemany('INSERT INTO sched_results VALUES(?,?,?,?,?,?,?)',
+                c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
                     [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth)
                      for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])])
 
                 if d.eff_sched_stats is not None:
+                    begin_sync(conn, c)
                     for dp_id, d in zip(dp_ids, data):
-                        __store_eff_sched_results(conn, dp_id, d.eff_sched_stats)
+                        __store_eff_sched_results(c, dp_id, d.eff_sched_stats)
+                    end_sync(conn)
 
-                # try to fetch the next design points while we hold the db lock
-                # not the best thing to do for concurrency, but lustre's locking
-                # is very slow/costly.
                 if ndp > 0:
-                    c = conn.cursor()
-                    c.execute('SELECT %s FROM dp_pending LIMIT ?' % (','.join(dp_col_names)), (ndp,))
+                    begin_sync(conn, c)
+#                    c.execute('LOCK TABLES dp_pending WRITE')
+                    c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,))
                     dps = [db_to_dp(d) for d in c.fetchall()]
                     nfetched = len(dps) if dps else 0
                     if nfetched > 0:
                         temp = [dp_to_db(d) for d in dps]
                         for i in xrange(nfetched):
                             c.execute('DELETE FROM dp_pending '
-                                      'WHERE %s' % ' AND '.join(map(lambda c: '%s=?'%c, dp_col_names)),
+                                      'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)),
                                       tuple(map(lambda x: temp[i][x], dp_col_names)))
                             if c.rowcount == 1:
                                 fetched.append(dps[i])
@@ -522,12 +534,13 @@ def store_sched_results(db_name, data, ndp = 0):
                                 print 'store_sched_results: raced for design point. dropping.'
                             else:
                                 print 'store_sched_results: deleted too many rows... dropping.'
-                # commit all changes
+#                c.execute('UNLOCK TABLES')
                 end_sync(conn)
-
+                c.close()
                 # success!
                 break
-        except lite.OperationalError:
+        except db.OperationalError, e:
+            print e
             if conn is not None:
                 conn.close()
                 del conn
@@ -540,7 +553,6 @@ def store_sched_results(db_name, data, ndp = 0):
                 time_to_sleep = (random.random()*20 + 5)*failcount
                 time_to_sleep = min(time_to_sleep, 4*60)
                 print 'retrying store_sched_results from scratch. sleep = %.3f' % time_to_sleep
-                os.system('sync')
                 time.sleep(time_to_sleep)
 
     if failcount > 0:
@@ -552,6 +564,4 @@ def store_sched_results(db_name, data, ndp = 0):
         del conn
         conn = None
 
-    os.system('sync')
-
     return fetched
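
Concurrency note: dp_pending acts as a shared work queue for many worker processes. With
sqlite the code serialized workers behind a database-wide write lock (BEGIN EXCLUSIVE
TRANSACTION); with MySQL/InnoDB it now relies on an explicit transaction plus
SELECT ... FOR UPDATE row locks before deleting the claimed rows. A simplified sketch of
that claim pattern (the real code in get_design_points() matches on every column via
dp_to_db()/db_to_dp() rather than deleting by id):

    import MySQLdb
    import MySQLdb.cursors

    def claim_pending(conn, n):
        """Atomically claim up to n rows from dp_pending and return them."""
        c = conn.cursor(MySQLdb.cursors.DictCursor)
        conn.autocommit(False)
        c.execute('START TRANSACTION')
        # Lock the candidate rows so concurrent workers cannot claim them too.
        c.execute('SELECT * FROM dp_pending LIMIT %s FOR UPDATE', (n,))
        rows = c.fetchall()
        for row in rows:
            c.execute('DELETE FROM dp_pending WHERE id=%s', (row['id'],))
        conn.commit()
        conn.autocommit(True)
        c.close()
        return rows
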
diff --git a/rtss14/rtss14.py b/rtss14/rtss14.py
index 6d12361..09529c2 100755
--- a/rtss14/rtss14.py
+++ b/rtss14/rtss14.py
@@ -10,7 +10,8 @@ import math
 import time
 import inspect
 
-import sqlite3 as lite
+#import sqlite3 as lite
+import MySQLdb
 
 
 
@@ -128,6 +129,7 @@ def create_gpu_task_set(dp, overheads = None):
         else:
             ts[i].uses_gpu = False
             ts[i].nrequests = 0
+            ts[i].nengine_requests = 0
 
     for t in ts:
         t.wss = dp.wss
@@ -174,14 +176,19 @@ def create_gpu_task_set(dp, overheads = None):
             t.max_np_interval = max(t.max_np_interval, overheads.gpu_xmit.xmit_cost(XmitOverheads.H2D, min(t.stdata, dp.chunk_size)))
             t.max_np_interval = max(t.max_np_interval, overheads.gpu_xmit.xmit_cost(XmitOverheads.D2H, min(t.stdata, dp.chunk_size)))
 
-            # one for token lock, one for exec engine, one for each chunk
+            t.nkernrequests = 1
+            t.nsendrequests = int(ceil(float(t.sdata)/dp.chunk_size))
+            t.nrecvrequests = int(ceil(float(t.rdata)/dp.chunk_size))
             t.nstaterequests = int(ceil(float(t.stdata)/dp.chunk_size))
-            t.nrequests = 1 + \
-                          1 + \
-                          int(ceil(float(t.sdata)/dp.chunk_size)) + \
-                          int(ceil(float(t.rdata)/dp.chunk_size)) + \
-                          t.nstaterequests
-
+            if not dp.p2p:
+                t.nstaterequests *= 2
+            if dp.rho > 0:
+                t.ntokenrequests = 1
+            else:
+                t.ntokenrequests = 0
+            t.nengine_requests = t.nkernrequests + t.nsendrequests + t.nrecvrequests + t.nstaterequests
+            t.nrequests = t.nengine_requests + t.ntokenrequests
+
     return ts
 
 def complete(results, n):
@@ -347,9 +354,11 @@ def is_schedulable(ts, dp, overheads):
         if len(g) <= tokensPerClstr or gsize == 1:
             for t in g:
                 if t.stdata != 0:
+                    t.nrequests -= t.nstaterequests
+                    t.nengine_requests -= t.nstaterequests
+                    t.nstaterequests = 0
                     t.stdata = 0
                     t.stcost = 0.0
-                    t.nrequests -= t.nstaterequests
 
     # do we overutilize any of the clusters, or does migration
     # cause per-task constraint violations?
@@ -400,7 +409,7 @@ def is_schedulable(ts, dp, overheads):
         if not p or len(p) == 0:
             continue
 
-        if not jlfp.charge_scheduling_overheads(overheads, size, False, p):
+        if not jlfp.charge_scheduling_overheads(overheads, size, False, p, clusters, gclusters, dp):
#            print 'failed in overhead charging'
             success = False
             break
@@ -714,7 +723,7 @@ def process_design_points(args):
 
             nfinished += nchunk
 
-        except lite.OperationalError:
+        except MySQLdb.OperationalError, e:
            print '%d: CRAP. Database Error while %s' % (os.getpid(), 'getting work.' if dps is None else 'storing results.')
            print traceback.format_exc()
     allend = time.time()
@@ -731,16 +740,17 @@ def TEST_get_dp_space(cpus):
     # system parameters
     exp.host = ['bonham']
     exp.ncpus = [12]
-#    exp.nclusters = [1, 2, 12]
     exp.nclusters = [1, 2, 12]
+#    exp.nclusters = [12]
     exp.ngpus = [8]
-#    exp.ngclusters = [1, 4, 8]
     exp.ngclusters = [1, 4, 8]
+#    exp.ngclusters = [8]
+    exp.release_master = [False]
     exp.ovh_type = ['mean']
     exp.polluters = [True]
 
     # gpusync config variables
-    exp.rho = [0,3]
+    exp.rho = [2]
     exp.dgl = [True,False]
     exp.p2p = [True,False]
 #    exp.ncopy_engines = [1, 2]
@@ -748,7 +758,8 @@ def TEST_get_dp_space(cpus):
     exp.chunk_size = [1*1024*1024] # 1MB (2MB?)
 
     # task parameters
-    exp.sys_util = [0.5,5.0,10.0]
+#    exp.sys_util = [0.5,5.0,10.0]
+    exp.sys_util = [5.0]
 #    exp.task_util = ['u-uni-medium', 'u-uni-heavy']
     exp.task_util = ['u-uni-medium']
     exp.period = ['p-uni-long']
@@ -775,7 +786,8 @@ def get_dp_space(cpus):
     exp.nclusters = [1, 2, 12]
     exp.ngpus = [8]
 #    exp.ngclusters = [1, 2, 4, 8]
-    exp.ngclusters = [1, 4, 8]
+    exp.ngclusters = [1, 2, 4, 8]
+    exp.release_master = [False]
     exp.polluters = [False, True]
     exp.ovh_type = ['mean', 'max']
 #    exp.ovh_type = ['mean']
@@ -790,8 +802,8 @@ def get_dp_space(cpus):
 
     # task parameters
 #    step_size = 0.1
-    step_size = 0.5
-#    step_size = 0.25
+#    step_size = 0.5
+    step_size = 0.25
     start_pt = step_size
 #    start_pt = 10.0
     exp.sys_util = [float(v) for v in arange(start_pt, cpus+step_size, step_size)]
@@ -807,9 +819,10 @@ def get_dp_space(cpus):
     exp.kern = ['k-uni-light', 'k-uni-medium', 'k-uni-heavy']
 #    exp.cpu_csx = ['c-const-light', 'c-const-medium', 'c-const-heavy']
     exp.cpu_csx = ['c-const-light']
 #    exp.data = ['d-const-light', 'd-const-medium', 'd-const-heavy', 'd-const-very-heavy']
-    exp.data = ['d-const-light', 'd-const-medium', 'd-const-heavy']
-    exp.state = ['s-const-zero', 's-const-light', 's-const-medium']
+    exp.data = ['d-uni-light', 'd-uni-medium', 'd-uni-heavy']
+#    exp.state = ['s-const-zero', 's-const-light', 's-const-medium', 's-const-heavy']
+    exp.state = ['s-const-zero', 's-uni-light', 's-uni-medium']
 
     return exp
 
@@ -844,8 +857,8 @@ def main():
     host = 'bonham'
 
     cpus = 12.0
-    exp = get_dp_space(cpus)
-#    exp = TEST_get_dp_space(cpus)
+#    exp = get_dp_space(cpus)
+    exp = TEST_get_dp_space(cpus)
 
     def valid(dp):
         # filter out gpus shared between cpu clusters for now
@@ -873,18 +886,47 @@ def main():
 
         return True
 
-    design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)]
-    ndp = len(design_points)
+    if not args.pretend and not args.resume:
+        db.clear_tables(args.database)
+
+    # load design points incrementally as to not overload memory
+    gen = DesignPointGenerator(exp, is_valid = valid)
+    dp_chunk_sz = 10000 # number of design points to load at a time
+    ndp = 0
     if not args.pretend:
-        random.shuffle(design_points)
-        if not args.resume:
-            db.clear_tables(args.database)
-        nstored, npending = db.store_design_points(args.database, design_points, clean = not args.resume)
-        print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (
-            nstored, ndp, ndp - nstored, npending)
+        nstored = 0
+        npending = 0
+        while True:
+            dp_chunk = []
+            for _ in range(dp_chunk_sz):
+                try:
+                    dp_chunk.append(gen.next())
+                except StopIteration:
+                    break
+            if len(dp_chunk) == 0:
+                break
+            ndp += len(dp_chunk)
+            random.shuffle(dp_chunk)
+            tmpnstored, npending = db.store_design_points(args.database, dp_chunk, clean = not args.resume)
+            nstored += tmpnstored
+            print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (nstored, ndp, ndp - nstored, npending)
     else:
+        for _ in gen:
+            ndp += 1
         print "%d design points planned" % ndp
 
+#    design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)]
+#    ndp = len(design_points)
+#    if not args.pretend:
+#        random.shuffle(design_points)
+#        if not args.resume:
+#            db.clear_tables(args.database)
+#        nstored, npending = db.store_design_points(args.database, design_points, clean = not args.resume)
+#        print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (
+#            nstored, ndp, ndp - nstored, npending)
+#    else:
+#        print "%d design points planned" % ndp
+
     if args.pretend or args.initonly:
         exit(0)
 
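
Loading note: main() no longer materializes every design point in memory before storing
them; it drains DesignPointGenerator in chunks of 10,000 points, shuffling and inserting
each chunk as it goes. The same idiom in isolation (Python 2; store() is a hypothetical
stand-in for db.store_design_points):

    import random

    def load_in_chunks(gen, store, chunk_sz=10000):
        """Consume generator gen in fixed-size chunks, shuffling and storing each."""
        total = 0
        while True:
            chunk = []
            for _ in range(chunk_sz):
                try:
                    chunk.append(gen.next())   # Python 2 iterator protocol
                except StopIteration:
                    break
            if len(chunk) == 0:
                break
            total += len(chunk)
            random.shuffle(chunk)
            store(chunk)
        return total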