aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-10-02 11:52:29 -0400
committerGlenn Elliott <gelliott@cs.unc.edu>2014-10-02 11:52:29 -0400
commita9e0c2a30411e282401082338e72baf4ddb81875 (patch)
tree40045cac11eb9f30121b5e8c847e934042a841ee
parentd0b039c266dca961537a97136518c4bc913ef1f9 (diff)
retry on transaction deadlock
-rwxr-xr-xrtss14/database.py230
1 file changed, 148 insertions(+), 82 deletions(-)
diff --git a/rtss14/database.py b/rtss14/database.py
index 1939056..c2597f6 100755
--- a/rtss14/database.py
+++ b/rtss14/database.py
@@ -172,6 +172,7 @@ def dp_to_db_vals(dp):
172 vals[key] = to_db_val(value) 172 vals[key] = to_db_val(value)
173 return vals 173 return vals
174 174
175
175def begin_sync(conn, c): 176def begin_sync(conn, c):
176 global timeout 177 global timeout
177 start = time.time() 178 start = time.time()
@@ -216,6 +217,7 @@ def connect_db(db_name):
216 init_dp_col_names(conn) 217 init_dp_col_names(conn)
217 return conn 218 return conn
218 219
220
219def clear_tables(db_name): 221def clear_tables(db_name):
220 # tables to clear 222 # tables to clear
221 tables = ['sched_results', 'scaled_sched_results', 'dp_ptested', 'dp_pending'] 223 tables = ['sched_results', 'scaled_sched_results', 'dp_ptested', 'dp_pending']
@@ -230,43 +232,6 @@ def clear_tables(db_name):
230 ################ 232 ################
231 c.close() 233 c.close()
232 234
233def lookup_dp_id(c, dp):
234 global dp_col_names
235 global dp_col_type_strs
236 seeking = dp_to_db(dp)
237 seeking.pop('ts_util', None)
238
239 col_names = [n for n in dp_col_names if n != 'ts_util']
240
241 # atomically add new dp to dp_ptested. fails if dp already in table
242 c.execute('INSERT INTO dp_ptested(id,%s) '
243 'SELECT NULL,%s '
244 'FROM dual '
245 'WHERE NOT EXISTS ( '
246 'SELECT 1 '
247 'FROM dp_ptested '
248 'WHERE %s )' % ( ','.join(col_names),
249 ','.join(['%s']*(len(col_names))),
250 ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ),
251 tuple([seeking[n] for n in col_names] + seeking.values()) )
252
253 if c.rowcount != 0:
254 # new id
255 c.execute('SELECT LAST_INSERT_ID()')
256 row = c.fetchone()
257 assert row is not None
258 dp_id = row.values()[0]
259 else:
260 # id already exists. look it up.
261 c.execute('SELECT id '
262 'FROM dp_ptested '
263 'WHERE %s' % (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ),
264 tuple(seeking.values()) )
265 row = c.fetchone()
266 assert row is not None
267 dp_id = row['id']
268 return dp_id
269
270 235
271def __repair_design_points(conn): 236def __repair_design_points(conn):
272 global dp_col_names 237 global dp_col_names
@@ -288,6 +253,7 @@ def repair_design_points(db_name):
288 nrepaired = __repair_design_points(conn) 253 nrepaired = __repair_design_points(conn)
289 return nrepaired 254 return nrepaired
290 255
256
291repaired = False 257repaired = False
292def store_design_points(db_name, dps, clean): 258def store_design_points(db_name, dps, clean):
293 global dp_col_names 259 global dp_col_names
@@ -335,6 +301,7 @@ def store_design_points(db_name, dps, clean):
335 c.close() 301 c.close()
336 return nstored 302 return nstored
337 303
304
338def num_pending_design_points(db_name): 305def num_pending_design_points(db_name):
339 conn = connect_db(db_name) 306 conn = connect_db(db_name)
340 with conn: 307 with conn:
@@ -344,27 +311,49 @@ def num_pending_design_points(db_name):
344 c.close() 311 c.close()
345 return npending 312 return npending
346 313
314
347def __get_design_points(conn, ndp): 315def __get_design_points(conn, ndp):
348 global dp_col_names 316 global dp_col_names
349 fetched = [] 317 fetched = []
350 c = conn.cursor(db.cursors.DictCursor) 318 c = conn.cursor(db.cursors.DictCursor)
351 #################### 319 ####################
352 try: 320 done = False
353 begin_sync(conn, c) 321 while not done:
354 c.execute('SELECT id,%s FROM dp_pending WHERE taken=0 LIMIT %%s FOR UPDATE' % ','.join(dp_col_names), (ndp,)) 322
355 rows = c.fetchall() 323 try:
356 nfetched = c.rowcount 324 begin_sync(conn, c)
357 if nfetched > 0: 325 c.execute('SELECT id,%s FROM dp_pending WHERE taken=0 LIMIT %%s FOR UPDATE' % ','.join(dp_col_names), (ndp,))
358 dp_ids = [r['id'] for r in rows] 326 rows = c.fetchall()
359 c.execute('UPDATE dp_pending SET taken=1 WHERE id IN (%s)' % ','.join(['%s']*len(dp_ids)), tuple(dp_ids)) 327 nfetched = c.rowcount
360 end_sync(conn) 328 if nfetched > 0:
361 fetched = [db_to_dp(r) for r in rows] 329 dp_ids = [r['id'] for r in rows]
362 except db.OperationalError, e: 330 c.execute('UPDATE dp_pending SET taken=1 WHERE id IN (%s)' % ','.join(['%s']*len(dp_ids)), tuple(dp_ids))
363 print e 331 end_sync(conn)
364 print 'Last Query: %s' % c._last_executed 332 fetched = [db_to_dp(r) for r in rows]
365 rollback(conn) 333 done = True
366 c.close() 334
367 raise e 335 except db.OperationalError, e:
336 errcode = e[0]
337 if errcode == 1213:
338 # deadlock - retry
339 print '(__get_design_points) Error is transaction deadlock. Will retry.'
340 print '(__get_design_points) Last query: %s' % c._last_executed
341 rollback(conn)
342 else:
343 print e
344 print '(__get_design_points) Last query: %s' % c._last_executed
345 rollback(conn)
346 c.close()
347 # rethrow
348 raise e
349
350 except BaseException, e:
351 print e
352 print '(__get_design_points) Last query: %s' % c._last_executed
353 rollback(conn)
354 c.close()
355 raise e
356
368 c.close() 357 c.close()
369 return fetched 358 return fetched
370 359
@@ -375,6 +364,60 @@ def get_design_points(db_name, ndp = 1):
375 fetched = __get_design_points(conn, ndp) 364 fetched = __get_design_points(conn, ndp)
376 return fetched 365 return fetched
377 366
367
368def lookup_dp_id(c, dp):
369 global dp_col_names
370 global dp_col_type_strs
371 seeking = dp_to_db(dp)
372 seeking.pop('ts_util', None)
373
374 col_names = [n for n in dp_col_names if n != 'ts_util']
375
376 dp_id = None
377 while dp_id is None:
378 try:
379 # atomically add new dp to dp_ptested. fails if dp already in table
380 c.execute('INSERT INTO dp_ptested(id,%s) '
381 'SELECT NULL,%s '
382 'FROM dual '
383 'WHERE NOT EXISTS ( '
384 'SELECT 1 '
385 'FROM dp_ptested '
386 'WHERE %s )' % ( ','.join(col_names),
387 ','.join(['%s']*(len(col_names))),
388 ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ),
389 tuple([seeking[n] for n in col_names] + seeking.values()) )
390
391 if c.rowcount != 0:
392 # new id
393 c.execute('SELECT LAST_INSERT_ID()')
394 row = c.fetchone()
395 assert row is not None
396 dp_id = row.values()[0]
397 else:
398 # id already exists. look it up.
399 c.execute('SELECT id '
400 'FROM dp_ptested '
401 'WHERE %s' % (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ),
402 tuple(seeking.values()) )
403 row = c.fetchone()
404 assert row is not None
405 dp_id = row['id']
406
407 except db.OperationalError, e:
408 errcode = e[0]
409 if errcode == 1213:
410 # deadlock - retry
411 print '(lookup_dp_id) Error is transaction deadlock. Will retry.'
412 print '(lookup_dp_id) Last query: %s' % c._last_executed
413 else:
414 print e
415 print '(lookup_dp_id) Last query: %s' % c._last_executed
416 raise e
417
418 return dp_id
419
420
378def __store_eff_sched_results(c, dp_id, stats): 421def __store_eff_sched_results(c, dp_id, stats):
379 for factor,eff_curve in stats.iteritems(): 422 for factor,eff_curve in stats.iteritems():
380 for eff_ts_util,sched in eff_curve.iteritems(): 423 for eff_ts_util,sched in eff_curve.iteritems():
@@ -406,53 +449,76 @@ def __store_eff_sched_results(c, dp_id, stats):
406 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', 449 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)',
407 (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) 450 (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth))
408 451
def __store_sched_result(c, dp_id, ts_util, stats):
    # Persist one base schedulability record for design point dp_id at
    # task-set utilization ts_util.
    record = (dp_id,
              ts_util,
              stats.avg_sched,
              stats.ntested,
              stats.nsched,
              stats.avg_tard,
              stats.avg_bandwidth)
    c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', record)
461def store_sched_result(c, dp_id, result):
462 done = False
463 while not done:
464 try:
465 begin_sync(conn, c)
466 __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats)
467 if result.eff_sched_stats is not None:
468 __store_eff_sched_results(c, dp_id, result.eff_sched_stats)
469 end_sync(conn)
470 done = True
471
472 except db.OperationalError, e:
473 errcode = e[0]
474 if errcode == 1213:
475 # deadlock - retry
476 print '(store_sched_result) Error is transaction deadlock. Will retry.'
477 print '(store_sched_result) Last query: %s' % c._last_executed
478 rollback(conn)
479 else:
480 print e
481 print '(store_sched_result) Last query: %s' % c._last_executed
482 rollback(conn)
483 raise e
484
485 except BaseException, e:
486 print e
487 print '(store_sched_result) Last query: %s' % c._last_executed
488 rollback(conn)
489 raise e
490
409def store_sched_results(db_name, data, ndp = 0): 491def store_sched_results(db_name, data, ndp = 0):
410 global dp_col_names 492 global dp_col_names
411 global dp_col_type_strs
412 col_names = None
413 fetched = [] 493 fetched = []
414 494
415 conn = connect_db(db_name) 495 conn = connect_db(db_name)
416
417 if col_names is None:
418 col_names = [n for n in dp_col_names if n != 'id']
419
420 with conn: 496 with conn:
421 # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'. 497 # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'.
422 # 'eff_sched_stats' is a dictionary keyed on scale-factor. stats in 'eff_sched_stats' 498 # 'eff_sched_stats' is a dictionary keyed on scale-factor. stats in 'eff_sched_stats'
423 # must be merged with prior results in the 'scaled_sched_results' table 499 # must be merged with prior results in the 'scaled_sched_results' table
424 c = conn.cursor(db.cursors.DictCursor) 500 c = conn.cursor(db.cursors.DictCursor)
425 try:
426 bundle = [(lookup_dp_id(c, d.dp), d) for d in data]
427 501
502 col_names = [n for n in dp_col_names if n != 'id']
503
504 try:
428 # get IDs for all design points. 505 # get IDs for all design points.
429 # sort to ensure that we update scaled design points 506 # sort to reduce (eliminate?) chance of deadlock.
430 # in a consistent order (no deadlocks!). 507 id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in data], key=lambda x: x[0])
431 bundle = sorted(bundle, key=lambda x: x[0]) 508
509 for dp_id, result in id_and_data:
510 store_sched_result(c, dp_id, result)
432 511
433 # insert the normal schedulability data
434 begin_sync(conn, c)
435 # save results
436 c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)',
437 [(dp_id, d.dp.sys_util, d.sched_stats.avg_sched, d.sched_stats.ntested, d.sched_stats.nsched, d.sched_stats.avg_tard, d.sched_stats.avg_bandwidth)
438 for dp_id, d in bundle])
439 # insert the effective utilization-based schedulability data
440 for dp_id, d in bundle:
441 if d.eff_sched_stats is not None:
442 __store_eff_sched_results(c, dp_id, d.eff_sched_stats)
443 end_sync(conn)
444 except db.OperationalError, e: 512 except db.OperationalError, e:
445 print e 513 print '(store_sched_results) Caught database exception.'
446 print 'Last Query: %s' % c._last_executed
447 rollback(conn)
448 c.close() 514 c.close()
449 # TODO: mark unsaved design points as not taken in dp_pending
450 # rethrow
451 raise e 515 raise e
516
452 except BaseException, e: 517 except BaseException, e:
453 print 'Caught unknown exception. Rolling back transaction before exiting.' 518 print '(store_sched_results) Caught unknown exception.'
454 rollback(conn) 519 c.close()
455 raise e 520 raise e
521
456 c.close() 522 c.close()
457 if ndp > 0: 523 if ndp > 0:
458 fetched = __get_design_points(conn, ndp) 524 fetched = __get_design_points(conn, ndp)