diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-09-24 21:49:40 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-09-24 21:49:40 -0400 |
commit | b4af97a84c6e0205d45c5356a564ed2d17f2b1dc (patch) | |
tree | 29f2d72dfe282a8a68501e1f0b6dba415cefbd69 | |
parent | 9c09bb4362955af2e81ea5c475cdb9ea166f6a31 (diff) |
Fix race in dp ID generation in dp_ptested
A race condition allowed a single design point to be
assigned multiple dp IDs in dp_ptested.
-rwxr-xr-x | rtss14/database.py | 266 |
1 files changed, 160 insertions, 106 deletions
diff --git a/rtss14/database.py b/rtss14/database.py index c0b099a..e9fe99b 100755 --- a/rtss14/database.py +++ b/rtss14/database.py | |||
@@ -43,6 +43,8 @@ def init_distr_mapper(conn): | |||
43 | break | 43 | break |
44 | except db.OperationalError, e: | 44 | except db.OperationalError, e: |
45 | print e | 45 | print e |
46 | if c is not None: | ||
47 | print c._last_executed | ||
46 | elapsed = time.time() - start | 48 | elapsed = time.time() - start |
47 | if elapsed > timeout: | 49 | if elapsed > timeout: |
48 | raise | 50 | raise |
@@ -78,6 +80,8 @@ def init_dp_col_names(conn): | |||
78 | break | 80 | break |
79 | except db.OperationalError, e: | 81 | except db.OperationalError, e: |
80 | print e | 82 | print e |
83 | if c is not None: | ||
84 | print c._last_executed | ||
81 | elapsed = time.time() - start | 85 | elapsed = time.time() - start |
82 | if elapsed > timeout: | 86 | if elapsed > timeout: |
83 | raise | 87 | raise |
@@ -192,6 +196,8 @@ def begin_sync(conn, c): | |||
192 | break | 196 | break |
193 | except db.OperationalError, e: | 197 | except db.OperationalError, e: |
194 | print e | 198 | print e |
199 | if c is not None: | ||
200 | print c._last_executed | ||
195 | elapsed = time.time() - start | 201 | elapsed = time.time() - start |
196 | if elapsed > transaction_timeout: | 202 | if elapsed > transaction_timeout: |
197 | raise | 203 | raise |
@@ -205,6 +211,9 @@ def end_sync(conn): | |||
205 | conn.autocommit(True) | 211 | conn.autocommit(True) |
206 | return success | 212 | return success |
207 | 213 | ||
214 | def rollback(conn): | ||
215 | conn.rollback() | ||
216 | conn.autocommit(True) | ||
208 | 217 | ||
209 | def connect_db(db_name): | 218 | def connect_db(db_name): |
210 | global distr_mapper | 219 | global distr_mapper |
@@ -249,74 +258,49 @@ def clear_tables(db_name): | |||
249 | c.close() | 258 | c.close() |
250 | ################ | 259 | ################ |
251 | 260 | ||
252 | def __get_dp_id(c, dp, add_if_missing = False): | 261 | def lookup_dp_id(c, dp): |
253 | global dp_col_names | 262 | global dp_col_names |
254 | global dp_col_type_strs | 263 | global dp_col_type_strs |
255 | seeking = dp_to_db(dp) | 264 | seeking = dp_to_db(dp) |
256 | seeking.pop('ts_util', None) | 265 | seeking.pop('ts_util', None) |
257 | query = 'SELECT id FROM dp_ptested WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) | ||
258 | c.execute(query, tuple(seeking.values())) | ||
259 | row = c.fetchone() | ||
260 | if row: | ||
261 | return row['id'] | ||
262 | elif add_if_missing == False: | ||
263 | return None | ||
264 | # add the design point | ||
265 | col_names = copy.deepcopy(dp_col_names) | ||
266 | col_names.pop(0) # remove the id field | ||
267 | c.execute('INSERT INTO dp_ptested VALUES(NULL,%s)' % ','.join(['%s']*len(col_names)), | ||
268 | tuple(map(lambda x: seeking[x], col_names))) | ||
269 | # get the assigned id | ||
270 | c.execute(query, tuple(seeking.values())) | ||
271 | row = c.fetchone() | ||
272 | return row['id'] | ||
273 | |||
274 | #def __already_pending(dp, conn): | ||
275 | # seeking = dp_to_db(dp) | ||
276 | # c = conn.cursor() | ||
277 | # c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0 AND %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), | ||
278 | # tuple(seeking.values())) | ||
279 | # row = c.fetchone() | ||
280 | # processed = bool(row[0]) | ||
281 | # c.close() | ||
282 | # return processed | ||
283 | |||
284 | #def already_pending(dp, conn = None, db_name = None): | ||
285 | # if conn: | ||
286 | # return __already_pending(dp, conn) | ||
287 | # else: | ||
288 | # conn = connect_db(db_name) | ||
289 | # with conn: | ||
290 | # return __already_pending(dp, conn) | ||
291 | |||
292 | #def __already_processed(dp, conn): | ||
293 | # seeking = dp_to_db(dp) | ||
294 | # seeking.pop('ts_util', None) | ||
295 | # c = conn.cursor() | ||
296 | # c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id ' | ||
297 | # 'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())), | ||
298 | # ((dp.sys_util,) + tuple(seeking.values()))) | ||
299 | # row = c.fetchone() | ||
300 | # processed = bool(row[0]) | ||
301 | # c.close() | ||
302 | # return processed | ||
303 | 266 | ||
304 | #def already_processed(dp, conn = None, db_name = None): | 267 | col_names = [n for n in dp_col_names if n != 'ts_util'] |
305 | # if conn: | 268 | |
306 | # return __already_processed(dp, conn) | 269 | # atomically add new dp to dp_ptested. fails if dp already in table |
307 | # else: | 270 | c.execute('INSERT INTO dp_ptested(id,%s) ' |
308 | # conn = connect_db(db_name) | 271 | 'SELECT NULL,%s ' |
309 | # with conn: | 272 | 'FROM dual ' |
310 | # return __already_processed(dp, conn) | 273 | 'WHERE NOT EXISTS ( ' |
274 | 'SELECT 1 ' | ||
275 | 'FROM dp_ptested ' | ||
276 | 'WHERE %s )' % ( ','.join(col_names), | ||
277 | ','.join(['%s']*(len(col_names))), | ||
278 | ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ), | ||
279 | tuple([seeking[n] for n in col_names] + seeking.values()) ) | ||
280 | |||
281 | if c.rowcount != 0: | ||
282 | # new id | ||
283 | c.execute('SELECT LAST_INSERT_ID()') | ||
284 | dp_id = c.fetchone().values()[0] | ||
285 | else: | ||
286 | # id already exists. look it up. | ||
287 | c.execute('SELECT id ' | ||
288 | 'FROM dp_ptested ' | ||
289 | 'WHERE %s' % (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ), | ||
290 | tuple(seeking.values()) ) | ||
291 | dp_id = c.fetchone().values()[0] | ||
292 | |||
293 | return dp_id | ||
311 | 294 | ||
312 | def store_design_points(db_name, dps, clean): | 295 | def store_design_points(db_name, dps, clean, fix = False): |
313 | global dp_col_names | 296 | global dp_col_names |
314 | global dp_col_type_strs | 297 | global dp_col_type_strs |
315 | 298 | ||
299 | conn = connect_db(db_name) | ||
300 | |||
316 | dps = [dp_to_db(dp) for dp in dps] | 301 | dps = [dp_to_db(dp) for dp in dps] |
317 | nstored = 0 | 302 | nstored = 0 |
318 | 303 | ||
319 | conn = connect_db(db_name) | ||
320 | with conn: | 304 | with conn: |
321 | c = conn.cursor() | 305 | c = conn.cursor() |
322 | if clean: | 306 | if clean: |
@@ -324,19 +308,30 @@ def store_design_points(db_name, dps, clean): | |||
324 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) | 308 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) |
325 | nstored = len(dps) | 309 | nstored = len(dps) |
326 | else: | 310 | else: |
327 | col_names = copy.deepcopy(dp_col_names) | 311 | col_names = [n for n in dp_col_names if n != 'ts_util'] |
328 | col_names.pop(0) # remove ts_util | ||
329 | # store entire dp set in a temporary staging table | 312 | # store entire dp set in a temporary staging table |
330 | c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending') | 313 | c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending') |
331 | c.executemany('INSERT INTO tmp_dp VALUES(NULL,0,%s)' % ','.join(['%s']*len(dp_col_names)), | 314 | # filter out dps that are already pending and not taken. |
332 | [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) | 315 | # entries taken in dp_pending may still be mirrored here. |
316 | c.executemany('INSERT INTO tmp_dp(id,taken,%s) ' | ||
317 | 'SELECT NULL,0,%s ' | ||
318 | 'FROM dual ' | ||
319 | 'WHERE NOT EXISTS ( ' | ||
320 | 'SELECT 1 ' | ||
321 | 'FROM dp_pending ' | ||
322 | 'WHERE taken=0 AND %s )' % ( ','.join(dp_col_names), | ||
323 | ','.join(['%s']*len(dp_col_names)), | ||
324 | ' AND '.join(map(lambda x: '%s=%%s' % x, dp_col_names)) ), | ||
325 | [tuple(map(lambda x: d[x], dp_col_names)*2) for d in dps]) | ||
333 | # mask out the dps that have already been processed | 326 | # mask out the dps that have already been processed |
334 | c.execute('UPDATE tmp_dp as S JOIN sched_results as R on S.ts_util=R.ts_util JOIN dp_ptested as T on R.dp=T.id ' | 327 | c.execute('UPDATE tmp_dp as S JOIN sched_results as R on S.ts_util=R.ts_util JOIN dp_ptested as T on R.dp=T.id ' |
335 | 'SET S.taken=1 WHERE %s' % ' AND '.join(map(lambda x: 'S.%s=T.%s' % (x,x), col_names))) | 328 | 'SET S.taken=1 WHERE %s' % ' AND '.join(map(lambda x: 'S.%s=T.%s' % (x,x), col_names))) |
336 | # mask out the dps that are already pending | 329 | # reset dp_pending.taken to 0 if the dp is actually unfinished, and mask out the duplicate in tmp_dp |
337 | c.execute('UPDATE tmp_dp as S JOIN dp_pending as P on %s SET S.taken=1' % ' AND '.join(map(lambda x: 'S.%s=P.%s' % (x,x), dp_col_names))) | 330 | c.execute('UPDATE dp_pending as P JOIN tmp_dp as S on %s ' |
331 | 'SET P.taken=0, S.taken=1 ' | ||
332 | 'WHERE P.taken=1 AND S.taken=0' % ' AND '.join(map(lambda x: 'P.%s=S.%s' % (x,x), dp_col_names))) | ||
338 | # copy any remaining dps into the pending table | 333 | # copy any remaining dps into the pending table |
339 | c.execute('INSERT INTO dp_pending(taken,%s) SELECT taken,%s FROM tmp_dp WHERE tmp_dp.taken=0' % (','.join(dp_col_names), ','.join(dp_col_names))) | 334 | c.execute('INSERT INTO dp_pending(id,taken,%s) SELECT NULL,0,%s FROM tmp_dp WHERE tmp_dp.taken=0' % (','.join(dp_col_names), ','.join(dp_col_names))) |
340 | nstored = c.rowcount | 335 | nstored = c.rowcount |
341 | c.execute('DROP TEMPORARY TABLE tmp_dp') | 336 | c.execute('DROP TEMPORARY TABLE tmp_dp') |
342 | c.close() | 337 | c.close() |
@@ -351,9 +346,83 @@ def num_pending_design_points(db_name): | |||
351 | c.close() | 346 | c.close() |
352 | return npending | 347 | return npending |
353 | 348 | ||
354 | def get_design_points(db_name, ndp = 1): | 349 | def ___get_design_points(conn, ndp): |
355 | global dp_col_names | 350 | global dp_col_names |
356 | global dp_col_type_strs | 351 | fetched = [] |
352 | c = conn.cursor(db.cursors.DictCursor) | ||
353 | #################### | ||
354 | begin_sync(conn, c) | ||
355 | c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,)) | ||
356 | dp_ids = [d['id'] for d in c.fetchall()] | ||
357 | nfetched = len(dp_ids) if dp_ids else 0 | ||
358 | if nfetched > 0: | ||
359 | c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids)) | ||
360 | end_sync(conn) | ||
361 | #################### | ||
362 | if nfetched > 0: | ||
363 | c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids)) | ||
364 | fetched = [db_to_dp(d) for d in c.fetchall()] | ||
365 | c.close() | ||
366 | return fetched, nfetched | ||
367 | |||
368 | |||
369 | #def ___get_design_points(conn, ndp): | ||
370 | # global dp_col_names | ||
371 | # fetched = [] | ||
372 | # c = conn.cursor(db.cursors.DictCursor) | ||
373 | # # get a unique id to prevent races | ||
374 | # c.execute('INSERT INTO dp_id_gen DEFAULT VALUES') | ||
375 | # begin_sync(conn, c) | ||
376 | # # try to get some ids of up to 'ndp' design points | ||
377 | # c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,)) | ||
378 | # dp_ids = [d['id'] for d in c.fetchall()] | ||
379 | # nfetched = len(dp_ids) if dp_ids else 0 | ||
380 | # nclaimed = 0 | ||
381 | # if nfetched > 0: | ||
382 | # # if there was work... | ||
383 | # # take fetched rows and assign our unique id | ||
384 | # # conjunction dp ids with 'taken=0' elimates races | ||
385 | # c.execute('UPDATE dp_pending SET taken=LAST_INSERT_ID() WHERE (taken=0) AND (%s)' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids)) | ||
386 | # nclaimed = c.rowcount | ||
387 | # end_sync(conn) | ||
388 | # if nclaimed > 0: | ||
389 | # # fetch our design points | ||
390 | # c.execute('SELECT %s FROM dp_pending WHERE taken=LAST_INSERT_ID()' % (','.join(dp_col_names))) | ||
391 | # fetched = [db_to_dp(d) for d in c.fetchall()] | ||
392 | # # reset taken to be '1' | ||
393 | # c.execute('UPDATE dp_pending SET taken=1 WHERE taken=LAST_INSERT_ID()') | ||
394 | # # release our id | ||
395 | # c.execute('DELETE FROM dp_id_gen WHERE id=LAST_INSERT_ID()') | ||
396 | # c.close() | ||
397 | # if nfetched > 0 and nclaimed == 0: | ||
398 | # # there was a race of some sort | ||
399 | # nfetched = = -1 | ||
400 | # return fetched, nfetched | ||
401 | |||
402 | def __get_design_points(conn, ndp = 1): | ||
403 | failcount = 0 | ||
404 | nfetched = -1 | ||
405 | fetched = [] | ||
406 | |||
407 | while nfetched < 0 and failcount < 5: | ||
408 | |||
409 | # sleep a moment before we retry | ||
410 | if failcount > 0: | ||
411 | time_to_sleep = (random.random()*20 + 5)*failcount | ||
412 | time_to_sleep = min(time_to_sleep, 4*60) | ||
413 | print 'retrying ___get_design_points due to race. sleep = %.3f' % time_to_sleep | ||
414 | time.sleep(time_to_sleep) | ||
415 | |||
416 | # try to get design points | ||
417 | fetched, nfetched = ___get_design_points(conn, ndp) | ||
418 | |||
419 | if nfetched < 0: | ||
420 | failcount += 1 | ||
421 | |||
422 | return fetched | ||
423 | |||
424 | |||
425 | def get_design_points(db_name, ndp = 1): | ||
357 | global max_fail | 426 | global max_fail |
358 | fetched = [] | 427 | fetched = [] |
359 | 428 | ||
@@ -368,24 +437,9 @@ def get_design_points(db_name, ndp = 1): | |||
368 | 437 | ||
369 | conn = connect_db(db_name) | 438 | conn = connect_db(db_name) |
370 | with conn: | 439 | with conn: |
371 | ################ | 440 | fetched = __get_design_points(conn, ndp) |
372 | c = conn.cursor(db.cursors.DictCursor) | ||
373 | |||
374 | begin_sync(conn, c) | ||
375 | c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,)) | ||
376 | dp_ids = [d['id'] for d in c.fetchall()] | ||
377 | nfetched = len(dp_ids) if dp_ids else 0 | ||
378 | if nfetched > 0: | ||
379 | c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids)) | ||
380 | end_sync(conn) | ||
381 | |||
382 | if nfetched > 0: | ||
383 | c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids)) | ||
384 | fetched = [db_to_dp(d) for d in c.fetchall()] | ||
385 | |||
386 | c.close() | ||
387 | break | 441 | break |
388 | ############## | 442 | |
389 | except db.OperationalError, e: | 443 | except db.OperationalError, e: |
390 | print e | 444 | print e |
391 | # make sure the db connection is closed | 445 | # make sure the db connection is closed |
@@ -445,7 +499,7 @@ def __store_eff_sched_results(c, dp_id, stats): | |||
445 | c.execute('INSERT INTO scaled_sched_results ' | 499 | c.execute('INSERT INTO scaled_sched_results ' |
446 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', | 500 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', |
447 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) | 501 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) |
448 | 502 | ||
449 | def store_sched_results(db_name, data, ndp = 0): | 503 | def store_sched_results(db_name, data, ndp = 0): |
450 | global dp_col_names | 504 | global dp_col_names |
451 | global dp_col_type_strs | 505 | global dp_col_type_strs |
@@ -456,6 +510,8 @@ def store_sched_results(db_name, data, ndp = 0): | |||
456 | 510 | ||
457 | conn = None | 511 | conn = None |
458 | failcount = 0 | 512 | failcount = 0 |
513 | saved = False | ||
514 | |||
459 | while True: | 515 | while True: |
460 | try: | 516 | try: |
461 | # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'. | 517 | # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'. |
@@ -464,43 +520,41 @@ def store_sched_results(db_name, data, ndp = 0): | |||
464 | conn = connect_db(db_name) | 520 | conn = connect_db(db_name) |
465 | 521 | ||
466 | if col_names is None: | 522 | if col_names is None: |
467 | col_names = copy.deepcopy(dp_col_names) | 523 | col_names = [n for n in dp_col_names if n != 'id'] |
468 | col_names.pop(0) # remove the id field | ||
469 | 524 | ||
470 | with conn: | 525 | with conn: |
471 | c = conn.cursor(db.cursors.DictCursor) | 526 | c = conn.cursor(db.cursors.DictCursor) |
527 | if not saved: | ||
528 | # get IDs for all design points | ||
529 | dp_ids = [lookup_dp_id(c, d.dp) for d in data] | ||
472 | 530 | ||
473 | # get IDs for all design points | 531 | # insert the normal sched data in one go |
474 | dp_ids = [__get_dp_id(c, d.dp, add_if_missing = True) for d in data] | ||
475 | # insert the normal sched data in one go | ||
476 | c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', | ||
477 | [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth) | ||
478 | for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])]) | ||
479 | |||
480 | if d.eff_sched_stats is not None: | ||
481 | begin_sync(conn, c) | 532 | begin_sync(conn, c) |
482 | for dp_id, d in zip(dp_ids, data): | 533 | |
483 | __store_eff_sched_results(c, dp_id, d.eff_sched_stats) | 534 | # save results |
535 | c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', | ||
536 | [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth) | ||
537 | for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])]) | ||
538 | |||
539 | # save scaled results | ||
540 | if d.eff_sched_stats is not None: | ||
541 | for dp_id, d in zip(dp_ids, data): | ||
542 | __store_eff_sched_results(c, dp_id, d.eff_sched_stats) | ||
543 | |||
484 | end_sync(conn) | 544 | end_sync(conn) |
545 | saved = True | ||
485 | 546 | ||
486 | if ndp > 0: | 547 | if ndp > 0: |
487 | begin_sync(conn, c) | 548 | fetched = __get_design_points(conn, ndp) |
488 | c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,)) | ||
489 | dp_ids = [d['id'] for d in c.fetchall()] | ||
490 | nfetched = len(dp_ids) if dp_ids else 0 | ||
491 | if nfetched > 0: | ||
492 | c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids)) | ||
493 | end_sync(conn) | ||
494 | 549 | ||
495 | if nfetched > 0: | ||
496 | c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids)) | ||
497 | fetched = [db_to_dp(d) for d in c.fetchall()] | ||
498 | c.close() | 550 | c.close() |
499 | # success! | ||
500 | break | 551 | break |
501 | except db.OperationalError, e: | 552 | except db.OperationalError, e: |
502 | print e | 553 | print e |
554 | if c is not None: | ||
555 | print c._last_executed | ||
503 | if conn is not None: | 556 | if conn is not None: |
557 | rollback(conn) | ||
504 | conn.close() | 558 | conn.close() |
505 | del conn | 559 | del conn |
506 | conn = None | 560 | conn = None |