diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-09-21 18:35:53 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-09-21 18:35:53 -0400 |
commit | 0d0de82d3994f3e737925e1e6a3e4d403375e529 (patch) | |
tree | f6d883c29fc61bf089a7232328841fc990306317 | |
parent | d3efc4e3241fdc1d2ec8524a424a29895dc86a22 (diff) |
Move to MySQL
-rw-r--r-- | rtss14/createtables.sql | 50 | ||||
-rwxr-xr-x | rtss14/database.py | 172 | ||||
-rwxr-xr-x | rtss14/rtss14.py | 104 |
3 files changed, 190 insertions, 136 deletions
diff --git a/rtss14/createtables.sql b/rtss14/createtables.sql index 45490d3..a072aa4 100644 --- a/rtss14/createtables.sql +++ b/rtss14/createtables.sql | |||
@@ -1,7 +1,8 @@ | |||
1 | CREATE TABLE distrs( | 1 | CREATE TABLE distrs( |
2 | id INTEGER PRIMARY KEY NOT NULL, | 2 | id INTEGER NOT NULL AUTO_INCREMENT, |
3 | name TEXT | 3 | name TEXT, |
4 | ); | 4 | PRIMARY KEY (id) |
5 | ) ENGINE=InnoDB; | ||
5 | -- util | 6 | -- util |
6 | INSERT INTO distrs(name) VALUES('u-uni-light'); | 7 | INSERT INTO distrs(name) VALUES('u-uni-light'); |
7 | INSERT INTO distrs(name) VALUES('u-uni-medium'); | 8 | INSERT INTO distrs(name) VALUES('u-uni-medium'); |
@@ -37,12 +38,9 @@ INSERT INTO distrs(name) VALUES('c-const-light'); | |||
37 | INSERT INTO distrs(name) VALUES('c-const-medium'); | 38 | INSERT INTO distrs(name) VALUES('c-const-medium'); |
38 | INSERT INTO distrs(name) VALUES('c-const-heavy'); | 39 | INSERT INTO distrs(name) VALUES('c-const-heavy'); |
39 | 40 | ||
40 | --CREATE TABLE dummy( | ||
41 | -- pid INTEGER | ||
42 | --); | ||
43 | --INSERT INTO dummy(pid) VALUES(0); | ||
44 | |||
45 | CREATE TABLE dp_pending( | 41 | CREATE TABLE dp_pending( |
42 | id INTEGER NOT NULL AUTO_INCREMENT, | ||
43 | |||
46 | -- task set util cap | 44 | -- task set util cap |
47 | ts_util REAL, | 45 | ts_util REAL, |
48 | 46 | ||
@@ -51,6 +49,7 @@ CREATE TABLE dp_pending( | |||
51 | cpu_cluster_size INTEGER, | 49 | cpu_cluster_size INTEGER, |
52 | ngpu INTEGER, | 50 | ngpu INTEGER, |
53 | gpu_cluster_size INTEGER, | 51 | gpu_cluster_size INTEGER, |
52 | is_release_master INTEGER, | ||
54 | 53 | ||
55 | -- overheads config | 54 | -- overheads config |
56 | is_worst_case INTEGER, | 55 | is_worst_case INTEGER, |
@@ -77,25 +76,27 @@ CREATE TABLE dp_pending( | |||
77 | ncopy_engines INTEGER, | 76 | ncopy_engines INTEGER, |
78 | chunk_size INTEGER, | 77 | chunk_size INTEGER, |
79 | 78 | ||
79 | PRIMARY KEY (id) | ||
80 | -- every permutation marks a unique configuration | 80 | -- every permutation marks a unique configuration |
81 | PRIMARY KEY(ts_util, | 81 | -- PRIMARY KEY(ts_util, |
82 | ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, | 82 | -- ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master, |
83 | is_worst_case, is_polluters, wss_size, | 83 | -- is_worst_case, is_polluters, wss_size, |
84 | util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist, | 84 | -- util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist, |
85 | gpu_population, | 85 | -- gpu_population, |
86 | rho, is_dgl, is_p2p, ncopy_engines, chunk_size) | 86 | -- rho, is_dgl, is_p2p, ncopy_engines, chunk_size) |
87 | ); | 87 | ) ENGINE=InnoDB; |
88 | 88 | ||
89 | -- partially tested dps | 89 | -- partially tested dps |
90 | CREATE TABLE dp_ptested( | 90 | CREATE TABLE dp_ptested( |
91 | -- auto-computed unique id for this | 91 | -- auto-computed unique id for this |
92 | id INTEGER PRIMARY KEY NOT NULL, | 92 | id INTEGER NOT NULL AUTO_INCREMENT, |
93 | 93 | ||
94 | -- platform parameters | 94 | -- platform parameters |
95 | ncpu INTEGER, | 95 | ncpu INTEGER, |
96 | cpu_cluster_size INTEGER, | 96 | cpu_cluster_size INTEGER, |
97 | ngpu INTEGER, | 97 | ngpu INTEGER, |
98 | gpu_cluster_size INTEGER, | 98 | gpu_cluster_size INTEGER, |
99 | is_release_master INTEGER, | ||
99 | 100 | ||
100 | -- overheads config | 101 | -- overheads config |
101 | is_worst_case INTEGER, | 102 | is_worst_case INTEGER, |
@@ -122,13 +123,14 @@ CREATE TABLE dp_ptested( | |||
122 | ncopy_engines INTEGER, | 123 | ncopy_engines INTEGER, |
123 | chunk_size INTEGER, | 124 | chunk_size INTEGER, |
124 | 125 | ||
126 | PRIMARY KEY (id) | ||
125 | -- every permutation marks a unique configuration | 127 | -- every permutation marks a unique configuration |
126 | UNIQUE(ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, | 128 | -- UNIQUE(ncpu, cpu_cluster_size, ngpu, gpu_cluster_size, is_release_master, |
127 | is_worst_case, is_polluters, wss_size, | 129 | -- is_worst_case, is_polluters, wss_size, |
128 | util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist, | 130 | -- util_dist, period_dist, data_dist, state_dist, kernel_dist, cpu_dist, |
129 | gpu_population, | 131 | -- gpu_population, |
130 | rho, is_dgl, is_p2p, ncopy_engines, chunk_size) | 132 | -- rho, is_dgl, is_p2p, ncopy_engines, chunk_size) |
131 | ); | 133 | ) ENGINE=InnoDB; |
132 | 134 | ||
133 | CREATE TABLE sched_results( | 135 | CREATE TABLE sched_results( |
134 | dp INTEGER, | 136 | dp INTEGER, |
@@ -145,7 +147,7 @@ CREATE TABLE sched_results( | |||
145 | 147 | ||
146 | FOREIGN KEY(dp) REFERENCES dp_ptested(id), | 148 | FOREIGN KEY(dp) REFERENCES dp_ptested(id), |
147 | PRIMARY KEY(dp, ts_util) | 149 | PRIMARY KEY(dp, ts_util) |
148 | ); | 150 | ) ENGINE=InnoDB; |
149 | 151 | ||
150 | CREATE TABLE scaled_sched_results( | 152 | CREATE TABLE scaled_sched_results( |
151 | dp INTEGER, | 153 | dp INTEGER, |
@@ -161,4 +163,4 @@ CREATE TABLE scaled_sched_results( | |||
161 | 163 | ||
162 | FOREIGN KEY(dp) REFERENCES dp_ptested(id), | 164 | FOREIGN KEY(dp) REFERENCES dp_ptested(id), |
163 | PRIMARY KEY(dp, eff_ts_util, scale_factor) | 165 | PRIMARY KEY(dp, eff_ts_util, scale_factor) |
164 | ); | 166 | ) ENGINE=InnoDB; |
diff --git a/rtss14/database.py b/rtss14/database.py index af6e966..cd5535d 100755 --- a/rtss14/database.py +++ b/rtss14/database.py | |||
@@ -5,8 +5,7 @@ import time | |||
5 | import random | 5 | import random |
6 | import copy | 6 | import copy |
7 | import itertools | 7 | import itertools |
8 | import sqlite3 as lite | 8 | import MySQLdb as db |
9 | import json | ||
10 | 9 | ||
11 | from schedcat.util.storage import storage | 10 | from schedcat.util.storage import storage |
12 | from generator import DesignPointGenerator | 11 | from generator import DesignPointGenerator |
@@ -17,15 +16,13 @@ timeout = 10*60 # 10 minute timeout on database locks | |||
17 | max_fail = 30 # maximum number of times to reset a bad db connection | 16 | max_fail = 30 # maximum number of times to reset a bad db connection |
18 | distr_mapper = None # maps database distribution ids to strings | 17 | distr_mapper = None # maps database distribution ids to strings |
19 | dp_col_names = None # list of keys to extract from sched results to write to db | 18 | dp_col_names = None # list of keys to extract from sched results to write to db |
19 | dp_col_type_strs = None | ||
20 | ###################### | 20 | ###################### |
21 | 21 | ||
22 | def backoff(t): | 22 | def backoff(t): |
23 | time_to_sleep = random.random() * t | 23 | time_to_sleep = random.random() * t |
24 | time.sleep(time_to_sleep) | 24 | time.sleep(time_to_sleep) |
25 | 25 | ||
26 | def init_db(db_name): | ||
27 | lite.register_converter("JSON", json.loads) | ||
28 | |||
29 | def init_distr_mapper(conn): | 26 | def init_distr_mapper(conn): |
30 | global distr_mapper | 27 | global distr_mapper |
31 | global timeout | 28 | global timeout |
@@ -34,7 +31,7 @@ def init_distr_mapper(conn): | |||
34 | return | 31 | return |
35 | 32 | ||
36 | distr_mapper = {} | 33 | distr_mapper = {} |
37 | c = conn.cursor() | 34 | c = conn.cursor(db.cursors.DictCursor) |
38 | start = time.time() | 35 | start = time.time() |
39 | while True: | 36 | while True: |
40 | try: | 37 | try: |
@@ -44,17 +41,19 @@ def init_distr_mapper(conn): | |||
44 | distr_mapper[int(row['id'])] = str(row['name']) | 41 | distr_mapper[int(row['id'])] = str(row['name']) |
45 | distr_mapper[str(row['name'])] = int(row['id']) | 42 | distr_mapper[str(row['name'])] = int(row['id']) |
46 | break | 43 | break |
47 | except lite.OperationalError: | 44 | except db.OperationalError, e: |
45 | print e | ||
48 | elapsed = time.time() - start | 46 | elapsed = time.time() - start |
49 | if elapsed > timeout: | 47 | if elapsed > timeout: |
50 | raise | 48 | raise |
51 | else: | 49 | else: |
52 | backoff(10) | 50 | backoff(10) |
53 | pass | 51 | pass |
54 | 52 | c.close() | |
55 | 53 | ||
56 | def init_dp_col_names(conn): | 54 | def init_dp_col_names(conn): |
57 | global dp_col_names | 55 | global dp_col_names |
56 | global dp_col_type_strs | ||
58 | global timeout | 57 | global timeout |
59 | 58 | ||
60 | if dp_col_names is not None and len(dp_col_names) > 0: | 59 | if dp_col_names is not None and len(dp_col_names) > 0: |
@@ -65,16 +64,27 @@ def init_dp_col_names(conn): | |||
65 | while True: | 64 | while True: |
66 | # can't use pragma within transaction safely. so loop. | 65 | # can't use pragma within transaction safely. so loop. |
67 | try: | 66 | try: |
68 | c.execute('PRAGMA table_info(dp_pending)') | 67 | c.execute('SHOW COLUMNS FROM dp_pending') |
69 | dp_col_names = list(map(lambda x: x[1].encode('ascii', 'ignore'), c.fetchall())) | 68 | cols = c.fetchall() |
69 | dp_col_names = [] | ||
70 | dp_col_type_strs = {} | ||
71 | for field in cols: | ||
72 | name = field[0].encode('ascii', 'ignore') | ||
73 | if(name == 'id'): | ||
74 | continue | ||
75 | dp_col_names.append(name) | ||
76 | form = '%s' | ||
77 | dp_col_type_strs[name] = form | ||
70 | break | 78 | break |
71 | except lite.OperationalError: | 79 | except db.OperationalError, e: |
80 | print e | ||
72 | elapsed = time.time() - start | 81 | elapsed = time.time() - start |
73 | if elapsed > timeout: | 82 | if elapsed > timeout: |
74 | raise | 83 | raise |
75 | else: | 84 | else: |
76 | backoff(10) | 85 | backoff(10) |
77 | pass | 86 | pass |
87 | c.close() | ||
78 | 88 | ||
79 | def db_type(var): | 89 | def db_type(var): |
80 | if type(var) is int or type(var) is bool: | 90 | if type(var) is int or type(var) is bool: |
@@ -108,6 +118,7 @@ def dp_to_db(dp): | |||
108 | data['cpu_cluster_size'] = dp.ncpus / dp.nclusters | 118 | data['cpu_cluster_size'] = dp.ncpus / dp.nclusters |
109 | data['ngpu'] = dp.ngpus | 119 | data['ngpu'] = dp.ngpus |
110 | data['gpu_cluster_size'] = dp.ngpus / dp.ngclusters | 120 | data['gpu_cluster_size'] = dp.ngpus / dp.ngclusters |
121 | data['is_release_master'] = dp.release_master | ||
111 | data['is_worst_case'] = 1 if dp.ovh_type == 'max' else 0 | 122 | data['is_worst_case'] = 1 if dp.ovh_type == 'max' else 0 |
112 | data['is_polluters'] = dp.polluters | 123 | data['is_polluters'] = dp.polluters |
113 | data['wss_size'] = dp.wss | 124 | data['wss_size'] = dp.wss |
@@ -138,6 +149,7 @@ def db_to_dp(data): | |||
138 | exp.nclusters = [exp.ncpus[0]/int(data['cpu_cluster_size'])] | 149 | exp.nclusters = [exp.ncpus[0]/int(data['cpu_cluster_size'])] |
139 | exp.ngpus = [int(data['ngpu'])] | 150 | exp.ngpus = [int(data['ngpu'])] |
140 | exp.ngclusters = [exp.ngpus[0]/int(data['gpu_cluster_size'])] | 151 | exp.ngclusters = [exp.ngpus[0]/int(data['gpu_cluster_size'])] |
152 | exp.release_master = [int(data['is_release_master'])] | ||
141 | exp.polluters = [int(data['is_polluters']) != 0] | 153 | exp.polluters = [int(data['is_polluters']) != 0] |
142 | exp.ovh_type = ['max' if int(data['is_worst_case']) == 1 else 'mean'] | 154 | exp.ovh_type = ['max' if int(data['is_worst_case']) == 1 else 'mean'] |
143 | exp.rho = [int(data['rho'])] | 155 | exp.rho = [int(data['rho'])] |
@@ -169,43 +181,44 @@ def dp_to_db_vals(dp): | |||
169 | vals[key] = to_db_val(value) | 181 | vals[key] = to_db_val(value) |
170 | return vals | 182 | return vals |
171 | 183 | ||
172 | def begin_sync(conn): | 184 | def begin_sync(conn, c): |
173 | # use a shorter timout because we'll retry with a new db connection instead | 185 | # use a shorter timout because we'll retry with a new db connection instead |
174 | transaction_timeout = 5*60 | 186 | transaction_timeout = 5*60 |
175 | start = time.time() | 187 | start = time.time() |
176 | while True: | 188 | while True: |
177 | try: | 189 | try: |
178 | conn.execute('BEGIN EXCLUSIVE TRANSACTION') | 190 | conn.autocommit(False) |
179 | # Force a write lock to be acquired before we do | 191 | c.execute('START TRANSACTION') |
180 | # anything more. I hate sqlite. | ||
181 | break | 192 | break |
182 | except lite.OperationalError: | 193 | except db.OperationalError, e: |
194 | print e | ||
183 | elapsed = time.time() - start | 195 | elapsed = time.time() - start |
184 | if elapsed > transaction_timeout: | 196 | if elapsed > transaction_timeout: |
185 | raise | 197 | raise |
186 | else: | 198 | else: |
187 | backoff(20) | 199 | backoff(20) |
188 | pass | 200 | pass |
189 | return conn | 201 | return c |
190 | 202 | ||
191 | def end_sync(conn): | 203 | def end_sync(conn): |
192 | success = conn.commit() | 204 | success = conn.commit() |
205 | conn.autocommit(True) | ||
193 | return success | 206 | return success |
194 | 207 | ||
195 | 208 | ||
196 | def connect_db(db_name, isolation = 'EXCLUSIVE'): | 209 | def connect_db(db_name): |
197 | global distr_mapper | 210 | global distr_mapper |
198 | global dp_col_names | 211 | global dp_col_names |
212 | global dp_col_type_strs | ||
199 | global timeout | 213 | global timeout |
200 | 214 | ||
201 | init_db(db_name) | ||
202 | |||
203 | start = time.time() | 215 | start = time.time() |
204 | while True: | 216 | while True: |
205 | try: | 217 | try: |
206 | conn = lite.connect(db_name, detect_types=lite.PARSE_DECLTYPES|lite.PARSE_COLNAMES, isolation_level=isolation) | 218 | conn = db.connect('mydb.cs.unc.edu', 'gelliott', 'G1ennDB', db_name); |
207 | break | 219 | break |
208 | except lite.OperationalError: | 220 | except db.OperationalError, e: |
221 | print e | ||
209 | elapsed = time.time() - start | 222 | elapsed = time.time() - start |
210 | # just give up :( | 223 | # just give up :( |
211 | if elapsed > timeout: | 224 | if elapsed > timeout: |
@@ -213,9 +226,6 @@ def connect_db(db_name, isolation = 'EXCLUSIVE'): | |||
213 | else: | 226 | else: |
214 | backoff(10) | 227 | backoff(10) |
215 | pass | 228 | pass |
216 | conn.row_factory = lite.Row | ||
217 | # enable foreign keys | ||
218 | conn.execute('PRAGMA FOREIGN_KEYS=ON') | ||
219 | 229 | ||
220 | if distr_mapper is None: | 230 | if distr_mapper is None: |
221 | init_distr_mapper(conn) | 231 | init_distr_mapper(conn) |
@@ -232,17 +242,18 @@ def clear_tables(db_name): | |||
232 | with conn: | 242 | with conn: |
233 | c = conn.cursor() | 243 | c = conn.cursor() |
234 | ################ | 244 | ################ |
235 | begin_sync(conn) | 245 | begin_sync(conn, c) |
236 | for t in tables: | 246 | for t in tables: |
237 | c.execute('DELETE FROM %s' % t) | 247 | c.execute('DELETE FROM %s' % t) |
238 | end_sync(conn) | 248 | end_sync(conn) |
249 | c.close() | ||
239 | ################ | 250 | ################ |
240 | 251 | ||
241 | #def get_results(conn, dp, fields = '*', table = 'sched_results', extra = None): | 252 | #def get_results(conn, dp, fields = '*', table = 'sched_results', extra = None): |
242 | # where = ['%s=?' % k for k in dp.iterkeys()] | 253 | # where = ['%s=?' % k for k in dp.iterkeys()] |
243 | # | 254 | # |
244 | # ################ | 255 | # ################ |
245 | # c = conn.cursor() | 256 | # c = conn.cursor(db.cursors.DictCursor) |
246 | # begin_sync(conn) | 257 | # begin_sync(conn) |
247 | # if extra: | 258 | # if extra: |
248 | # query = "SELECT %s FROM %s WHERE %s %s" % (", ".join(fields), table, " AND ".join(where), extra) | 259 | # query = "SELECT %s FROM %s WHERE %s %s" % (", ".join(fields), table, " AND ".join(where), extra) |
@@ -258,7 +269,7 @@ def clear_tables(db_name): | |||
258 | # return data | 269 | # return data |
259 | 270 | ||
260 | #def count_results(conn, dp, table = 'sched_results'): | 271 | #def count_results(conn, dp, table = 'sched_results'): |
261 | # c = conn.cursor() | 272 | # c = conn.cursor(db.cursors.DictCursor) |
262 | # where = ['%s=?' % k for k in dp.iterkeys()] | 273 | # where = ['%s=?' % k for k in dp.iterkeys()] |
263 | # | 274 | # |
264 | # query = "SELECT COUNT(*) FROM %s WHERE %s" % (table, " AND ".join(where)) | 275 | # query = "SELECT COUNT(*) FROM %s WHERE %s" % (table, " AND ".join(where)) |
@@ -272,35 +283,36 @@ def clear_tables(db_name): | |||
272 | # | 283 | # |
273 | # return data[0] | 284 | # return data[0] |
274 | 285 | ||
275 | def __get_dp_id(conn, dp, add_if_missing = False): | 286 | def __get_dp_id(c, dp, add_if_missing = False): |
276 | global dp_col_names | 287 | global dp_col_names |
288 | global dp_col_type_strs | ||
277 | seeking = dp_to_db(dp) | 289 | seeking = dp_to_db(dp) |
278 | seeking.pop('ts_util', None) | 290 | seeking.pop('ts_util', None) |
279 | query = 'SELECT id FROM dp_ptested WHERE %s' % ' AND '.join(map(lambda c: '%s=?' % c, seeking.iterkeys())) | 291 | query = 'SELECT id FROM dp_ptested WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) |
280 | c = conn.cursor() | ||
281 | c.execute(query, tuple(seeking.values())) | 292 | c.execute(query, tuple(seeking.values())) |
282 | data = c.fetchone() | 293 | row = c.fetchone() |
283 | if data: | 294 | if row: |
284 | return data[0] | 295 | return row['id'] |
285 | elif add_if_missing == False: | 296 | elif add_if_missing == False: |
286 | return None | 297 | return None |
287 | # add the design point | 298 | # add the design point |
288 | col_names = copy.deepcopy(dp_col_names) | 299 | col_names = copy.deepcopy(dp_col_names) |
289 | col_names.pop(0) # remove the id field | 300 | col_names.pop(0) # remove the id field |
290 | c.execute('INSERT INTO dp_ptested VALUES(NULL,%s)' % ','.join(['?']*len(col_names)), | 301 | c.execute('INSERT INTO dp_ptested VALUES(NULL,%s)' % ','.join(['%s']*len(col_names)), |
291 | tuple(map(lambda x: seeking[x], col_names))) | 302 | tuple(map(lambda x: seeking[x], col_names))) |
292 | # get the assigned id | 303 | # get the assigned id |
293 | c.execute(query, tuple(seeking.values())) | 304 | c.execute(query, tuple(seeking.values())) |
294 | data = c.fetchone() | 305 | row = c.fetchone() |
295 | return data[0] | 306 | return row['id'] |
296 | 307 | ||
297 | def __already_pending(dp, conn): | 308 | def __already_pending(dp, conn): |
298 | seeking = dp_to_db(dp) | 309 | seeking = dp_to_db(dp) |
299 | c = conn.cursor() | 310 | c = conn.cursor() |
300 | c.execute('SELECT COUNT(*) FROM dp_pending WHERE %s' % ' AND '.join(map(lambda x: '%s=?' % x, seeking.iterkeys())), | 311 | c.execute('SELECT COUNT(*) FROM dp_pending WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), |
301 | tuple(seeking.values())) | 312 | tuple(seeking.values())) |
302 | data = c.fetchone() | 313 | row = c.fetchone() |
303 | processed = bool(data[0]) | 314 | processed = bool(row[0]) |
315 | c.close() | ||
304 | return processed | 316 | return processed |
305 | 317 | ||
306 | def already_pending(dp, conn = None, db_name = None): | 318 | def already_pending(dp, conn = None, db_name = None): |
@@ -316,10 +328,11 @@ def __already_processed(dp, conn): | |||
316 | seeking.pop('ts_util', None) | 328 | seeking.pop('ts_util', None) |
317 | c = conn.cursor() | 329 | c = conn.cursor() |
318 | c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id ' | 330 | c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id ' |
319 | 'WHERE R.ts_util=? AND %s' % ' AND '.join(map(lambda x: 'K.%s=?' % x, seeking.iterkeys())), | 331 | 'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), |
320 | ((dp.sys_util,) + tuple(seeking.values()))) | 332 | ((dp.sys_util,) + tuple(seeking.values()))) |
321 | data = c.fetchone() | 333 | row = c.fetchone() |
322 | processed = bool(data[0]) | 334 | processed = bool(row[0]) |
335 | c.close() | ||
323 | return processed | 336 | return processed |
324 | 337 | ||
325 | def already_processed(dp, conn = None, db_name = None): | 338 | def already_processed(dp, conn = None, db_name = None): |
@@ -332,6 +345,7 @@ def already_processed(dp, conn = None, db_name = None): | |||
332 | 345 | ||
333 | def store_design_points(db_name, dps, clean): | 346 | def store_design_points(db_name, dps, clean): |
334 | global dp_col_names | 347 | global dp_col_names |
348 | global dp_col_type_strs | ||
335 | 349 | ||
336 | conn = connect_db(db_name) | 350 | conn = connect_db(db_name) |
337 | npending = 0 | 351 | npending = 0 |
@@ -345,22 +359,21 @@ def store_design_points(db_name, dps, clean): | |||
345 | dps = [dp_to_db(dp) for dp in dps] | 359 | dps = [dp_to_db(dp) for dp in dps] |
346 | 360 | ||
347 | ################ | 361 | ################ |
348 | begin_sync(conn) | ||
349 | |||
350 | # convert dicts into correctly-ordered list and insert | 362 | # convert dicts into correctly-ordered list and insert |
351 | c.executemany('INSERT INTO dp_pending VALUES(%s)' % ','.join(['?']*len(dp_col_names)), | 363 | c.executemany('INSERT INTO dp_pending VALUES(NULL,%s)' % ','.join(['%s']*len(dp_col_names)), |
352 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) | 364 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) |
353 | # complete | 365 | # complete |
354 | c.execute('SELECT COUNT(*) FROM dp_pending') | 366 | c.execute('SELECT COUNT(*) FROM dp_pending') |
355 | npending = c.fetchone()[0] | 367 | npending = c.fetchone()[0] |
356 | |||
357 | end_sync(conn) | ||
358 | ############## | 368 | ############## |
369 | |||
370 | c.close() | ||
359 | return len(dps), npending | 371 | return len(dps), npending |
360 | 372 | ||
361 | 373 | ||
362 | def get_design_points(db_name, ndp = 1): | 374 | def get_design_points(db_name, ndp = 1): |
363 | global dp_col_names | 375 | global dp_col_names |
376 | global dp_col_type_strs | ||
364 | global max_fail | 377 | global max_fail |
365 | fetched = [] | 378 | fetched = [] |
366 | 379 | ||
@@ -380,16 +393,17 @@ def get_design_points(db_name, ndp = 1): | |||
380 | conn = connect_db(db_name) | 393 | conn = connect_db(db_name) |
381 | with conn: | 394 | with conn: |
382 | ################ | 395 | ################ |
383 | begin_sync(conn) | 396 | c = conn.cursor(db.cursors.DictCursor) |
384 | c = conn.cursor() | 397 | begin_sync(conn, c) |
385 | c.execute('SELECT %s FROM dp_pending LIMIT ?' % (','.join(dp_col_names)), (ndp,)) | 398 | # c.execute('LOCK TABLES dp_pending WRITE') |
399 | c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,)) | ||
386 | dps = [db_to_dp(d) for d in c.fetchall()] | 400 | dps = [db_to_dp(d) for d in c.fetchall()] |
387 | nfetched = len(dps) if dps else 0 | 401 | nfetched = len(dps) if dps else 0 |
388 | if nfetched > 0: | 402 | if nfetched > 0: |
389 | temp = [dp_to_db(d) for d in dps] | 403 | temp = [dp_to_db(d) for d in dps] |
390 | for i in xrange(nfetched): | 404 | for i in xrange(nfetched): |
391 | c.execute('DELETE FROM dp_pending ' | 405 | c.execute('DELETE FROM dp_pending ' |
392 | 'WHERE %s' % ' AND '.join(map(lambda c: '%s=?'%c, dp_col_names)), | 406 | 'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)), |
393 | tuple(map(lambda x: temp[i][x], dp_col_names))) | 407 | tuple(map(lambda x: temp[i][x], dp_col_names))) |
394 | if c.rowcount == 1: | 408 | if c.rowcount == 1: |
395 | fetched.append(dps[i]) | 409 | fetched.append(dps[i]) |
@@ -399,11 +413,14 @@ def get_design_points(db_name, ndp = 1): | |||
399 | print 'deleted too many rows... dropping.' | 413 | print 'deleted too many rows... dropping.' |
400 | if len(fetched) == 0: | 414 | if len(fetched) == 0: |
401 | retry = True | 415 | retry = True |
416 | # c.execute('UNLOCK TABLES') | ||
402 | end_sync(conn) | 417 | end_sync(conn) |
418 | c.close() | ||
403 | ############## | 419 | ############## |
404 | # success! | 420 | # success! |
405 | break | 421 | break |
406 | except lite.OperationalError: | 422 | except db.OperationalError, e: |
423 | print e | ||
407 | # make sure the db connection is closed | 424 | # make sure the db connection is closed |
408 | if conn is not None: | 425 | if conn is not None: |
409 | conn.close() | 426 | conn.close() |
@@ -416,9 +433,7 @@ def get_design_points(db_name, ndp = 1): | |||
416 | else: | 433 | else: |
417 | time_to_sleep = (random.random()*20 + 5)*failcount | 434 | time_to_sleep = (random.random()*20 + 5)*failcount |
418 | time_to_sleep = min(time_to_sleep, 4*60) | 435 | time_to_sleep = min(time_to_sleep, 4*60) |
419 | # flush lustre cache??? | ||
420 | print 'retrying get_design_points from scratch. sleep = %.3f' % time_to_sleep | 436 | print 'retrying get_design_points from scratch. sleep = %.3f' % time_to_sleep |
421 | os.system('sync') | ||
422 | time.sleep(time_to_sleep) | 437 | time.sleep(time_to_sleep) |
423 | 438 | ||
424 | if failcount > 0: | 439 | if failcount > 0: |
@@ -430,19 +445,16 @@ def get_design_points(db_name, ndp = 1): | |||
430 | del conn | 445 | del conn |
431 | conn = None | 446 | conn = None |
432 | 447 | ||
433 | os.system('sync') | ||
434 | |||
435 | return fetched | 448 | return fetched |
436 | 449 | ||
437 | 450 | ||
438 | def __store_eff_sched_results(conn, dp_id, stats): | 451 | def __store_eff_sched_results(c, dp_id, stats): |
439 | c = conn.cursor() | ||
440 | for factor,eff_curve in stats.iteritems(): | 452 | for factor,eff_curve in stats.iteritems(): |
441 | for eff_ts_util,sched in eff_curve.iteritems(): | 453 | for eff_ts_util,sched in eff_curve.iteritems(): |
442 | # do prior results exist? | 454 | # do prior results exist? |
443 | c.execute('SELECT ntested, nsched, avg_sched, avg_tard, avg_bandwidth ' | 455 | c.execute('SELECT ntested, nsched, avg_sched, avg_tard, avg_bandwidth ' |
444 | 'FROM scaled_sched_results ' | 456 | 'FROM scaled_sched_results ' |
445 | 'WHERE dp=? AND eff_ts_util=? AND scale_factor=?', | 457 | 'WHERE dp=%s AND eff_ts_util=%s AND scale_factor=%s FOR UPDATE', |
446 | (dp_id, eff_ts_util, factor)) | 458 | (dp_id, eff_ts_util, factor)) |
447 | row = c.fetchone() | 459 | row = c.fetchone() |
448 | if row: | 460 | if row: |
@@ -459,16 +471,17 @@ def __store_eff_sched_results(conn, dp_id, stats): | |||
459 | sched.ntested += rntested | 471 | sched.ntested += rntested |
460 | sched.nsched += rnsched | 472 | sched.nsched += rnsched |
461 | c.execute('UPDATE scaled_sched_results ' | 473 | c.execute('UPDATE scaled_sched_results ' |
462 | 'SET ntested=?, nsched=?, avg_sched=?, avg_tard=?, avg_bandwidth=? ' | 474 | 'SET ntested=%s, nsched=%s, avg_sched=%s, avg_tard=%s, avg_bandwidth=%s ' |
463 | 'WHERE dp=? AND eff_ts_util=? AND scale_factor=?', | 475 | 'WHERE dp=%s AND eff_ts_util=%s AND scale_factor=%s', |
464 | (sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth, dp_id, eff_ts_util, factor)) | 476 | (sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth, dp_id, eff_ts_util, factor)) |
465 | else: | 477 | else: |
466 | c.execute('INSERT INTO scaled_sched_results ' | 478 | c.execute('INSERT INTO scaled_sched_results ' |
467 | 'VALUES(?,?,?,?,?,?,?,?)', | 479 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', |
468 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) | 480 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) |
469 | 481 | ||
470 | def store_sched_results(db_name, data, ndp = 0): | 482 | def store_sched_results(db_name, data, ndp = 0): |
471 | global dp_col_names | 483 | global dp_col_names |
484 | global dp_col_type_strs | ||
472 | global max_fail | 485 | global max_fail |
473 | col_names = None | 486 | col_names = None |
474 | 487 | ||
@@ -488,33 +501,32 @@ def store_sched_results(db_name, data, ndp = 0): | |||
488 | col_names.pop(0) # remove the id field | 501 | col_names.pop(0) # remove the id field |
489 | 502 | ||
490 | with conn: | 503 | with conn: |
491 | # if the db hangs, it's always here on the begin transaction... | 504 | c = conn.cursor(db.cursors.DictCursor) |
492 | begin_sync(conn) | ||
493 | 505 | ||
494 | # get IDs for all design points | 506 | # get IDs for all design points |
495 | dp_ids = [__get_dp_id(conn, d.dp, add_if_missing = True) for d in data] | 507 | dp_ids = [__get_dp_id(c, d.dp, add_if_missing = True) for d in data] |
496 | # insert the normal sched data in one go | 508 | # insert the normal sched data in one go |
497 | conn.executemany('INSERT INTO sched_results VALUES(?,?,?,?,?,?,?)', | 509 | c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', |
498 | [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth) | 510 | [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth) |
499 | for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])]) | 511 | for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])]) |
500 | 512 | ||
501 | if d.eff_sched_stats is not None: | 513 | if d.eff_sched_stats is not None: |
514 | begin_sync(conn, c) | ||
502 | for dp_id, d in zip(dp_ids, data): | 515 | for dp_id, d in zip(dp_ids, data): |
503 | __store_eff_sched_results(conn, dp_id, d.eff_sched_stats) | 516 | __store_eff_sched_results(c, dp_id, d.eff_sched_stats) |
517 | end_sync(conn) | ||
504 | 518 | ||
505 | # try to fetch the next design points while we hold the db lock | ||
506 | # not the best thing to do for concurrency, but lustre's locking | ||
507 | # is very slow/costly. | ||
508 | if ndp > 0: | 519 | if ndp > 0: |
509 | c = conn.cursor() | 520 | begin_sync(conn, c) |
510 | c.execute('SELECT %s FROM dp_pending LIMIT ?' % (','.join(dp_col_names)), (ndp,)) | 521 | # c.execute('LOCK TABLES dp_pending WRITE') |
522 | c.execute('SELECT %s FROM dp_pending LIMIT %%s FOR UPDATE' % (','.join(dp_col_names)), (ndp,)) | ||
511 | dps = [db_to_dp(d) for d in c.fetchall()] | 523 | dps = [db_to_dp(d) for d in c.fetchall()] |
512 | nfetched = len(dps) if dps else 0 | 524 | nfetched = len(dps) if dps else 0 |
513 | if nfetched > 0: | 525 | if nfetched > 0: |
514 | temp = [dp_to_db(d) for d in dps] | 526 | temp = [dp_to_db(d) for d in dps] |
515 | for i in xrange(nfetched): | 527 | for i in xrange(nfetched): |
516 | c.execute('DELETE FROM dp_pending ' | 528 | c.execute('DELETE FROM dp_pending ' |
517 | 'WHERE %s' % ' AND '.join(map(lambda c: '%s=?'%c, dp_col_names)), | 529 | 'WHERE %s' % ' AND '.join(map(lambda x: '%s=%s'%(x, dp_col_type_strs[x]), dp_col_names)), |
518 | tuple(map(lambda x: temp[i][x], dp_col_names))) | 530 | tuple(map(lambda x: temp[i][x], dp_col_names))) |
519 | if c.rowcount == 1: | 531 | if c.rowcount == 1: |
520 | fetched.append(dps[i]) | 532 | fetched.append(dps[i]) |
@@ -522,12 +534,13 @@ def store_sched_results(db_name, data, ndp = 0): | |||
522 | print 'store_sched_results: raced for design point. dropping.' | 534 | print 'store_sched_results: raced for design point. dropping.' |
523 | else: | 535 | else: |
524 | print 'store_sched_results: deleted too many rows... dropping.' | 536 | print 'store_sched_results: deleted too many rows... dropping.' |
525 | # commit all changes | 537 | # c.execute('UNLOCK TABLES') |
526 | end_sync(conn) | 538 | end_sync(conn) |
527 | 539 | c.close() | |
528 | # success! | 540 | # success! |
529 | break | 541 | break |
530 | except lite.OperationalError: | 542 | except db.OperationalError, e: |
543 | print e | ||
531 | if conn is not None: | 544 | if conn is not None: |
532 | conn.close() | 545 | conn.close() |
533 | del conn | 546 | del conn |
@@ -540,7 +553,6 @@ def store_sched_results(db_name, data, ndp = 0): | |||
540 | time_to_sleep = (random.random()*20 + 5)*failcount | 553 | time_to_sleep = (random.random()*20 + 5)*failcount |
541 | time_to_sleep = min(time_to_sleep, 4*60) | 554 | time_to_sleep = min(time_to_sleep, 4*60) |
542 | print 'retrying store_sched_results from scratch. sleep = %.3f' % time_to_sleep | 555 | print 'retrying store_sched_results from scratch. sleep = %.3f' % time_to_sleep |
543 | os.system('sync') | ||
544 | time.sleep(time_to_sleep) | 556 | time.sleep(time_to_sleep) |
545 | 557 | ||
546 | if failcount > 0: | 558 | if failcount > 0: |
@@ -552,6 +564,4 @@ def store_sched_results(db_name, data, ndp = 0): | |||
552 | del conn | 564 | del conn |
553 | conn = None | 565 | conn = None |
554 | 566 | ||
555 | os.system('sync') | ||
556 | |||
557 | return fetched | 567 | return fetched |
diff --git a/rtss14/rtss14.py b/rtss14/rtss14.py index 6d12361..09529c2 100755 --- a/rtss14/rtss14.py +++ b/rtss14/rtss14.py | |||
@@ -10,7 +10,8 @@ import math | |||
10 | import time | 10 | import time |
11 | import inspect | 11 | import inspect |
12 | 12 | ||
13 | import sqlite3 as lite | 13 | #import sqlite3 as lite |
14 | import MySQLdb | ||
14 | 15 | ||
15 | 16 | ||
16 | 17 | ||
@@ -128,6 +129,7 @@ def create_gpu_task_set(dp, overheads = None): | |||
128 | else: | 129 | else: |
129 | ts[i].uses_gpu = False | 130 | ts[i].uses_gpu = False |
130 | ts[i].nrequests = 0 | 131 | ts[i].nrequests = 0 |
132 | ts[i].nengine_requests = 0 | ||
131 | 133 | ||
132 | for t in ts: | 134 | for t in ts: |
133 | t.wss = dp.wss | 135 | t.wss = dp.wss |
@@ -174,14 +176,19 @@ def create_gpu_task_set(dp, overheads = None): | |||
174 | t.max_np_interval = max(t.max_np_interval, overheads.gpu_xmit.xmit_cost(XmitOverheads.H2D, min(t.stdata, dp.chunk_size))) | 176 | t.max_np_interval = max(t.max_np_interval, overheads.gpu_xmit.xmit_cost(XmitOverheads.H2D, min(t.stdata, dp.chunk_size))) |
175 | t.max_np_interval = max(t.max_np_interval, overheads.gpu_xmit.xmit_cost(XmitOverheads.D2H, min(t.stdata, dp.chunk_size))) | 177 | t.max_np_interval = max(t.max_np_interval, overheads.gpu_xmit.xmit_cost(XmitOverheads.D2H, min(t.stdata, dp.chunk_size))) |
176 | 178 | ||
177 | # one for token lock, one for exec engine, one for each chunk | 179 | t.nkernrequests = 1 |
180 | t.nsendrequests = int(ceil(float(t.sdata)/dp.chunk_size)) | ||
181 | t.nrecvrequests = int(ceil(float(t.rdata)/dp.chunk_size)) | ||
178 | t.nstaterequests = int(ceil(float(t.stdata)/dp.chunk_size)) | 182 | t.nstaterequests = int(ceil(float(t.stdata)/dp.chunk_size)) |
179 | t.nrequests = 1 + \ | 183 | if not dp.p2p: |
180 | 1 + \ | 184 | t.nstaterequests *= 2 |
181 | int(ceil(float(t.sdata)/dp.chunk_size)) + \ | 185 | if dp.rho > 0: |
182 | int(ceil(float(t.rdata)/dp.chunk_size)) + \ | 186 | t.ntokenrequests = 1 |
183 | t.nstaterequests | 187 | else: |
184 | 188 | t.ntokenrequests = 0 | |
189 | t.nengine_requests = t.nkernrequests + t.nsendrequests + t.nrecvrequests + t.nstaterequests | ||
190 | t.nrequests = t.nengine_requests + t.ntokenrequests | ||
191 | |||
185 | return ts | 192 | return ts |
186 | 193 | ||
187 | def complete(results, n): | 194 | def complete(results, n): |
@@ -347,9 +354,11 @@ def is_schedulable(ts, dp, overheads): | |||
347 | if len(g) <= tokensPerClstr or gsize == 1: | 354 | if len(g) <= tokensPerClstr or gsize == 1: |
348 | for t in g: | 355 | for t in g: |
349 | if t.stdata != 0: | 356 | if t.stdata != 0: |
357 | t.nrequests -= t.nstaterequests | ||
358 | t.nengine_requests -= t.nstaterequests | ||
359 | t.nstaterequests = 0 | ||
350 | t.stdata = 0 | 360 | t.stdata = 0 |
351 | t.stcost = 0.0 | 361 | t.stcost = 0.0 |
352 | t.nrequests -= t.nstaterequests | ||
353 | 362 | ||
354 | # do we overutilize any of the clusters, or does migration | 363 | # do we overutilize any of the clusters, or does migration |
355 | # cause per-task constraint violations? | 364 | # cause per-task constraint violations? |
@@ -400,7 +409,7 @@ def is_schedulable(ts, dp, overheads): | |||
400 | if not p or len(p) == 0: | 409 | if not p or len(p) == 0: |
401 | continue | 410 | continue |
402 | 411 | ||
403 | if not jlfp.charge_scheduling_overheads(overheads, size, False, p): | 412 | if not jlfp.charge_scheduling_overheads(overheads, size, False, p, clusters, gclusters, dp): |
404 | # print 'failed in overhead charging' | 413 | # print 'failed in overhead charging' |
405 | success = False | 414 | success = False |
406 | break | 415 | break |
@@ -714,7 +723,7 @@ def process_design_points(args): | |||
714 | 723 | ||
715 | nfinished += nchunk | 724 | nfinished += nchunk |
716 | 725 | ||
717 | except lite.OperationalError: | 726 | except MySQLdb.OperationalError, e: |
718 | print '%d: CRAP. Database Error while %s' % (os.getpid(), 'getting work.' if dps is None else 'storing results.') | 727 | print '%d: CRAP. Database Error while %s' % (os.getpid(), 'getting work.' if dps is None else 'storing results.') |
719 | print traceback.format_exc() | 728 | print traceback.format_exc() |
720 | allend = time.time() | 729 | allend = time.time() |
@@ -731,16 +740,17 @@ def TEST_get_dp_space(cpus): | |||
731 | # system parameters | 740 | # system parameters |
732 | exp.host = ['bonham'] | 741 | exp.host = ['bonham'] |
733 | exp.ncpus = [12] | 742 | exp.ncpus = [12] |
734 | # exp.nclusters = [1, 2, 12] | ||
735 | exp.nclusters = [1, 2, 12] | 743 | exp.nclusters = [1, 2, 12] |
744 | # exp.nclusters = [12] | ||
736 | exp.ngpus = [8] | 745 | exp.ngpus = [8] |
737 | # exp.ngclusters = [1, 4, 8] | ||
738 | exp.ngclusters = [1, 4, 8] | 746 | exp.ngclusters = [1, 4, 8] |
747 | # exp.ngclusters = [8] | ||
748 | exp.release_master = [False] | ||
739 | exp.ovh_type = ['mean'] | 749 | exp.ovh_type = ['mean'] |
740 | exp.polluters = [True] | 750 | exp.polluters = [True] |
741 | 751 | ||
742 | # gpusync config variables | 752 | # gpusync config variables |
743 | exp.rho = [0,3] | 753 | exp.rho = [2] |
744 | exp.dgl = [True,False] | 754 | exp.dgl = [True,False] |
745 | exp.p2p = [True,False] | 755 | exp.p2p = [True,False] |
746 | # exp.ncopy_engines = [1, 2] | 756 | # exp.ncopy_engines = [1, 2] |
@@ -748,7 +758,8 @@ def TEST_get_dp_space(cpus): | |||
748 | exp.chunk_size = [1*1024*1024] # 1MB (2MB?) | 758 | exp.chunk_size = [1*1024*1024] # 1MB (2MB?) |
749 | 759 | ||
750 | # task parameters | 760 | # task parameters |
751 | exp.sys_util = [0.5,5.0,10.0] | 761 | # exp.sys_util = [0.5,5.0,10.0] |
762 | exp.sys_util = [5.0] | ||
752 | # exp.task_util = ['u-uni-medium', 'u-uni-heavy'] | 763 | # exp.task_util = ['u-uni-medium', 'u-uni-heavy'] |
753 | exp.task_util = ['u-uni-medium'] | 764 | exp.task_util = ['u-uni-medium'] |
754 | exp.period = ['p-uni-long'] | 765 | exp.period = ['p-uni-long'] |
@@ -775,7 +786,8 @@ def get_dp_space(cpus): | |||
775 | exp.nclusters = [1, 2, 12] | 786 | exp.nclusters = [1, 2, 12] |
776 | exp.ngpus = [8] | 787 | exp.ngpus = [8] |
777 | # exp.ngclusters = [1, 2, 4, 8] | 788 | # exp.ngclusters = [1, 2, 4, 8] |
778 | exp.ngclusters = [1, 4, 8] | 789 | exp.ngclusters = [1, 2, 4, 8] |
790 | exp.release_master = [False] | ||
779 | exp.polluters = [False, True] | 791 | exp.polluters = [False, True] |
780 | exp.ovh_type = ['mean', 'max'] | 792 | exp.ovh_type = ['mean', 'max'] |
781 | # exp.ovh_type = ['mean'] | 793 | # exp.ovh_type = ['mean'] |
@@ -790,8 +802,8 @@ def get_dp_space(cpus): | |||
790 | 802 | ||
791 | # task parameters | 803 | # task parameters |
792 | # step_size = 0.1 | 804 | # step_size = 0.1 |
793 | step_size = 0.5 | 805 | # step_size = 0.5 |
794 | # step_size = 0.25 | 806 | step_size = 0.25 |
795 | start_pt = step_size | 807 | start_pt = step_size |
796 | # start_pt = 10.0 | 808 | # start_pt = 10.0 |
797 | exp.sys_util = [float(v) for v in arange(start_pt, cpus+step_size, step_size)] | 809 | exp.sys_util = [float(v) for v in arange(start_pt, cpus+step_size, step_size)] |
@@ -807,9 +819,10 @@ def get_dp_space(cpus): | |||
807 | exp.kern = ['k-uni-light', 'k-uni-medium', 'k-uni-heavy'] | 819 | exp.kern = ['k-uni-light', 'k-uni-medium', 'k-uni-heavy'] |
808 | # exp.cpu_csx = ['c-const-light', 'c-const-medium', 'c-const-heavy'] | 820 | # exp.cpu_csx = ['c-const-light', 'c-const-medium', 'c-const-heavy'] |
809 | exp.cpu_csx = ['c-const-light'] | 821 | exp.cpu_csx = ['c-const-light'] |
810 | # exp.data = ['d-const-light', 'd-const-medium', 'd-const-heavy', 'd-const-very-heavy'] | 822 | # exp.data = ['d-const-light', 'd-const-medium', 'd-const-heavy', 'd-const-very-heavy'] |
811 | exp.data = ['d-const-light', 'd-const-medium', 'd-const-heavy'] | 823 | exp.data = ['d-uni-light', 'd-uni-medium', 'd-uni-heavy'] |
812 | exp.state = ['s-const-zero', 's-const-light', 's-const-medium'] | 824 | # exp.state = ['s-const-zero', 's-const-light', 's-const-medium', 's-const-heavy'] |
825 | exp.state = ['s-const-zero', 's-uni-light', 's-uni-medium'] | ||
813 | 826 | ||
814 | return exp | 827 | return exp |
815 | 828 | ||
@@ -844,8 +857,8 @@ def main(): | |||
844 | host = 'bonham' | 857 | host = 'bonham' |
845 | 858 | ||
846 | cpus = 12.0 | 859 | cpus = 12.0 |
847 | exp = get_dp_space(cpus) | 860 | # exp = get_dp_space(cpus) |
848 | # exp = TEST_get_dp_space(cpus) | 861 | exp = TEST_get_dp_space(cpus) |
849 | 862 | ||
850 | def valid(dp): | 863 | def valid(dp): |
851 | # filter out gpus shared between cpu clusters for now | 864 | # filter out gpus shared between cpu clusters for now |
@@ -873,18 +886,47 @@ def main(): | |||
873 | 886 | ||
874 | return True | 887 | return True |
875 | 888 | ||
876 | design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)] | 889 | if not args.pretend and not args.resume: |
877 | ndp = len(design_points) | 890 | db.clear_tables(args.database) |
891 | |||
892 | # load design points incrementally as to not overload memory | ||
893 | gen = DesignPointGenerator(exp, is_valid = valid) | ||
894 | dp_chunk_sz = 10000 # number of design points to load at a time | ||
895 | ndp = 0 | ||
878 | if not args.pretend: | 896 | if not args.pretend: |
879 | random.shuffle(design_points) | 897 | nstored = 0 |
880 | if not args.resume: | 898 | npending = 0 |
881 | db.clear_tables(args.database) | 899 | while True: |
882 | nstored, npending = db.store_design_points(args.database, design_points, clean = not args.resume) | 900 | dp_chunk = [] |
883 | print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % ( | 901 | for _ in range(dp_chunk_sz): |
884 | nstored, ndp, ndp - nstored, npending) | 902 | try: |
903 | dp_chunk.append(gen.next()) | ||
904 | except StopIteration: | ||
905 | break | ||
906 | if len(dp_chunk) == 0: | ||
907 | break | ||
908 | ndp += len(dp_chunk) | ||
909 | random.shuffle(dp_chunk) | ||
910 | tmpnstored, npending = db.store_design_points(args.database, dp_chunk, clean = not args.resume) | ||
911 | nstored += tmpnstored | ||
912 | print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % (nstored, ndp, ndp - nstored, npending) | ||
885 | else: | 913 | else: |
914 | for _ in gen: | ||
915 | ndp += 1 | ||
886 | print "%d design points planned" % ndp | 916 | print "%d design points planned" % ndp |
887 | 917 | ||
918 | # design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)] | ||
919 | # ndp = len(design_points) | ||
920 | # if not args.pretend: | ||
921 | # random.shuffle(design_points) | ||
922 | # if not args.resume: | ||
923 | # db.clear_tables(args.database) | ||
924 | # nstored, npending = db.store_design_points(args.database, design_points, clean = not args.resume) | ||
925 | # print "Loaded %d of %d design points. (%d were completed or pending, %d now pending)" % ( | ||
926 | # nstored, ndp, ndp - nstored, npending) | ||
927 | # else: | ||
928 | # print "%d design points planned" % ndp | ||
929 | |||
888 | if args.pretend or args.initonly: | 930 | if args.pretend or args.initonly: |
889 | exit(0) | 931 | exit(0) |
890 | 932 | ||