From 7e32c3915e7ea27d2533d99a22fa53ef923198f5 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Mon, 29 Apr 2013 16:50:23 -0400 Subject: Added run_exps.py option to --retry failed experiments. If the retry flag is specified, failed experiments will be re-run after all other experiments have run. They can be re-run at most 5 times. This commit required a refactoring of run_exps.py to clean up the main experiment running loop. --- gen/edf_generators.py | 1 + gen_exps.py | 15 +-- run/experiment.py | 50 ++++++---- run_exps.py | 246 +++++++++++++++++++++++++++++++------------------- 4 files changed, 194 insertions(+), 118 deletions(-) diff --git a/gen/edf_generators.py b/gen/edf_generators.py index a722c21..8e4b8df 100644 --- a/gen/edf_generators.py +++ b/gen/edf_generators.py @@ -28,6 +28,7 @@ class EdfGenerator(gen.Generator): pdist = self._create_dist('period', exp_params['periods'], gen.NAMED_PERIODS) + udist = self._create_dist('utilization', exp_params['utils'], gen.NAMED_UTILIZATIONS) diff --git a/gen_exps.py b/gen_exps.py index b847661..00ce27b 100755 --- a/gen_exps.py +++ b/gen_exps.py @@ -43,6 +43,14 @@ def load_file(fname): except: raise IOError("Invalid generation file: %s" % fname) +def print_descriptions(described): + for generator in described.split(','): + if generator not in gen.get_generators(): + sys.stderr.write("No generator '%s'\n" % generator) + else: + print("Generator '%s', " % generator) + gen.get_generators()[generator]().print_help() + def main(): opts, args = parse_args() @@ -50,12 +58,7 @@ def main(): if opts.list_gens: print(", ".join(gen.get_generators())) if opts.described != None: - for generator in opts.described.split(','): - if generator not in gen.get_generators(): - sys.stderr.write("No generator '%s'\n" % generator) - else: - print("Generator '%s', " % generator) - gen.get_generators()[generator]().print_help() + print_descriptions(opts.described) if opts.list_gens or opts.described: return 0 diff --git a/run/experiment.py 
b/run/experiment.py index b0e46b6..9a70414 100644 --- a/run/experiment.py +++ b/run/experiment.py @@ -2,6 +2,7 @@ import os import time import run.litmus_util as lu import shutil as sh + from operator import methodcaller class ExperimentException(Exception): @@ -69,21 +70,24 @@ class Experiment(object): executable.cwd = self.working_dir map(assign_cwd, self.executables) - def __kill_all(self): - if lu.waiting_tasks(): - released = lu.release_tasks() - self.log("Re-released %d tasks" % released) + def __try_kill_all(self): + try: + if lu.waiting_tasks(): + released = lu.release_tasks() + self.log("Re-released %d tasks" % released) - time.sleep(1) + time.sleep(1) - self.log("Killing all tasks") - for e in self.executables: - try: - e.kill() - except: - pass + self.log("Killing all tasks") + for e in self.executables: + try: + e.kill() + except: + pass - time.sleep(1) + time.sleep(1) + except: + self.log("Failed to kill all tasks.") def __strip_path(self, path): '''Shorten path to something more readable.''' @@ -138,7 +142,7 @@ class Experiment(object): now_ready = lu.waiting_tasks() if now_ready != num_ready: wait_start = time.time() - num_ready = lu.now_ready + num_ready = now_ready def __run_tasks(self): self.log("Starting %d tasks" % len(self.executables)) @@ -185,6 +189,7 @@ class Experiment(object): sched = lu.scheduler() if sched != "Linux": + self.log("Switching back to Linux scheduler") try: lu.switch_scheduler("Linux") except: @@ -229,6 +234,8 @@ class Experiment(object): self.log("Stopping regular tracers") map(methodcaller('stop_tracing'), self.regular_tracers) + os.system('sync') + def log(self, msg): print("[Exp %s]: %s" % (self.name, msg)) @@ -236,6 +243,7 @@ class Experiment(object): self.__to_linux() succ = False + exception = None try: self.__setup() @@ -244,16 +252,20 @@ class Experiment(object): self.log("Saving results in %s" % self.finished_dir) succ = True except Exception as e: + exception = e + # Give time for whatever failed to finish failing 
time.sleep(2) - self.__kill_all() - raise e - finally: - self.__teardown() + self.__try_kill_all() finally: - self.log("Switching back to Linux scheduler") - self.__to_linux() + try: + self.__teardown() + self.__to_linux() + except Exception as e: + exception = exception or e + finally: + if exception: raise exception if succ: self.__save_results() diff --git a/run_exps.py b/run_exps.py index a15018d..1d46b45 100755 --- a/run_exps.py +++ b/run_exps.py @@ -3,6 +3,7 @@ from __future__ import print_function import common as com import os +import pprint import re import shutil import sys @@ -11,17 +12,26 @@ import run.tracer as trace from config.config import PARAMS,DEFAULTS from collections import namedtuple from optparse import OptionParser +from parse.enum import Enum from run.executable.executable import Executable from run.experiment import Experiment,ExperimentDone,SystemCorrupted from run.proc_entry import ProcEntry +'''Maximum times an experiment will be retried''' +MAX_RETRY = 5 + '''Customizable experiment parameters''' ExpParams = namedtuple('ExpParams', ['scheduler', 'duration', 'tracers', 'kernel', 'config_options', 'file_params', 'pre_script', 'post_script']) +'''Tracked with each experiment''' +ExpState = Enum(['Failed', 'Succeeded', 'Invalid', 'Done', 'None']) +ExpData = com.recordtype('ExpData', ['name', 'params', 'sched_file', 'out_dir', + 'retries', 'state']) '''Comparison of requested versus actual kernel compile parameter value''' ConfigResult = namedtuple('ConfigResult', ['param', 'wanted', 'actual']) + class InvalidKernel(Exception): def __init__(self, kernel): self.kernel = kernel @@ -72,6 +82,9 @@ def parse_args(): parser.add_option('-e', '--email', metavar='username@server', dest='email', default=None, help='send an email when all experiments complete') + parser.add_option('-r', '--retry', dest='retry', + action='store_true', default=False, + help='retry failed experiments') return parser.parse_args() @@ -252,65 +265,118 @@ def 
make_exp_params(cmd_scheduler, cmd_duration, sched_dir, param_file): config_options=copts, tracers=tracers, file_params=fparams, pre_script=pre_script, post_script=post_script) -def run_experiment(name, sched_file, exp_params, out_dir, - start_message, ignore, jabber): +def run_experiment(data, start_message, ignore, jabber): '''Load and parse data from files and run result.''' - if not os.path.isfile(sched_file): - raise IOError("Cannot find schedule file: %s" % sched_file) + if not os.path.isfile(data.sched_file): + raise IOError("Cannot find schedule file: %s" % data.sched_file) - dir_name, fname = os.path.split(sched_file) + dir_name, fname = os.path.split(data.sched_file) work_dir = "%s/tmp" % dir_name - procs, execs = load_schedule(name, sched_file, exp_params.duration) + procs, execs = load_schedule(data.name, data.sched_file, data.params.duration) - exp = Experiment(name, exp_params.scheduler, work_dir, out_dir, - procs, execs, exp_params.tracers) + exp = Experiment(data.name, data.params.scheduler, work_dir, + data.out_dir, procs, execs, data.params.tracers) exp.log(start_message) if not ignore: - verify_environment(exp_params) + verify_environment(data.params) - run_script(exp_params.pre_script, exp, dir_name, work_dir) + run_script(data.params.pre_script, exp, dir_name, work_dir) exp.run_exp() - run_script(exp_params.post_script, exp, dir_name, out_dir) + run_script(data.params.post_script, exp, dir_name, data.out_dir) if jabber: - jabber.send("Completed '%s'" % name) + jabber.send("Completed '%s'" % data.name) - # Save parameters used to run experiment in out_dir - out_params = dict(exp_params.file_params.items() + - [(PARAMS['sched'], exp_params.scheduler), + # Save parameters used to run experiment in out_dir + out_params = dict([(PARAMS['sched'], data.params.scheduler), (PARAMS['tasks'], len(execs)), - (PARAMS['dur'], exp_params.duration)]) + (PARAMS['dur'], data.params.duration)] + + data.params.file_params.items()) # Feather-trace clock frequency 
saved for accurate overhead parsing ft_freq = com.ft_freq() if ft_freq: out_params[PARAMS['cycles']] = ft_freq - with open("%s/%s" % (out_dir, DEFAULTS['params_file']), 'w') as f: - f.write(str(out_params)) + out_param_f = "%s/%s" % (data.out_dir, DEFAULTS['params_file']) + with open(out_param_f, 'w') as f: + pprint.pprint(out_params, f) -def get_exps(opts, args): - '''Return list of experiment files or directories''' - if args: - return args +def make_paths(exp, opts, out_base_dir): + '''Translate experiment name to (schedule file, output directory) paths''' + path = "%s/%s" % (os.getcwd(), exp) + out_dir = "%s/%s" % (out_base_dir, os.path.split(exp.strip('/'))[1]) + + if not os.path.exists(path): + raise IOError("Invalid experiment: %s" % path) + + if opts.force and os.path.exists(out_dir): + shutil.rmtree(out_dir) - # Default to sched_file > generated dirs - if os.path.exists(opts.sched_file): - sys.stderr.write("Reading schedule from %s.\n" % opts.sched_file) - return [opts.sched_file] - elif os.path.exists(DEFAULTS['out-gen']): - sys.stderr.write("Reading schedules from %s/*.\n" % DEFAULTS['out-gen']) - sched_dirs = os.listdir(DEFAULTS['out-gen']) - return ['%s/%s' % (DEFAULTS['out-gen'], d) for d in sched_dirs] + if os.path.isdir(path): + sched_file = "%s/%s" % (path, opts.sched_file) else: - sys.stderr.write("Run with -h to view options.\n"); - sys.exit(1) + sched_file = path + + return sched_file, out_dir + + +def get_common_header(args): + common = "" + done = False + + if len(args) == 1: + return common + + while not done: + common += args[0][len(common)] + for path in args: + if path.find(common, 0, len(common)): + done = True + break + + return common[:len(common)-1] + + +def get_exps(opts, args, out_base_dir): + '''Return list of ExpDatas''' + + if not args: + if os.path.exists(opts.sched_file): + # Default to sched_file in current directory + sys.stderr.write("Reading schedule from %s.\n" % opts.sched_file) + args = [opts.sched_file] + elif 
os.path.exists(DEFAULTS['out-gen']): + # Then try experiments created by gen_exps + sys.stderr.write("Reading schedules from %s/*.\n" % DEFAULTS['out-gen']) + sched_dirs = os.listdir(DEFAULTS['out-gen']) + args = ['%s/%s' % (DEFAULTS['out-gen'], d) for d in sched_dirs] + else: + sys.stderr.write("Run with -h to view options.\n"); + sys.exit(1) + + # Part of arg paths which is identical for each arg + common = get_common_header(args) + + exps = [] + for path in args: + sched_file, out_dir = make_paths(path, opts, out_base_dir) + name = path[len(common):] + + sched_dir = os.path.split(sched_file)[0] + exp_params = make_exp_params(opts.scheduler, opts.duration, + sched_dir, opts.param_file) + + exps += [ExpData(name, exp_params, sched_file, out_dir, + 0, ExpState.None)] + + return exps def setup_jabber(target): @@ -338,32 +404,53 @@ def setup_email(target): return None -def make_paths(exp, out_base_dir, opts): - '''Translate experiment name to (schedule file, output directory) paths''' - path = "%s/%s" % (os.getcwd(), exp) - out_dir = "%s/%s" % (out_base_dir, os.path.split(exp.strip('/'))[1]) - - if not os.path.exists(path): - raise IOError("Invalid experiment: %s" % path) +def run_exps(exps, opts): + jabber = setup_jabber(opts.jabber) if opts.jabber else None - if opts.force and os.path.exists(out_dir): - shutil.rmtree(out_dir) + exps_remaining = list(enumerate(exps)) + while exps_remaining: + i, exp = exps_remaining.pop(0) - if os.path.isdir(path): - sched_file = "%s/%s" % (path, opts.sched_file) - else: - sched_file = path + verb = "Loading" if exp.state == ExpState.None else "Re-running failed" + start_message = "%s experiment %d of %d." 
% (verb, i+1, len(exps)) - return sched_file, out_dir + try: + run_experiment(exp, start_message, opts.ignore, jabber) + exp.state = ExpState.Succeeded + except KeyboardInterrupt: + sys.stderr.write("Keyboard interrupt, quitting\n") + break + except ExperimentDone: + sys.stderr.write("Experiment already completed at '%s'\n" % exp.out_dir) + exp.state = ExpState.Done + except (InvalidKernel, InvalidConfig) as e: + sys.stderr.write("Invalid environment for experiment '%s'\n" % exp.name) + sys.stderr.write("%s\n" % e) + exp.state = ExpState.Invalid + except SystemCorrupted as e: + sys.stderr.write("System is corrupted! Fix state before continuing.\n") + sys.stderr.write("%s\n" % e) + exp.state = ExpState.Failed + if not opts.retry: + break + else: + sys.stderr.write("Remaining experiments may fail\n") + except Exception as e: + sys.stderr.write("Failed experiment %s\n" % exp.name) + sys.stderr.write("%s\n" % e) + exp.state = ExpState.Failed + if exp.state is ExpState.Failed and opts.retry: + if exp.retries < MAX_RETRY: + exps_remaining += [(i, exp)] + exp.retries += 1 + else: + sys.stderr.write("Hit maximum retries of %d\n" % MAX_RETRY) def main(): opts, args = parse_args() - exps = get_exps(opts, args) - - jabber = setup_jabber(opts.jabber) if opts.jabber else None - email = setup_email(opts.email) if opts.email else None + email = setup_email(opts.email) if opts.email else None out_base = os.path.abspath(opts.out_dir) created = False @@ -371,62 +458,35 @@ def main(): created = True os.mkdir(out_base) - ran = done = succ = failed = invalid = 0 + exps = get_exps(opts, args, out_base) - for i, exp in enumerate(exps): - sched_file, out_dir = make_paths(exp, out_base, opts) - sched_dir = os.path.split(sched_file)[0] + run_exps(exps, opts) - try: - start_message = "Loading experiment %d of %d." 
% (i+1, len(exps)) - exp_params = make_exp_params(opts.scheduler, opts.duration, - sched_dir, opts.param_file) + def state_count(state): + return len(filter(lambda x: x.state is state, exps)) - run_experiment(exp, sched_file, exp_params, out_dir, - start_message, opts.ignore, jabber) - - succ += 1 - except ExperimentDone: - sys.stderr.write("Experiment '%s' already completed " % exp + - "at '%s'\n" % out_base) - done += 1 - except (InvalidKernel, InvalidConfig) as e: - sys.stderr.write("Invalid environment for experiment '%s'\n" % exp) - sys.stderr.write("%s\n" % e) - invalid += 1 - except KeyboardInterrupt: - sys.stderr.write("Keyboard interrupt, quitting\n") - break - except SystemCorrupted as e: - sys.stderr.write("System is corrupted! Fix state before continuing.\n") - sys.stderr.write("%s\n" % e) - break - except Exception as e: - sys.stderr.write("Failed experiment %s\n" % exp) - sys.stderr.write("%s\n" % e) - failed += 1 - - ran += 1 - - # Clean out directory if it failed immediately - if not os.listdir(out_base) and created and not succ: - os.rmdir(out_base) + ran = len(filter(lambda x: x.state is not ExpState.None, exps)) + succ = state_count(ExpState.Succeeded) message = "Experiments ran:\t%d of %d" % (ran, len(exps)) +\ "\n Successful:\t\t%d" % succ +\ - "\n Failed:\t\t%d" % failed +\ - "\n Already Done:\t\t%d" % done +\ - "\n Invalid Environment:\t%d" % invalid + "\n Failed:\t\t%d" % state_count(ExpState.Failed) +\ + "\n Already Done:\t\t%d" % state_count(ExpState.Done) +\ + "\n Invalid Environment:\t%d" % state_count(ExpState.Invalid) print(message) + if email: + email.send(message) + email.close() + if succ: sys.stderr.write("Successful experiment data saved in %s.\n" % opts.out_dir) + elif not os.listdir(out_base) and created: + # Remove directory if no data was put into it + os.rmdir(out_base) - if email: - email.send(message) - email.close() if __name__ == '__main__': main() -- cgit v1.2.2