From 48e1c8de6f8fca6770b669f8dae1ddc52917bead Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Mon, 17 Sep 2012 11:28:55 -0400 Subject: Handle interrupted experiments (by renaming) and failed experiments (cleanup). --- experiment/experiment.py | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) (limited to 'experiment/experiment.py') diff --git a/experiment/experiment.py b/experiment/experiment.py index 29e6bd7..b21397d 100644 --- a/experiment/experiment.py +++ b/experiment/experiment.py @@ -9,26 +9,27 @@ class ExperimentException(Exception): def __init__(self, name): self.name = name - + class ExperimentDone(ExperimentException): """Raised when an experiment looks like it's been run already.""" def __str__(self): return "Experiment finished already: %d" % self.name - + class ExperimentInterrupted(ExperimentException): """Raised when an experiment appears to be interrupted (partial results).""" def __str__(self): return "Experiment was interrupted in progress: %d" % self.name - + class ExperimentFailed(ExperimentException): def __str__(self): return "Experiment failed during execution: %d" % self.name - + class Experiment(object): """Execute one task-set and save the results. Experiments have unique IDs.""" + INTERRUPTED_DIR = ".interrupted" def __init__(self, name, scheduler, working_dir, finished_dir, proc_entries, executables): """Run an experiment, optionally wrapped in tracing.""" @@ -45,28 +46,44 @@ class Experiment(object): self.tracers = [] if SchedTracer.enabled(): + self.log("Enabling sched_trace") self.tracers.append( SchedTracer(working_dir) ) if LinuxTracer.enabled(): + self.log("Enabling trace-cmd / ftrace / kernelshark") self.tracers.append( LinuxTracer(working_dir) ) if LogTracer.enabled(): + self.log("Enabling logging") self.tracers.append( LogTracer(working_dir) ) if PerfTracer.enabled(): + self.log("Tracking CPU performance counters") self.tracers.append( PerfTracer(working_dir) ) # Overhead trace must be handled seperately, see __run_tasks if OverheadTracer.enabled(): + self.log("Enabling overhead tracing") self.overhead_trace = OverheadTracer(working_dir) else: self.overhead_trace = None def __make_dirs(self): + interrupted = None + if os.path.exists(self.finished_dir): raise ExperimentDone(self.name) + if os.path.exists(self.working_dir): - raise ExperimentInterrupted(self.name) - + self.log("Found interrupted experiment, saving in %s" % + Experiment.INTERRUPTED_DIR) + interrupted = "%s/%s" % (os.path.split(self.working_dir)[0], + Experiment.INTERRUPTED_DIR) + os.rename(self.working_dir, interrupted) + os.mkdir(self.working_dir) + if interrupted: + os.rename(interrupted, "%s/%s" % (self.working_dir, + os.path.split(interrupted)[1])) + def __assign_executable_cwds(self): def assign_cwd(executable): executable.cwd = self.working_dir @@ -90,13 +107,16 @@ class Experiment(object): self.log("Starting overhead trace") self.overhead_trace.start_tracing() + self.log("Releasing %d tasks" % len(self.executables)) released = litmus_util.release_tasks() ret = True if released != len(self.executables): + # Some tasks failed to release, kill all tasks and fail + # Need to re-release non-released tasks before we can kill them though self.log("Failed to release %d tasks! Re-releasing and killing".format( len(self.experiments) - released)) - + time.sleep(10) litmus_util.release_tasks() @@ -124,8 +144,10 @@ class Experiment(object): def run_exp(self): self.setup() - self.__run_tasks() - self.teardown() + try: + self.__run_tasks() + finally: + self.teardown() def setup(self): self.log("Switching to %s" % self.scheduler) @@ -133,7 +155,7 @@ class Experiment(object): self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) - + self.log("Starting %d tracers" % len(self.tracers)) map(methodcaller('start_tracing'), self.tracers) -- cgit v1.2.2