diff options
author | Glenn Elliott <gelliott@cs.unc.edu> | 2014-10-02 11:52:29 -0400 |
---|---|---|
committer | Glenn Elliott <gelliott@cs.unc.edu> | 2014-10-02 11:52:29 -0400 |
commit | a9e0c2a30411e282401082338e72baf4ddb81875 (patch) | |
tree | 40045cac11eb9f30121b5e8c847e934042a841ee | |
parent | d0b039c266dca961537a97136518c4bc913ef1f9 (diff) |
retry on transaction deadlock
-rwxr-xr-x | rtss14/database.py | 230 |
1 files changed, 148 insertions, 82 deletions
diff --git a/rtss14/database.py b/rtss14/database.py index 1939056..c2597f6 100755 --- a/rtss14/database.py +++ b/rtss14/database.py | |||
@@ -172,6 +172,7 @@ def dp_to_db_vals(dp): | |||
172 | vals[key] = to_db_val(value) | 172 | vals[key] = to_db_val(value) |
173 | return vals | 173 | return vals |
174 | 174 | ||
175 | |||
175 | def begin_sync(conn, c): | 176 | def begin_sync(conn, c): |
176 | global timeout | 177 | global timeout |
177 | start = time.time() | 178 | start = time.time() |
@@ -216,6 +217,7 @@ def connect_db(db_name): | |||
216 | init_dp_col_names(conn) | 217 | init_dp_col_names(conn) |
217 | return conn | 218 | return conn |
218 | 219 | ||
220 | |||
219 | def clear_tables(db_name): | 221 | def clear_tables(db_name): |
220 | # tables to clear | 222 | # tables to clear |
221 | tables = ['sched_results', 'scaled_sched_results', 'dp_ptested', 'dp_pending'] | 223 | tables = ['sched_results', 'scaled_sched_results', 'dp_ptested', 'dp_pending'] |
@@ -230,43 +232,6 @@ def clear_tables(db_name): | |||
230 | ################ | 232 | ################ |
231 | c.close() | 233 | c.close() |
232 | 234 | ||
233 | def lookup_dp_id(c, dp): | ||
234 | global dp_col_names | ||
235 | global dp_col_type_strs | ||
236 | seeking = dp_to_db(dp) | ||
237 | seeking.pop('ts_util', None) | ||
238 | |||
239 | col_names = [n for n in dp_col_names if n != 'ts_util'] | ||
240 | |||
241 | # atomically add new dp to dp_ptested. fails if dp already in table | ||
242 | c.execute('INSERT INTO dp_ptested(id,%s) ' | ||
243 | 'SELECT NULL,%s ' | ||
244 | 'FROM dual ' | ||
245 | 'WHERE NOT EXISTS ( ' | ||
246 | 'SELECT 1 ' | ||
247 | 'FROM dp_ptested ' | ||
248 | 'WHERE %s )' % ( ','.join(col_names), | ||
249 | ','.join(['%s']*(len(col_names))), | ||
250 | ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ), | ||
251 | tuple([seeking[n] for n in col_names] + seeking.values()) ) | ||
252 | |||
253 | if c.rowcount != 0: | ||
254 | # new id | ||
255 | c.execute('SELECT LAST_INSERT_ID()') | ||
256 | row = c.fetchone() | ||
257 | assert row is not None | ||
258 | dp_id = row.values()[0] | ||
259 | else: | ||
260 | # id already exists. look it up. | ||
261 | c.execute('SELECT id ' | ||
262 | 'FROM dp_ptested ' | ||
263 | 'WHERE %s' % (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ), | ||
264 | tuple(seeking.values()) ) | ||
265 | row = c.fetchone() | ||
266 | assert row is not None | ||
267 | dp_id = row['id'] | ||
268 | return dp_id | ||
269 | |||
270 | 235 | ||
271 | def __repair_design_points(conn): | 236 | def __repair_design_points(conn): |
272 | global dp_col_names | 237 | global dp_col_names |
@@ -288,6 +253,7 @@ def repair_design_points(db_name): | |||
288 | nrepaired = __repair_design_points(conn) | 253 | nrepaired = __repair_design_points(conn) |
289 | return nrepaired | 254 | return nrepaired |
290 | 255 | ||
256 | |||
291 | repaired = False | 257 | repaired = False |
292 | def store_design_points(db_name, dps, clean): | 258 | def store_design_points(db_name, dps, clean): |
293 | global dp_col_names | 259 | global dp_col_names |
@@ -335,6 +301,7 @@ def store_design_points(db_name, dps, clean): | |||
335 | c.close() | 301 | c.close() |
336 | return nstored | 302 | return nstored |
337 | 303 | ||
304 | |||
338 | def num_pending_design_points(db_name): | 305 | def num_pending_design_points(db_name): |
339 | conn = connect_db(db_name) | 306 | conn = connect_db(db_name) |
340 | with conn: | 307 | with conn: |
@@ -344,27 +311,49 @@ def num_pending_design_points(db_name): | |||
344 | c.close() | 311 | c.close() |
345 | return npending | 312 | return npending |
346 | 313 | ||
314 | |||
347 | def __get_design_points(conn, ndp): | 315 | def __get_design_points(conn, ndp): |
348 | global dp_col_names | 316 | global dp_col_names |
349 | fetched = [] | 317 | fetched = [] |
350 | c = conn.cursor(db.cursors.DictCursor) | 318 | c = conn.cursor(db.cursors.DictCursor) |
351 | #################### | 319 | #################### |
352 | try: | 320 | done = False |
353 | begin_sync(conn, c) | 321 | while not done: |
354 | c.execute('SELECT id,%s FROM dp_pending WHERE taken=0 LIMIT %%s FOR UPDATE' % ','.join(dp_col_names), (ndp,)) | 322 | |
355 | rows = c.fetchall() | 323 | try: |
356 | nfetched = c.rowcount | 324 | begin_sync(conn, c) |
357 | if nfetched > 0: | 325 | c.execute('SELECT id,%s FROM dp_pending WHERE taken=0 LIMIT %%s FOR UPDATE' % ','.join(dp_col_names), (ndp,)) |
358 | dp_ids = [r['id'] for r in rows] | 326 | rows = c.fetchall() |
359 | c.execute('UPDATE dp_pending SET taken=1 WHERE id IN (%s)' % ','.join(['%s']*len(dp_ids)), tuple(dp_ids)) | 327 | nfetched = c.rowcount |
360 | end_sync(conn) | 328 | if nfetched > 0: |
361 | fetched = [db_to_dp(r) for r in rows] | 329 | dp_ids = [r['id'] for r in rows] |
362 | except db.OperationalError, e: | 330 | c.execute('UPDATE dp_pending SET taken=1 WHERE id IN (%s)' % ','.join(['%s']*len(dp_ids)), tuple(dp_ids)) |
363 | print e | 331 | end_sync(conn) |
364 | print 'Last Query: %s' % c._last_executed | 332 | fetched = [db_to_dp(r) for r in rows] |
365 | rollback(conn) | 333 | done = True |
366 | c.close() | 334 | |
367 | raise e | 335 | except db.OperationalError, e: |
336 | errcode = e[0] | ||
337 | if errcode == 1213: | ||
338 | # deadlock - retry | ||
339 | print '(__get_design_points) Error is transaction deadlock. Will retry.' | ||
340 | print '(__get_design_points) Last query: %s' % c._last_executed | ||
341 | rollback(conn) | ||
342 | else: | ||
343 | print e | ||
344 | print '(__get_design_points) Last query: %s' % c._last_executed | ||
345 | rollback(conn) | ||
346 | c.close() | ||
347 | # rethrow | ||
348 | raise e | ||
349 | |||
350 | except BaseException, e: | ||
351 | print e | ||
352 | print '(__get_design_points) Last query: %s' % c._last_executed | ||
353 | rollback(conn) | ||
354 | c.close() | ||
355 | raise e | ||
356 | |||
368 | c.close() | 357 | c.close() |
369 | return fetched | 358 | return fetched |
370 | 359 | ||
@@ -375,6 +364,60 @@ def get_design_points(db_name, ndp = 1): | |||
375 | fetched = __get_design_points(conn, ndp) | 364 | fetched = __get_design_points(conn, ndp) |
376 | return fetched | 365 | return fetched |
377 | 366 | ||
367 | |||
368 | def lookup_dp_id(c, dp): | ||
369 | global dp_col_names | ||
370 | global dp_col_type_strs | ||
371 | seeking = dp_to_db(dp) | ||
372 | seeking.pop('ts_util', None) | ||
373 | |||
374 | col_names = [n for n in dp_col_names if n != 'ts_util'] | ||
375 | |||
376 | dp_id = None | ||
377 | while dp_id is None: | ||
378 | try: | ||
379 | # atomically add new dp to dp_ptested. fails if dp already in table | ||
380 | c.execute('INSERT INTO dp_ptested(id,%s) ' | ||
381 | 'SELECT NULL,%s ' | ||
382 | 'FROM dual ' | ||
383 | 'WHERE NOT EXISTS ( ' | ||
384 | 'SELECT 1 ' | ||
385 | 'FROM dp_ptested ' | ||
386 | 'WHERE %s )' % ( ','.join(col_names), | ||
387 | ','.join(['%s']*(len(col_names))), | ||
388 | ' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ), | ||
389 | tuple([seeking[n] for n in col_names] + seeking.values()) ) | ||
390 | |||
391 | if c.rowcount != 0: | ||
392 | # new id | ||
393 | c.execute('SELECT LAST_INSERT_ID()') | ||
394 | row = c.fetchone() | ||
395 | assert row is not None | ||
396 | dp_id = row.values()[0] | ||
397 | else: | ||
398 | # id already exists. look it up. | ||
399 | c.execute('SELECT id ' | ||
400 | 'FROM dp_ptested ' | ||
401 | 'WHERE %s' % (' AND '.join(map(lambda x: '%s=%s' % (x, dp_col_type_strs[x]), seeking.iterkeys())) ), | ||
402 | tuple(seeking.values()) ) | ||
403 | row = c.fetchone() | ||
404 | assert row is not None | ||
405 | dp_id = row['id'] | ||
406 | |||
407 | except db.OperationalError, e: | ||
408 | errcode = e[0] | ||
409 | if errcode == 1213: | ||
410 | # deadlock - retry | ||
411 | print '(lookup_dp_id) Error is transaction deadlock. Will retry.' | ||
412 | print '(lookup_dp_id) Last query: %s' % c._last_executed | ||
413 | else: | ||
414 | print e | ||
415 | print '(lookup_dp_id) Last query: %s' % c._last_executed | ||
416 | raise e | ||
417 | |||
418 | return dp_id | ||
419 | |||
420 | |||
378 | def __store_eff_sched_results(c, dp_id, stats): | 421 | def __store_eff_sched_results(c, dp_id, stats): |
379 | for factor,eff_curve in stats.iteritems(): | 422 | for factor,eff_curve in stats.iteritems(): |
380 | for eff_ts_util,sched in eff_curve.iteritems(): | 423 | for eff_ts_util,sched in eff_curve.iteritems(): |
@@ -406,53 +449,76 @@ def __store_eff_sched_results(c, dp_id, stats): | |||
406 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', | 449 | 'VALUES(%s,%s,%s,%s,%s,%s,%s,%s)', |
407 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) | 450 | (dp_id, eff_ts_util, factor, sched.ntested, sched.nsched, sched.avg_sched, sched.avg_tard, sched.avg_bandwidth)) |
408 | 451 | ||
452 | def __store_sched_result(c, dp_id, ts_util, stats): | ||
453 | c.execute('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', | ||
454 | (dp_id, ts_util, | ||
455 | stats.avg_sched, | ||
456 | stats.ntested, | ||
457 | stats.nsched, | ||
458 | stats.avg_tard, | ||
459 | stats.avg_bandwidth) ) | ||
460 | |||
461 | def store_sched_result(c, dp_id, result): | ||
462 | done = False | ||
463 | while not done: | ||
464 | try: | ||
465 | begin_sync(conn, c) | ||
466 | __store_sched_result(c, dp_id, result.dp.sys_util, result.sched_stats) | ||
467 | if result.eff_sched_stats is not None: | ||
468 | __store_eff_sched_results(c, dp_id, result.eff_sched_stats) | ||
469 | end_sync(conn) | ||
470 | done = True | ||
471 | |||
472 | except db.OperationalError, e: | ||
473 | errcode = e[0] | ||
474 | if errcode == 1213: | ||
475 | # deadlock - retry | ||
476 | print '(store_sched_result) Error is transaction deadlock. Will retry.' | ||
477 | print '(store_sched_result) Last query: %s' % c._last_executed | ||
478 | rollback(conn) | ||
479 | else: | ||
480 | print e | ||
481 | print '(store_sched_result) Last query: %s' % c._last_executed | ||
482 | rollback(conn) | ||
483 | raise e | ||
484 | |||
485 | except BaseException, e: | ||
486 | print e | ||
487 | print '(store_sched_result) Last query: %s' % c._last_executed | ||
488 | rollback(conn) | ||
489 | raise e | ||
490 | |||
409 | def store_sched_results(db_name, data, ndp = 0): | 491 | def store_sched_results(db_name, data, ndp = 0): |
410 | global dp_col_names | 492 | global dp_col_names |
411 | global dp_col_type_strs | ||
412 | col_names = None | ||
413 | fetched = [] | 493 | fetched = [] |
414 | 494 | ||
415 | conn = connect_db(db_name) | 495 | conn = connect_db(db_name) |
416 | |||
417 | if col_names is None: | ||
418 | col_names = [n for n in dp_col_names if n != 'id'] | ||
419 | |||
420 | with conn: | 496 | with conn: |
421 | # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'. | 497 | # data is a list of storage(), with fields 'dp, 'sched_stats', and 'eff_sched_stats'. |
422 | # 'eff_sched_stats' is a dictionary keyed on scale-factor. stats in 'eff_sched_stats' | 498 | # 'eff_sched_stats' is a dictionary keyed on scale-factor. stats in 'eff_sched_stats' |
423 | # must be merged with prior results in the 'scaled_sched_results' table | 499 | # must be merged with prior results in the 'scaled_sched_results' table |
424 | c = conn.cursor(db.cursors.DictCursor) | 500 | c = conn.cursor(db.cursors.DictCursor) |
425 | try: | ||
426 | bundle = [(lookup_dp_id(c, d.dp), d) for d in data] | ||
427 | 501 | ||
502 | col_names = [n for n in dp_col_names if n != 'id'] | ||
503 | |||
504 | try: | ||
428 | # get IDs for all design points. | 505 | # get IDs for all design points. |
429 | # sort to ensure that we update scaled design points | 506 | # sort to reduce (eliminate?) chance of deadlock. |
430 | # in a consistent order (no deadlocks!). | 507 | id_and_data = sorted([(lookup_dp_id(c, d.dp), d) for d in data], key=lambda x: x[0]) |
431 | bundle = sorted(bundle, key=lambda x: x[0]) | 508 | |
509 | for dp_id, result in id_and_data: | ||
510 | store_sched_result(c, dp_id, result) | ||
432 | 511 | ||
433 | # insert the normal schedulability data | ||
434 | begin_sync(conn, c) | ||
435 | # save results | ||
436 | c.executemany('INSERT INTO sched_results VALUES(%s,%s,%s,%s,%s,%s,%s)', | ||
437 | [(dp_id, d.dp.sys_util, d.sched_stats.avg_sched, d.sched_stats.ntested, d.sched_stats.nsched, d.sched_stats.avg_tard, d.sched_stats.avg_bandwidth) | ||
438 | for dp_id, d in bundle]) | ||
439 | # insert the effective utilization-based schedulability data | ||
440 | for dp_id, d in bundle: | ||
441 | if d.eff_sched_stats is not None: | ||
442 | __store_eff_sched_results(c, dp_id, d.eff_sched_stats) | ||
443 | end_sync(conn) | ||
444 | except db.OperationalError, e: | 512 | except db.OperationalError, e: |
445 | print e | 513 | print '(store_sched_results) Caught database exception.' |
446 | print 'Last Query: %s' % c._last_executed | ||
447 | rollback(conn) | ||
448 | c.close() | 514 | c.close() |
449 | # TODO: mark unsaved design points as not taken in dp_pending | ||
450 | # rethrow | ||
451 | raise e | 515 | raise e |
516 | |||
452 | except BaseException, e: | 517 | except BaseException, e: |
453 | print 'Caught unknown exception. Rolling back transaction before exiting.' | 518 | print '(store_sched_results) Caught unknown exception.' |
454 | rollback(conn) | 519 | c.close() |
455 | raise e | 520 | raise e |
521 | |||
456 | c.close() | 522 | c.close() |
457 | if ndp > 0: | 523 | if ndp > 0: |
458 | fetched = __get_design_points(conn, ndp) | 524 | fetched = __get_design_points(conn, ndp) |