From 09bc409657606a37346d82ab1e4c44a165bd3541 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Fri, 12 Apr 2013 11:30:27 -0400 Subject: Improved error handling in run_exps.py. --- run/experiment.py | 128 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 54 deletions(-) (limited to 'run/experiment.py') diff --git a/run/experiment.py b/run/experiment.py index e5811f8..ff0e9f3 100644 --- a/run/experiment.py +++ b/run/experiment.py @@ -1,7 +1,9 @@ +import common as com import os import time import run.litmus_util as lu import shutil as sh +import traceback from operator import methodcaller class ExperimentException(Exception): @@ -15,17 +17,12 @@ class ExperimentDone(ExperimentException): def __str__(self): return "Experiment finished already: %d" % self.name - -class ExperimentInterrupted(ExperimentException): - '''Raised when an experiment appears to be interrupted (partial results).''' - def __str__(self): - return "Experiment was interrupted in progress: %d" % self.name - - class ExperimentFailed(ExperimentException): def __str__(self): return "Experiment failed during execution: %d" % self.name +class SystemCorrupted(Exception): + pass class Experiment(object): '''Execute one task-set and save the results. 
Experiments have unique IDs.''' @@ -44,6 +41,8 @@ class Experiment(object): self.exec_out = None self.exec_err = None + self.task_batch = com.num_cpus() + self.__make_dirs() self.__assign_executable_cwds() self.__setup_tracers(tracer_types) @@ -84,65 +83,67 @@ class Experiment(object): executable.cwd = self.working_dir map(assign_cwd, self.executables) - def __run_tasks(self): - already_waiting = lu.waiting_tasks() + def __kill_all(self): + # Give time for whatever failed to finish failing + time.sleep(2) + + released = lu.release_tasks() + self.log("Re-released %d tasks" % released) - if already_waiting: - self.log("Already %d tasks waiting for release!") - self.log("Experiment will fail if any of these tasks are released.") + time.sleep(5) - self.log("Starting the programs") + self.log("Killing all tasks") for e in self.executables: + try: + e.kill() + except: + pass + + time.sleep(2) + + def __run_tasks(self): + self.log("Starting %d tasks" % len(self.executables)) + + for i,e in enumerate(self.executables): try: e.execute() except: raise Exception("Executable failed: %s" % e) self.log("Sleeping until tasks are ready for release...") - start = time.clock() - while (lu.waiting_tasks() - already_waiting) < len(self.executables): - if time.clock() - start > 30.0: - raise Exception("Too much time has passed waiting for tasks!") + start = time.time() + while lu.waiting_tasks() < len(self.executables): + if time.time() - start > 30.0: + s = "waiting: %d, submitted: %d" %\ + (lu.waiting_tasks(), len(self.executables)) + raise Exception("Too much time spent waiting for tasks! 
%s" % s) time.sleep(1) # Exact tracers (like overheads) must be started right after release or # measurements will be full of irrelevant records self.log("Starting %d released tracers" % len(self.exact_tracers)) map(methodcaller('start_tracing'), self.exact_tracers) + time.sleep(1) - self.log("Releasing %d tasks" % len(self.executables)) - released = lu.release_tasks() - - ret = True - if released != len(self.executables): - # Some tasks failed to release, kill all tasks and fail - # Need to re-release non-released tasks before we can kill them though - self.log("Failed to release {} tasks! Re-releasing and killing".format( - len(self.executables) - released, len(self.executables))) - time.sleep(5) - + try: + self.log("Releasing %d tasks" % len(self.executables)) released = lu.release_tasks() - self.log("Re-released %d tasks" % released) - - time.sleep(5) - - self.log("Killing all tasks") - map(methodcaller('kill'), self.executables) + if released != len(self.executables): + # Some tasks failed to release, kill all tasks and fail + # Need to release non-released tasks before they can be killed + raise Exception("Released %s tasks, expected %s tasks" % + (released, len(self.executables))) - ret = False + self.log("Waiting for program to finish...") + for e in self.executables: + if not e.wait(): + raise Exception("Executable %s failed to complete!" 
% e) - self.log("Waiting for program to finish...") - for e in self.executables: - if not e.wait(): - ret = False - - # And these must be stopped here for the same reason - self.log("Stopping exact tracers") - map(methodcaller('stop_tracing'), self.exact_tracers) - - if not ret: - raise ExperimentFailed(self.name) + finally: + # And these must be stopped here for the same reason + self.log("Stopping exact tracers") + map(methodcaller('stop_tracing'), self.exact_tracers) def __save_results(self): os.rename(self.working_dir, self.finished_dir) @@ -151,29 +152,48 @@ class Experiment(object): print("[Exp %s]: %s" % (self.name, msg)) def run_exp(self): + self.__check_system() + succ = False try: - self.setup() + self.__setup() try: self.__run_tasks() self.log("Saving results in %s" % self.finished_dir) succ = True + except: + traceback.print_exc() + self.__kill_all() + raise ExperimentFailed(self.name) finally: - self.teardown() + self.__teardown() finally: self.log("Switching to Linux scheduler") - try: - lu.switch_scheduler("Linux") - except: - self.log("Failed to switch back to Linux.") + + # Give the tasks 10 seconds to finish before bailing + start = time.time() + while lu.all_tasks() > 0: + if time.time() - start > 10.0: + raise SystemCorrupted("%d tasks still running!" % + lu.all_tasks()) + + lu.switch_scheduler("Linux") if succ: self.__save_results() self.log("Experiment done!") + def __check_system(self): + running = lu.all_tasks() + if running: + raise SystemCorrupted("%d tasks already running!" 
% running) + + sched = lu.scheduler() + if sched != "Linux": + raise SystemCorrupted("Scheduler is %s, should be Linux" % sched) - def setup(self): + def __setup(self): self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) @@ -190,7 +210,7 @@ class Experiment(object): executable.stderr_file = self.exec_err map(set_out, self.executables) - def teardown(self): + def __teardown(self): self.exec_out and self.exec_out.close() self.exec_err and self.exec_err.close() -- cgit v1.2.2