about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Glenn Elliott <gelliott@cs.unc.edu> 2014-09-24 21:49:40 -0400
committer Glenn Elliott <gelliott@cs.unc.edu> 2014-09-24 21:49:40 -0400
commit b4af97a84c6e0205d45c5356a564ed2d17f2b1dc (patch)
tree 29f2d72dfe282a8a68501e1f0b6dba415cefbd69
parent 9c09bb4362955af2e81ea5c475cdb9ea166f6a31 (diff)
Fix race in dp ID generation in dp_ptested
Race allowed a single design point to get multiple dp ids in dp_ptested.
-rwxr-xr-x rtss14/database.py 266
1 files changed, 160 insertions, 106 deletions
diff --git a/rtss14/database.py b/rtss14/database.py
index c0b099a..e9fe99b 100755
--- a/rtss14/database.py
+++ b/rtss14/database.py
@@ -43,6 +43,8 @@ def init_distr_mapper(conn):
43 break 43 break
44 except db.OperationalError, e: 44 except db.OperationalError, e:
45 print e 45 print e
46 if c is not None:
47 print c._last_executed
46 elapsed = time.time() - start 48 elapsed = time.time() - start
47 if elapsed > timeout: 49 if elapsed > timeout:
48 raise 50 raise
@@ -78,6 +80,8 @@ def init_dp_col_names(conn):
78 break 80 break
79 except db.OperationalError, e: 81 except db.OperationalError, e:
80 print e 82 print e
83 if c is not None:
84 print c._last_executed
81 elapsed = time.time() - start 85 elapsed = time.time() - start
82 if elapsed > timeout: 86 if elapsed > timeout:
83 raise 87 raise
@@ -192,6 +196,8 @@ def begin_sync(conn, c):
192 break 196 break
193 except db.OperationalError, e: 197 except db.OperationalError, e:
194 print e 198 print e
199 if c is not None:
200 print c._last_executed
195 elapsed = time.time() - start 201 elapsed = time.time() - start
196 if elapsed > transaction_timeout: 202 if elapsed > transaction_timeout:
197 raise 203 raise
@@ -205,6 +211,9 @@ def end_sync(conn):
205 conn.autocommit(True) 211 conn.autocommit(True)
206 return success 212 return success
207 213
214def rollback(conn):
215 conn.rollback()
216 conn.autocommit(True)
208 217
209def connect_db(db_name): 218def connect_db(db_name):
210 global distr_mapper 219 global distr_mapper
@@ -249,74 +258,49 @@ def clear_tables(db_name):
249 c.close() 258 c.close()
250 ################ 259 ################
251 260
252def __get_dp_id(c, dp, add_if_missing = False): 261def lookup_dp_id(c, dp):
253 global dp_col_names 262 global dp_col_names
254 global dp_col_type_strs 263 global dp_col_type_strs
255 seeking = dp_to_db(dp) 264 seeking = dp_to_db(dp)
256 seeking.pop('ts_util', None) 265 seeking.pop('ts_util', None)
257 query = 'SELECT id FROM dp_ptested WHERE %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys()))
258 c.execute(query, tuple(seeking.values()))
259 row = c.fetchone()
260 if row:
261 return row['id']
262 elif add_if_missing == False:
263 return None
264 # add the design point
265 col_names = copy.deepcopy(dp_col_names)
266 col_names.pop(0) # remove the id field
267 c.execute('INSERT INTO dp_ptested VALUES(NULL,%s)' % ','.join(['%s']*len(col_names)),
268 tuple(map(lambda x: seeking[x], col_names)))
269 # get the assigned id
270 c.execute(query, tuple(seeking.values()))
271 row = c.fetchone()
272 return row['id']
273
274#def __already_pending(dp, conn):
275# seeking = dp_to_db(dp)
276# c = conn.cursor()
277# c.execute('SELECT COUNT(*) FROM dp_pending WHERE taken=0 AND %s' % ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
278# tuple(seeking.values()))
279# row = c.fetchone()
280# processed = bool(row[0])
281# c.close()
282# return processed
283
284#def already_pending(dp, conn = None, db_name = None):
285# if conn:
286# return __already_pending(dp, conn)
287# else:
288# conn = connect_db(db_name)
289# with conn:
290# return __already_pending(dp, conn)
291
292#def __already_processed(dp, conn):
293# seeking = dp_to_db(dp)
294# seeking.pop('ts_util', None)
295# c = conn.cursor()
296# c.execute('SELECT COUNT(*) FROM sched_results as R JOIN dp_ptested as K on R.dp=K.id '
297# 'WHERE R.ts_util=%%s AND %s' % ' AND '.join(map(lambda x: 'K.%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())),
298# ((dp.sys_util,) + tuple(seeking.values())))
299# row = c.fetchone()
300# processed = bool(row[0])
301# c.close()
302# return processed
303 266
304#def already_processed(dp, conn = None, db_name = None): 267 col_names = [n for n in dp_col_names if n != 'ts_util']
305# if conn: 268
306# return __already_processed(dp, conn) 269 # atomically add new dp to dp_ptested. fails if dp already in table
307# else: 270 c.execute('INSERT INTO dp_ptested(id,%s) '
308# conn = connect_db(db_name) 271 'SELECT NULL,%s '
309# with conn: 272 'FROM dual '
310# return __already_processed(dp, conn) 273 'WHERE NOT EXISTS ( '
274 'SELECT 1 '
275 'FROM dp_ptested '
276 'WHERE %s )' % ( ','.join(col_names),
277 ','.join(['%s']*(len(col_names))),
278 ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ),
279 tuple([seeking[n] for n in col_names] + seeking.values()) )
280
281 if c.rowcount != 0:
282 # new id
283 c.execute('SELECT LAST_INSERT_ID()')
284 dp_id = c.fetchone().values()[0]
285 else:
286 # id already exists. look it up.
287 c.execute('SELECT id '
288 'FROM dp_ptested '
289 'WHERE %s' % (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ),
290 tuple(seeking.values()) )
291 dp_id = c.fetchone().values()[0]
292
293 return dp_id
311 294
312def store_design_points(db_name, dps, clean): 295def store_design_points(db_name, dps, clean, fix = False):
313 global dp_col_names 296 global dp_col_names
314 global dp_col_type_strs 297 global dp_col_type_strs
315 298
299 conn = connect_db(db_name)
300
316 dps = [dp_to_db(dp) for dp in dps] 301 dps = [dp_to_db(dp) for dp in dps]
317 nstored = 0 302 nstored = 0
318 303
319 conn = connect_db(db_name)
320 with conn: 304 with conn:
321 c = conn.cursor() 305 c = conn.cursor()
322 if clean: 306 if clean:
@@ -324,19 +308,30 @@ def store_design_points(db_name, dps, clean):
324 [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) 308 [tuple(map(lambda x: d[x], dp_col_names)) for d in dps])
325 nstored = len(dps) 309 nstored = len(dps)
326 else: 310 else:
327 col_names = copy.deepcopy(dp_col_names) 311 col_names = [n for n in dp_col_names if n != 'ts_util']
328 col_names.pop(0) # remove ts_util
329 # store entire dp set in a temporary staging table 312 # store entire dp set in a temporary staging table
330 c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending') 313 c.execute('CREATE TEMPORARY TABLE tmp_dp LIKE dp_pending')
331 c.executemany('INSERT INTO tmp_dp VALUES(NULL,0,%s)' % ','.join(['%s']*len(dp_col_names)), 314 # filter out dps that are already pending and not taken.
332 [tuple(map(lambda x: d[x], dp_col_names)) for d in dps]) 315 # entries taken in dp_pending may still be mirrored here.
316 c.executemany('INSERT INTO tmp_dp(id,taken,%s) '
317 'SELECT NULL,0,%s '
318 'FROM dual '
319 'WHERE NOT EXISTS ( '
320 'SELECT 1 '
321 'FROM dp_pending '
322 'WHERE taken=0 AND %s )' % ( ','.join(dp_col_names),
323 ','.join(['%s']*len(dp_col_names)),
324 ' AND '.join(map(lambda x: '%s=%%s' % x, dp_col_names)) ),
325 [tuple(map(lambda x: d[x], dp_col_names)*2) for d in dps])
333 # mask out the dps that have already been processed 326 # mask out the dps that have already been processed
334 c.execute('UPDATE tmp_dp as S JOIN sched_results as R on S.ts_util=R.ts_util JOIN dp_ptested as T on R.dp=T.id ' 327 c.execute('UPDATE tmp_dp as S JOIN sched_results as R on S.ts_util=R.ts_util JOIN dp_ptested as T on R.dp=T.id '
335 'SET S.taken=1 WHERE %s' % ' AND '.join(map(lambda x: 'S.%s=T.%s' % (x,x), col_names))) 328 'SET S.taken=1 WHERE %s' % ' AND '.join(map(lambda x: 'S.%s=T.%s' % (x,x), col_names)))
336 # mask out the dps that are already pending 329 # reset dp_pending.taken to 0 if the dp is actually unfinished, and mask out the duplicate in tmp_dp
337 c.execute('UPDATE tmp_dp as S JOIN dp_pending as P on %s SET S.taken=1' % ' AND '.join(map(lambda x: 'S.%s=P.%s' % (x,x), dp_col_names))) 330 c.execute('UPDATE dp_pending as P JOIN tmp_dp as S on %s '
331 'SET P.taken=0, S.taken=1 '
332 'WHERE P.taken=1 AND S.taken=0' % ' AND '.join(map(lambda x: 'P.%s=S.%s' % (x,x), dp_col_names)))
338 # copy any remaining dps into the pending table 333 # copy any remaining dps into the pending table
339 c.execute('INSERT INTO dp_pending(taken,%s) SELECT taken,%s FROM tmp_dp WHERE tmp_dp.taken=0' % (','.join(dp_col_names), ','.join(dp_col_names))) 334 c.execute('INSERT INTO dp_pending(id,taken,%s) SELECT NULL,0,%s FROM tmp_dp WHERE tmp_dp.taken=0' % (','.join(dp_col_names), ','.join(dp_col_names)))
340 nstored = c.rowcount 335 nstored = c.rowcount
341 c.execute('DROP TEMPORARY TABLE tmp_dp') 336 c.execute('DROP TEMPORARY TABLE tmp_dp')
342 c.close() 337 c.close()
@@ -351,9 +346,83 @@ def num_pending_design_points(db_name):
351 c.close() 346 c.close()
352 return npending 347 return npending
353 348
354def get_design_points(db_name, ndp = 1): 349def ___get_design_points(conn, ndp):
355 global dp_col_names 350 global dp_col_names
356 global dp_col_type_strs 351 fetched = []
352 c = conn.cursor(db.cursors.DictCursor)
353 ####################
354 begin_sync(conn, c)
355 c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
356 dp_ids = [d['id'] for d in c.fetchall()]
357 nfetched = len(dp_ids) if dp_ids else 0
358 if nfetched > 0:
359 c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids))
360 end_sync(conn)
361 ####################
362 if nfetched > 0:
363 c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids))
364 fetched = [db_to_dp(d) for d in c.fetchall()]
365 c.close()
366 return fetched, nfetched
367
368
369#def ___get_design_points(conn, ndp):
370# global dp_col_names
371# fetched = []
372# c = conn.cursor(db.cursors.DictCursor)
373# # get a unique id to prevent races
374# c.execute('INSERT INTO dp_id_gen DEFAULT VALUES')
375# begin_sync(conn, c)
376# # try to get some ids of up to 'ndp' design points
377# c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
378# dp_ids = [d['id'] for d in c.fetchall()]
379# nfetched = len(dp_ids) if dp_ids else 0
380# nclaimed = 0
381# if nfetched > 0:
382# # if there was work...
383# # take fetched rows and assign our unique id
384# # conjunction dp ids with 'taken=0' elimates races
385# c.execute('UPDATE dp_pending SET taken=LAST_INSERT_ID() WHERE (taken=0) AND (%s)' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids))
386# nclaimed = c.rowcount
387# end_sync(conn)
388# if nclaimed > 0:
389# # fetch our design points
390# c.execute('SELECT %s FROM dp_pending WHERE taken=LAST_INSERT_ID()' % (','.join(dp_col_names)))
391# fetched = [db_to_dp(d) for d in c.fetchall()]
392# # reset taken to be '1'
393# c.execute('UPDATE dp_pending SET taken=1 WHERE taken=LAST_INSERT_ID()')
394# # release our id
395# c.execute('DELETE FROM dp_id_gen WHERE id=LAST_INSERT_ID()')
396# c.close()
397# if nfetched > 0 and nclaimed == 0:
398# # there was a race of some sort
399# nfetched = = -1
400# return fetched, nfetched
401
402def __get_design_points(conn, ndp = 1):
403 failcount = 0
404 nfetched = -1
405 fetched = []
406
407 while nfetched < 0 and failcount < 5:
408
409 # sleep a moment before we retry
410 if failcount > 0:
411 time_to_sleep = (random.random()*20 + 5)*failcount
412 time_to_sleep = min(time_to_sleep, 4*60)
413 print 'retrying ___get_design_points due to race. sleep = %.3f' % time_to_sleep
414 time.sleep(time_to_sleep)
415
416 # try to get design points
417 fetched, nfetched = ___get_design_points(conn, ndp)
418
419 if nfetched < 0:
420 failcount += 1
421
422 return fetched
423
424
425def get_design_points(db_name, ndp = 1):
357 global max_fail 426 global max_fail
358 fetched = [] 427 fetched = []
359 428
@@ -368,24 +437,9 @@ def get_design_points(db_name, ndp = 1):
368 437
369 conn = connect_db(db_name) 438 conn = connect_db(db_name)
370 with conn: 439 with conn:
371 ################ 440 fetched = __get_design_points(conn, ndp)
372 c = conn.cursor(db.cursors.DictCursor)
373
374 begin_sync(conn, c)
375 c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
376 dp_ids = [d['id'] for d in c.fetchall()]
377 nfetched = len(dp_ids) if dp_ids else 0
378 if nfetched > 0:
379 c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids))
380 end_sync(conn)
381
382 if nfetched > 0:
383 c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids))
384 fetched = [db_to_dp(d) for d in c.fetchall()]
385
386 c.close()
387 break 441 break
388 ############## 442
389 except db.OperationalError, e: 443 except db.OperationalError, e:
390 print e 444 print e
391 # make sure the db connection is closed 445 # make sure the db connection is closed
@@ -445,7 +499,7 @@ def __store_eff_sched_results(c, dp_id, stats):
445 c.execute('INSERT INTO scaled_sched_results ' 499 c.execute('INSERT INTO scaled_sched_results '
446 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', 500 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)',
447 (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) 501 (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth))
448 502
449def store_sched_results(db_name, data, ndp = 0): 503def store_sched_results(db_name, data, ndp = 0):
450 global dp_col_names 504 global dp_col_names
451 global dp_col_type_strs 505 global dp_col_type_strs
@@ -456,6 +510,8 @@ def store_sched_results(db_name, data, ndp = 0):
456 510
457 conn = None 511 conn = None
458 failcount = 0 512 failcount = 0
513 saved = False
514
459 while True: 515 while True:
460 try: 516 try:
461 # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'. 517 # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'.
@@ -464,43 +520,41 @@ def store_sched_results(db_name, data, ndp = 0):
464 conn = connect_db(db_name) 520 conn = connect_db(db_name)
465 521
466 if col_names is None: 522 if col_names is None:
467 col_names = copy.deepcopy(dp_col_names) 523 col_names = [n for n in dp_col_names if n != 'id']
468 col_names.pop(0) # remove the id field
469 524
470 with conn: 525 with conn:
471 c = conn.cursor(db.cursors.DictCursor) 526 c = conn.cursor(db.cursors.DictCursor)
527 if not saved:
528 # get IDs for all design points
529 dp_ids = [lookup_dp_id(c, d.dp) for d in data]
472 530
473 # get IDs for all design points 531 # insert the normal sched data in one go
474 dp_ids = [__get_dp_id(c, d.dp, add_if_missing = True) for d in data]
475 # insert the normal sched data in one go
476 c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
477 [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth)
478 for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])])
479
480 if d.eff_sched_stats is not None:
481 begin_sync(conn, c) 532 begin_sync(conn, c)
482 for dp_id, d in zip(dp_ids, data): 533
483 __store_eff_sched_results(c, dp_id, d.eff_sched_stats) 534 # save results
535 c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
536 [(dp_id, ts_util, stats.avg_sched, stats.ntested, stats.nsched, stats.avg_tard, stats.avg_bandwidth)
537 for dp_id, ts_util, stats in zip(dp_ids, [d.dp.sys_util for d in data], [d.sched_stats for d in data])])
538
539 # save scaled results
540 if d.eff_sched_stats is not None:
541 for dp_id, d in zip(dp_ids, data):
542 __store_eff_sched_results(c, dp_id, d.eff_sched_stats)
543
484 end_sync(conn) 544 end_sync(conn)
545 saved = True
485 546
486 if ndp > 0: 547 if ndp > 0:
487 begin_sync(conn, c) 548 fetched = __get_design_points(conn, ndp)
488 c.execute('SELECT id FROM dp_pending WHERE taken=0 LIMIT %s FOR UPDATE', (ndp,))
489 dp_ids = [d['id'] for d in c.fetchall()]
490 nfetched = len(dp_ids) if dp_ids else 0
491 if nfetched > 0:
492 c.execute('UPDATE dp_pending SET taken=1 WHERE %s' % ' OR '.join(['id=%s']*len(dp_ids)), tuple(dp_ids))
493 end_sync(conn)
494 549
495 if nfetched > 0:
496 c.execute('SELECT %s FROM dp_pending WHERE %s' % (','.join(dp_col_names), ' OR '.join(['id=%s']*len(dp_ids))), tuple(dp_ids))
497 fetched = [db_to_dp(d) for d in c.fetchall()]
498 c.close() 550 c.close()
499 # success!
500 break 551 break
501 except db.OperationalError, e: 552 except db.OperationalError, e:
502 print e 553 print e
554 if c is not None:
555 print c._last_executed
503 if conn is not None: 556 if conn is not None:
557 rollback(conn)
504 conn.close() 558 conn.close()
505 del conn 559 del conn
506 conn = None 560 conn = None