From 25ccdb0cbc6b959b1f96c89b8bce91963cb67b4c Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Mon, 22 Apr 2013 15:32:12 -0400 Subject: Improved robustness of run_exps.py execution. Thanks to bcw and gelliott for debugging and ideas. * Print out experiment number and total experiments when starting experiments. * Only sleep and re-release tasks if tasks are waiting to release. * Fail experiment with verbose messages if any tasks fail before becoming ready to release. * When waiting for tasks to become ready for release, reset the waiting time whenever a new task (or task(s)) become ready. * Start regular tracers BEFORE the plugin switch to log data from the switch. * Check the number of running tasks AFTER trying to switch the linux scheduler. This gives plugin deactivate code the opportunity to kill these tasks. * If an invalid executable is specified in the schedule file, fail before attempting to run the experiment and print out the problem. * Propogate exceptions up from experiment failures instead of creating ExperimentFailed exceptions. This commit also made clock-frequency automatically ignored by parse_exps.py. The value of this would change by +- a Mhz between experiments, ruining graphs. --- config/config.py | 2 +- parse_exps.py | 4 +- run/executable/executable.py | 3 + run/experiment.py | 185 ++++++++++++++++++++++++++----------------- run/litmus_util.py | 13 ++- run_exps.py | 170 +++++++++++++++++++++------------------ 6 files changed, 216 insertions(+), 161 deletions(-) diff --git a/config/config.py b/config/config.py index 1ac468b..b631aa2 100644 --- a/config/config.py +++ b/config/config.py @@ -9,7 +9,7 @@ BINS = {'rtspin' : get_executable_hint('rtspin', 'liblitmus'), 'ftsplit' : get_executable_hint('ft2csv', 'feather-trace-tools'), 'ftsort' : get_executable_hint('ftsort', 'feather-trace-tools'), 'st_trace' : get_executable_hint('st_trace', 'feather-trace-tools'), - # Option, as not everyone uses kernelshark yet + # Optional, as not everyone uses kernelshark yet 'trace-cmd' : get_executable_hint('trace-cmd', 'rt-kernelshark', True), # Optional, as sched_trace is not a publically supported repository 'st_show' : get_executable_hint('st_show', 'sched_trace', True)} diff --git a/parse_exps.py b/parse_exps.py index d07378c..c2cbedb 100755 --- a/parse_exps.py +++ b/parse_exps.py @@ -140,8 +140,8 @@ def main(): if opts.ignore: for param in opts.ignore.split(","): builder.try_remove(param) - # Always average multiple trials - builder.try_remove(PARAMS['trial']) + builder.try_remove(PARAMS['trial']) # Always average multiple trials + builder.try_remove(PARAMS['cycles']) # Only need for feather-trace parsing col_map = builder.build() result_table = TupleTable(col_map) diff --git a/run/executable/executable.py b/run/executable/executable.py index e6f2003..a2426f1 100644 --- a/run/executable/executable.py +++ b/run/executable/executable.py @@ -59,6 +59,9 @@ class Executable(object): def interrupt(self): self.sp.send_signal(signal.SIGINT) + def poll(self): + return self.sp.poll() + def terminate(self): '''Send the terminate signal to the binary.''' self.sp.terminate() diff --git a/run/experiment.py b/run/experiment.py index ff0e9f3..b0e46b6 100644 --- a/run/experiment.py +++ b/run/experiment.py @@ -1,9 +1,7 @@ -import common as com import os import time import run.litmus_util as lu import shutil as sh -import traceback from operator import methodcaller class ExperimentException(Exception): @@ -11,16 +9,11 @@ class ExperimentException(Exception): def __init__(self, name): self.name = name - class ExperimentDone(ExperimentException): '''Raised when an experiment looks like it's been run already.''' def __str__(self): return "Experiment finished already: %d" % self.name -class ExperimentFailed(ExperimentException): - def __str__(self): - return "Experiment failed during execution: %d" % self.name - class SystemCorrupted(Exception): pass @@ -31,7 +24,6 @@ class Experiment(object): def __init__(self, name, scheduler, working_dir, finished_dir, proc_entries, executables, tracer_types): '''Run an experiment, optionally wrapped in tracing.''' - self.name = name self.scheduler = scheduler self.working_dir = working_dir @@ -40,16 +32,10 @@ class Experiment(object): self.executables = executables self.exec_out = None self.exec_err = None + self.tracer_types = tracer_types - self.task_batch = com.num_cpus() - - self.__make_dirs() - self.__assign_executable_cwds() - self.__setup_tracers(tracer_types) - - - def __setup_tracers(self, tracer_types): - tracers = [ t(self.working_dir) for t in tracer_types ] + def __setup_tracers(self): + tracers = [ t(self.working_dir) for t in self.tracer_types ] self.regular_tracers = [t for t in tracers if not t.is_exact()] self.exact_tracers = [t for t in tracers if t.is_exact()] @@ -84,13 +70,11 @@ class Experiment(object): map(assign_cwd, self.executables) def __kill_all(self): - # Give time for whatever failed to finish failing - time.sleep(2) - - released = lu.release_tasks() - self.log("Re-released %d tasks" % released) + if lu.waiting_tasks(): + released = lu.release_tasks() + self.log("Re-released %d tasks" % released) - time.sleep(5) + time.sleep(1) self.log("Killing all tasks") for e in self.executables: @@ -99,7 +83,62 @@ class Experiment(object): except: pass - time.sleep(2) + time.sleep(1) + + def __strip_path(self, path): + '''Shorten path to something more readable.''' + file_dir = os.path.split(self.working_dir)[0] + if path.index(file_dir) == 0: + path = path[len(file_dir):] + + return path.strip("/") + + def __check_tasks_status(self): + '''Raises an exception if any tasks have failed.''' + msgs = [] + + for e in self.executables: + status = e.poll() + if status != None and status: + err_msg = "Task %s failed with status: %s" % (e.wait(), status) + msgs += [err_msg] + + if msgs: + # Show at most 3 messages so that every task failing doesn't blow + # up the terminal + if len(msgs) > 3: + num_errs = len(msgs) - 3 + msgs = msgs[0:4] + ["...%d more task errors..." % num_errs] + + out_name = self.__strip_path(self.exec_out.name) + err_name = self.__strip_path(self.exec_err.name) + help = "Check dmesg, %s, and %s" % (out_name, err_name) + + raise Exception("\n".join(msgs + [help])) + + def __wait_for_ready(self): + self.log("Sleeping until tasks are ready for release...") + + wait_start = time.time() + num_ready = lu.waiting_tasks() + + while num_ready < len(self.executables): + # Quit if too much time passes without a task becoming ready + if time.time() - wait_start > 180.0: + s = "waiting: %d, submitted: %d" %\ + (lu.waiting_tasks(), len(self.executables)) + raise Exception("Too much time spent waiting for tasks! %s" % s) + + time.sleep(1) + + # Quit if any tasks fail + self.__check_tasks_status() + + # Reset the waiting time whenever more tasks become ready + now_ready = lu.waiting_tasks() + if now_ready != num_ready: + wait_start = time.time() + num_ready = lu.now_ready def __run_tasks(self): self.log("Starting %d tasks" % len(self.executables)) @@ -108,16 +147,9 @@ class Experiment(object): try: e.execute() except: - raise Exception("Executable failed: %s" % e) + raise Exception("Executable failed to start: %s" % e) - self.log("Sleeping until tasks are ready for release...") - start = time.time() - while lu.waiting_tasks() < len(self.executables): - if time.time() - start > 30.0: - s = "waiting: %d, submitted: %d" %\ - (lu.waiting_tasks(), len(self.executables)) - raise Exception("Too much time spent waiting for tasks! %s" % s) - time.sleep(1) + self.__wait_for_ready() # Exact tracers (like overheads) must be started right after release or # measurements will be full of irrelevant records @@ -148,60 +180,40 @@ class Experiment(object): def __save_results(self): os.rename(self.working_dir, self.finished_dir) - def log(self, msg): - print("[Exp %s]: %s" % (self.name, msg)) - - def run_exp(self): - self.__check_system() - - succ = False - try: - self.__setup() + def __to_linux(self): + msgs = [] + sched = lu.scheduler() + if sched != "Linux": try: - self.__run_tasks() - self.log("Saving results in %s" % self.finished_dir) - succ = True + lu.switch_scheduler("Linux") except: - traceback.print_exc() - self.__kill_all() - raise ExperimentFailed(self.name) - finally: - self.__teardown() - finally: - self.log("Switching to Linux scheduler") - - # Give the tasks 10 seconds to finish before bailing - start = time.time() - while lu.all_tasks() > 0: - if time.time() - start < 10.0: - raise SystemCorrupted("%d tasks still running!" % - lu.all_tasks()) - - lu.switch_scheduler("Linux") - - if succ: - self.__save_results() - self.log("Experiment done!") + msgs += ["Scheduler is %s, cannot switch to Linux!" % sched] - def __check_system(self): running = lu.all_tasks() if running: - raise SystemCorrupted("%d tasks already running!" % running) + msgs += ["%d real-time tasks still running!" % running] - sched = lu.scheduler() - if sched != "Linux": - raise SystemCorrupted("Scheduler is %s, should be Linux" % sched) + if msgs: + raise SystemCorrupted("\n".join(msgs)) def __setup(self): + self.__make_dirs() + self.__assign_executable_cwds() + self.__setup_tracers() + self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) + self.log("Starting %d regular tracers" % len(self.regular_tracers)) + map(methodcaller('start_tracing'), self.regular_tracers) + + time.sleep(1) + self.log("Switching to %s" % self.scheduler) lu.switch_scheduler(self.scheduler) - self.log("Starting %d regular tracers" % len(self.regular_tracers)) - map(methodcaller('start_tracing'), self.regular_tracers) + time.sleep(1) self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w') self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w') @@ -217,3 +229,32 @@ class Experiment(object): self.log("Stopping regular tracers") map(methodcaller('stop_tracing'), self.regular_tracers) + def log(self, msg): + print("[Exp %s]: %s" % (self.name, msg)) + + def run_exp(self): + self.__to_linux() + + succ = False + try: + self.__setup() + + try: + self.__run_tasks() + self.log("Saving results in %s" % self.finished_dir) + succ = True + except Exception as e: + # Give time for whatever failed to finish failing + time.sleep(2) + self.__kill_all() + + raise e + finally: + self.__teardown() + finally: + self.log("Switching back to Linux scheduler") + self.__to_linux() + + if succ: + self.__save_results() + self.log("Experiment done!") diff --git a/run/litmus_util.py b/run/litmus_util.py index 70da262..81f690b 100644 --- a/run/litmus_util.py +++ b/run/litmus_util.py @@ -20,7 +20,7 @@ def switch_scheduler(switch_to_in): with open('/proc/litmus/active_plugin', 'w') as active_plugin: subprocess.Popen(["echo", switch_to], stdout=active_plugin) - # it takes a bit to do the switch, sleep an arbitrary amount of time + # It takes a bit to do the switch, sleep an arbitrary amount of time time.sleep(2) cur_plugin = scheduler() @@ -29,24 +29,21 @@ def switch_scheduler(switch_to_in): (switch_to, cur_plugin)) def waiting_tasks(): - reg = re.compile(r'^ready.*?(?P\d+)$', re.M) + reg = re.compile(r'^ready.*?(?P\d+)$', re.M) with open('/proc/litmus/stats', 'r') as f: data = f.read() # Ignore if no tasks are waiting for release - match = re.search(reg, data) - ready = match.group("READY") + waiting = re.search(reg, data).group("WAITING") - return 0 if not ready else int(ready) + return 0 if not waiting else int(waiting) def all_tasks(): reg = re.compile(r'^real-time.*?(?P\d+)$', re.M) with open('/proc/litmus/stats', 'r') as f: data = f.read() - # Ignore if no tasks are waiting for release - match = re.search(reg, data) - ready = match.group("TASKS") + ready = re.search(reg, data).group("TASKS") return 0 if not ready else int(ready) diff --git a/run_exps.py b/run_exps.py index d7a06b5..a15018d 100755 --- a/run_exps.py +++ b/run_exps.py @@ -12,12 +12,13 @@ from config.config import PARAMS,DEFAULTS from collections import namedtuple from optparse import OptionParser from run.executable.executable import Executable -from run.experiment import Experiment,ExperimentDone,ExperimentFailed,SystemCorrupted +from run.experiment import Experiment,ExperimentDone,SystemCorrupted from run.proc_entry import ProcEntry '''Customizable experiment parameters''' ExpParams = namedtuple('ExpParams', ['scheduler', 'duration', 'tracers', - 'kernel', 'config_options']) + 'kernel', 'config_options', 'file_params', + 'pre_script', 'post_script']) '''Comparison of requested versus actual kernel compile parameter value''' ConfigResult = namedtuple('ConfigResult', ['param', 'wanted', 'actual']) @@ -113,8 +114,8 @@ def fix_paths(schedule, exp_dir, sched_file): args = args.replace(arg, abspath) break elif re.match(r'.*\w+\.[a-zA-Z]\w*', arg): - print("WARNING: non-existent file '%s' may be referenced:\n\t%s" - % (arg, sched_file)) + sys.stderr.write("WARNING: non-existent file '%s' " % arg + + "may be referenced:\n\t%s" % sched_file) schedule['task'][idx] = (task, args) @@ -182,22 +183,21 @@ def verify_environment(exp_params): raise InvalidConfig(results) -def run_parameter(exp_dir, out_dir, params, param_name): - '''Run an executable (arguments optional) specified as a configurable - @param_name in @params.''' - if PARAMS[param_name] not in params: +def run_script(script_params, exp, exp_dir, out_dir): + '''Run an executable (arguments optional)''' + if not script_params: return - script_params = params[PARAMS[param_name]] - # Split into arguments and program name if type(script_params) != type([]): script_params = [script_params] - script_name = script_params.pop(0) + exp.log("Running %s" % script_params.join(" ")) + + script_name = script_params.pop(0) script = com.get_executable(script_name, cwd=exp_dir) - out = open('%s/%s-out.txt' % (out_dir, param_name), 'w') + out = open('%s/%s-out.txt' % (out_dir, script_name), 'w') prog = Executable(script, script_params, cwd=out_dir, stderr_file=out, stdout_file=out) @@ -207,28 +207,41 @@ def run_parameter(exp_dir, out_dir, params, param_name): out.close() -def get_exp_params(cmd_scheduler, cmd_duration, file_params): +def make_exp_params(cmd_scheduler, cmd_duration, sched_dir, param_file): '''Return ExpParam with configured values of all hardcoded params.''' kernel = copts = "" - scheduler = cmd_scheduler or file_params[PARAMS['sched']] - duration = cmd_duration or file_params[PARAMS['dur']] or\ + # Load parameter file + param_file = param_file or "%s/%s" % (sched_dir, DEFAULTS['params_file']) + if os.path.isfile(param_file): + fparams = com.load_params(param_file) + else: + fparams = {} + + scheduler = cmd_scheduler or fparams[PARAMS['sched']] + duration = cmd_duration or fparams[PARAMS['dur']] or\ DEFAULTS['duration'] # Experiments can specify required kernel name - if PARAMS['kernel'] in file_params: - kernel = file_params[PARAMS['kernel']] + if PARAMS['kernel'] in fparams: + kernel = fparams[PARAMS['kernel']] # Or required config options - if PARAMS['copts'] in file_params: - copts = file_params[PARAMS['copts']] + if PARAMS['copts'] in fparams: + copts = fparams[PARAMS['copts']] # Or required tracers requested = [] - if PARAMS['trace'] in file_params: - requested = file_params[PARAMS['trace']] + if PARAMS['trace'] in fparams: + requested = fparams[PARAMS['trace']] tracers = trace.get_tracer_types(requested) + # Or scripts to run before and after experiments + def get_script(name): + return fparams[name] if name in fparams else None + pre_script = get_script('pre') + post_script = get_script('post') + # But only these two are mandatory if not scheduler: raise IOError("No scheduler found in param file!") @@ -236,48 +249,39 @@ def get_exp_params(cmd_scheduler, cmd_duration, file_params): raise IOError("No duration found in param file!") return ExpParams(scheduler=scheduler, kernel=kernel, duration=duration, - config_options=copts, tracers=tracers) - + config_options=copts, tracers=tracers, file_params=fparams, + pre_script=pre_script, post_script=post_script) -def load_experiment(sched_file, cmd_scheduler, cmd_duration, - param_file, out_dir, ignore, jabber): +def run_experiment(name, sched_file, exp_params, out_dir, + start_message, ignore, jabber): '''Load and parse data from files and run result.''' if not os.path.isfile(sched_file): raise IOError("Cannot find schedule file: %s" % sched_file) dir_name, fname = os.path.split(sched_file) - exp_name = os.path.split(dir_name)[1] + "/" + fname work_dir = "%s/tmp" % dir_name - # Load parameter file - param_file = param_file or \ - "%s/%s" % (dir_name, DEFAULTS['params_file']) - if os.path.isfile(param_file): - file_params = com.load_params(param_file) - else: - file_params = {} - - # Create input needed by Experiment - exp_params = get_exp_params(cmd_scheduler, cmd_duration, file_params) - procs, execs = load_schedule(exp_name, sched_file, exp_params.duration) + procs, execs = load_schedule(name, sched_file, exp_params.duration) - exp = Experiment(exp_name, exp_params.scheduler, work_dir, out_dir, + exp = Experiment(name, exp_params.scheduler, work_dir, out_dir, procs, execs, exp_params.tracers) + exp.log(start_message) + if not ignore: verify_environment(exp_params) - run_parameter(dir_name, work_dir, file_params, 'pre') + run_script(exp_params.pre_script, exp, dir_name, work_dir) exp.run_exp() - run_parameter(dir_name, out_dir, file_params, 'post') + run_script(exp_params.post_script, exp, dir_name, out_dir) if jabber: - jabber.send("Completed '%s'" % exp_name) + jabber.send("Completed '%s'" % name) # Save parameters used to run experiment in out_dir - out_params = dict(file_params.items() + + out_params = dict(exp_params.file_params.items() + [(PARAMS['sched'], exp_params.scheduler), (PARAMS['tasks'], len(execs)), (PARAMS['dur'], exp_params.duration)]) @@ -292,6 +296,7 @@ def load_experiment(sched_file, cmd_scheduler, cmd_duration, def get_exps(opts, args): + '''Return list of experiment files or directories''' if args: return args @@ -333,63 +338,72 @@ def setup_email(target): return None +def make_paths(exp, out_base_dir, opts): + '''Translate experiment name to (schedule file, output directory) paths''' + path = "%s/%s" % (os.getcwd(), exp) + out_dir = "%s/%s" % (out_base_dir, os.path.split(exp.strip('/'))[1]) + + if not os.path.exists(path): + raise IOError("Invalid experiment: %s" % path) + + if opts.force and os.path.exists(out_dir): + shutil.rmtree(out_dir) + + if os.path.isdir(path): + sched_file = "%s/%s" % (path, opts.sched_file) + else: + sched_file = path + + return sched_file, out_dir + + def main(): opts, args = parse_args() - scheduler = opts.scheduler - duration = opts.duration - param_file = opts.param_file - out_base = os.path.abspath(opts.out_dir) - exps = get_exps(opts, args) - created = False + jabber = setup_jabber(opts.jabber) if opts.jabber else None + email = setup_email(opts.email) if opts.email else None + + out_base = os.path.abspath(opts.out_dir) + created = False if not os.path.exists(out_base): created = True os.mkdir(out_base) - ran = 0 - done = 0 - succ = 0 - failed = 0 - invalid = 0 - - jabber = setup_jabber(opts.jabber) if opts.jabber else None - email = setup_email(opts.email) if opts.email else None - - for exp in exps: - path = "%s/%s" % (os.getcwd(), exp) - out_dir = "%s/%s" % (out_base, os.path.split(exp.strip('/'))[1]) + ran = done = succ = failed = invalid = 0 - if not os.path.exists(path): - raise IOError("Invalid experiment: %s" % path) + for i, exp in enumerate(exps): + sched_file, out_dir = make_paths(exp, out_base, opts) + sched_dir = os.path.split(sched_file)[0] - if opts.force and os.path.exists(out_dir): - shutil.rmtree(out_dir) + try: + start_message = "Loading experiment %d of %d." % (i+1, len(exps)) + exp_params = make_exp_params(opts.scheduler, opts.duration, + sched_dir, opts.param_file) - if os.path.isdir(exp): - path = "%s/%s" % (path, opts.sched_file) + run_experiment(exp, sched_file, exp_params, out_dir, + start_message, opts.ignore, jabber) - try: - load_experiment(path, scheduler, duration, param_file, - out_dir, opts.ignore, jabber) succ += 1 except ExperimentDone: + sys.stderr.write("Experiment '%s' already completed " % exp + + "at '%s'\n" % out_base) done += 1 - print("Experiment '%s' already completed at '%s'" % (exp, out_base)) except (InvalidKernel, InvalidConfig) as e: + sys.stderr.write("Invalid environment for experiment '%s'\n" % exp) + sys.stderr.write("%s\n" % e) invalid += 1 - print("Invalid environment for experiment '%s'" % exp) - print(e) except KeyboardInterrupt: - print("Keyboard interrupt, quitting") + sys.stderr.write("Keyboard interrupt, quitting\n") break except SystemCorrupted as e: - print("System is corrupted! Fix state before continuing.") - print(e) + sys.stderr.write("System is corrupted! Fix state before continuing.\n") + sys.stderr.write("%s\n" % e) break - except ExperimentFailed: - print("Failed experiment %s" % exp) + except Exception as e: + sys.stderr.write("Failed experiment %s\n" % exp) + sys.stderr.write("%s\n" % e) failed += 1 ran += 1 @@ -398,7 +412,7 @@ def main(): if not os.listdir(out_base) and created and not succ: os.rmdir(out_base) - message = "Experiments ran:\t%d of %d" % (ran, len(args)) +\ + message = "Experiments ran:\t%d of %d" % (ran, len(exps)) +\ "\n Successful:\t\t%d" % succ +\ "\n Failed:\t\t%d" % failed +\ "\n Already Done:\t\t%d" % done +\ -- cgit v1.2.2