From 25ccdb0cbc6b959b1f96c89b8bce91963cb67b4c Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Mon, 22 Apr 2013 15:32:12 -0400 Subject: Improved robustness of run_exps.py execution. Thanks to bcw and gelliott for debugging and ideas. * Print out experiment number and total experiments when starting experiments. * Only sleep and re-release tasks if tasks are waiting to release. * Fail experiment with verbose messages if any tasks fail before becoming ready to release. * When waiting for tasks to become ready for release, reset the waiting time whenever a new task (or task(s)) become ready. * Start regular tracers BEFORE the plugin switch to log data from the switch. * Check the number of running tasks AFTER trying to switch the linux scheduler. This gives plugin deactivate code the opportunity to kill these tasks. * If an invalid executable is specified in the schedule file, fail before attempting to run the experiment and print out the problem. * Propogate exceptions up from experiment failures instead of creating ExperimentFailed exceptions. This commit also made clock-frequency automatically ignored by parse_exps.py. The value of this would change by +- a Mhz between experiments, ruining graphs. --- run/experiment.py | 185 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 113 insertions(+), 72 deletions(-) (limited to 'run/experiment.py') diff --git a/run/experiment.py b/run/experiment.py index ff0e9f3..b0e46b6 100644 --- a/run/experiment.py +++ b/run/experiment.py @@ -1,9 +1,7 @@ -import common as com import os import time import run.litmus_util as lu import shutil as sh -import traceback from operator import methodcaller class ExperimentException(Exception): @@ -11,16 +9,11 @@ class ExperimentException(Exception): def __init__(self, name): self.name = name - class ExperimentDone(ExperimentException): '''Raised when an experiment looks like it's been run already.''' def __str__(self): return "Experiment finished already: %d" % self.name -class ExperimentFailed(ExperimentException): - def __str__(self): - return "Experiment failed during execution: %d" % self.name - class SystemCorrupted(Exception): pass @@ -31,7 +24,6 @@ class Experiment(object): def __init__(self, name, scheduler, working_dir, finished_dir, proc_entries, executables, tracer_types): '''Run an experiment, optionally wrapped in tracing.''' - self.name = name self.scheduler = scheduler self.working_dir = working_dir @@ -40,16 +32,10 @@ class Experiment(object): self.executables = executables self.exec_out = None self.exec_err = None + self.tracer_types = tracer_types - self.task_batch = com.num_cpus() - - self.__make_dirs() - self.__assign_executable_cwds() - self.__setup_tracers(tracer_types) - - - def __setup_tracers(self, tracer_types): - tracers = [ t(self.working_dir) for t in tracer_types ] + def __setup_tracers(self): + tracers = [ t(self.working_dir) for t in self.tracer_types ] self.regular_tracers = [t for t in tracers if not t.is_exact()] self.exact_tracers = [t for t in tracers if t.is_exact()] @@ -84,13 +70,11 @@ class Experiment(object): map(assign_cwd, self.executables) def __kill_all(self): - # Give time for whatever failed to finish failing - time.sleep(2) - - released = lu.release_tasks() - self.log("Re-released %d tasks" % released) + if lu.waiting_tasks(): + released = lu.release_tasks() + self.log("Re-released %d tasks" % released) - time.sleep(5) + time.sleep(1) self.log("Killing all tasks") for e in self.executables: @@ -99,7 +83,62 @@ class Experiment(object): except: pass - time.sleep(2) + time.sleep(1) + + def __strip_path(self, path): + '''Shorten path to something more readable.''' + file_dir = os.path.split(self.working_dir)[0] + if path.index(file_dir) == 0: + path = path[len(file_dir):] + + return path.strip("/") + + def __check_tasks_status(self): + '''Raises an exception if any tasks have failed.''' + msgs = [] + + for e in self.executables: + status = e.poll() + if status != None and status: + err_msg = "Task %s failed with status: %s" % (e.wait(), status) + msgs += [err_msg] + + if msgs: + # Show at most 3 messages so that every task failing doesn't blow + # up the terminal + if len(msgs) > 3: + num_errs = len(msgs) - 3 + msgs = msgs[0:4] + ["...%d more task errors..." % num_errs] + + out_name = self.__strip_path(self.exec_out.name) + err_name = self.__strip_path(self.exec_err.name) + help = "Check dmesg, %s, and %s" % (out_name, err_name) + + raise Exception("\n".join(msgs + [help])) + + def __wait_for_ready(self): + self.log("Sleeping until tasks are ready for release...") + + wait_start = time.time() + num_ready = lu.waiting_tasks() + + while num_ready < len(self.executables): + # Quit if too much time passes without a task becoming ready + if time.time() - wait_start > 180.0: + s = "waiting: %d, submitted: %d" %\ + (lu.waiting_tasks(), len(self.executables)) + raise Exception("Too much time spent waiting for tasks! %s" % s) + + time.sleep(1) + + # Quit if any tasks fail + self.__check_tasks_status() + + # Reset the waiting time whenever more tasks become ready + now_ready = lu.waiting_tasks() + if now_ready != num_ready: + wait_start = time.time() + num_ready = lu.now_ready def __run_tasks(self): self.log("Starting %d tasks" % len(self.executables)) @@ -108,16 +147,9 @@ class Experiment(object): try: e.execute() except: - raise Exception("Executable failed: %s" % e) + raise Exception("Executable failed to start: %s" % e) - self.log("Sleeping until tasks are ready for release...") - start = time.time() - while lu.waiting_tasks() < len(self.executables): - if time.time() - start > 30.0: - s = "waiting: %d, submitted: %d" %\ - (lu.waiting_tasks(), len(self.executables)) - raise Exception("Too much time spent waiting for tasks! %s" % s) - time.sleep(1) + self.__wait_for_ready() # Exact tracers (like overheads) must be started right after release or # measurements will be full of irrelevant records @@ -148,60 +180,40 @@ class Experiment(object): def __save_results(self): os.rename(self.working_dir, self.finished_dir) - def log(self, msg): - print("[Exp %s]: %s" % (self.name, msg)) - - def run_exp(self): - self.__check_system() - - succ = False - try: - self.__setup() + def __to_linux(self): + msgs = [] + sched = lu.scheduler() + if sched != "Linux": try: - self.__run_tasks() - self.log("Saving results in %s" % self.finished_dir) - succ = True + lu.switch_scheduler("Linux") except: - traceback.print_exc() - self.__kill_all() - raise ExperimentFailed(self.name) - finally: - self.__teardown() - finally: - self.log("Switching to Linux scheduler") - - # Give the tasks 10 seconds to finish before bailing - start = time.time() - while lu.all_tasks() > 0: - if time.time() - start < 10.0: - raise SystemCorrupted("%d tasks still running!" % - lu.all_tasks()) - - lu.switch_scheduler("Linux") - - if succ: - self.__save_results() - self.log("Experiment done!") + msgs += ["Scheduler is %s, cannot switch to Linux!" % sched] - def __check_system(self): running = lu.all_tasks() if running: - raise SystemCorrupted("%d tasks already running!" % running) + msgs += ["%d real-time tasks still running!" % running] - sched = lu.scheduler() - if sched != "Linux": - raise SystemCorrupted("Scheduler is %s, should be Linux" % sched) + if msgs: + raise SystemCorrupted("\n".join(msgs)) def __setup(self): + self.__make_dirs() + self.__assign_executable_cwds() + self.__setup_tracers() + self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) + self.log("Starting %d regular tracers" % len(self.regular_tracers)) + map(methodcaller('start_tracing'), self.regular_tracers) + + time.sleep(1) + self.log("Switching to %s" % self.scheduler) lu.switch_scheduler(self.scheduler) - self.log("Starting %d regular tracers" % len(self.regular_tracers)) - map(methodcaller('start_tracing'), self.regular_tracers) + time.sleep(1) self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w') self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w') @@ -217,3 +229,32 @@ class Experiment(object): self.log("Stopping regular tracers") map(methodcaller('stop_tracing'), self.regular_tracers) + def log(self, msg): + print("[Exp %s]: %s" % (self.name, msg)) + + def run_exp(self): + self.__to_linux() + + succ = False + try: + self.__setup() + + try: + self.__run_tasks() + self.log("Saving results in %s" % self.finished_dir) + succ = True + except Exception as e: + # Give time for whatever failed to finish failing + time.sleep(2) + self.__kill_all() + + raise e + finally: + self.__teardown() + finally: + self.log("Switching back to Linux scheduler") + self.__to_linux() + + if succ: + self.__save_results() + self.log("Experiment done!") -- cgit v1.2.2