From cd9f1b026cc5c4526dfbd2f7b1c5f39edb6a7309 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Wed, 1 May 2013 15:48:01 -0400 Subject: Added --crontab option to run_exps.py This will use crontab to automatically restart the machine and resume the script when the machine crashes. An additional option, -k, is provided to cancel this operation. --- common.py | 5 +- run/crontab.py | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ run/experiment.py | 8 +++ run_exps.py | 82 +++++++++++++++++++++++++---- 4 files changed, 234 insertions(+), 12 deletions(-) create mode 100644 run/crontab.py diff --git a/common.py b/common.py index 7abf0f2..9ea2915 100644 --- a/common.py +++ b/common.py @@ -182,7 +182,7 @@ def ft_freq(): def kernel(): - return subprocess.check_output(["uname", "-r"]) + return subprocess.check_output(["uname", "-r"]).strip("\n") def is_executable(fname): '''Return whether the file passed in is executable''' @@ -210,3 +210,6 @@ def log_once(id, msg = None, indent = True): if indent: msg = ' ' + msg.strip('\t').replace('\n', '\n\t') sys.stderr.write('\n' + msg.strip('\n') + '\n') + +def get_cmd(): + return os.path.split(sys.argv[0])[1] diff --git a/run/crontab.py b/run/crontab.py new file mode 100644 index 0000000..87d71b1 --- /dev/null +++ b/run/crontab.py @@ -0,0 +1,151 @@ +from __future__ import print_function + +import common +import os +import re +import sys + +from subprocess import Popen, PIPE, check_output + +PANIC_DUR = 10 +DELAY = 30 +DELAY_INTERVAL = 10 + +def get_cron_data(): + try: + return check_output(['crontab', '-l']) + except: + return "" + +def wall(message): + '''A wall command with no header''' + return "echo '%s' | wall -n" % message + +def sanitize(args, ignored): + ret_args = [] + for a in args: + if a in ignored: + continue + if '-' == a[0] and '--' != a[0:2]: + for i in ignored: + a = a.replace(i, '') + ret_args += [a] + return ret_args + +def get_outfname(): + return "cron-%s.txt" % common.get_cmd() + +def get_boot_cron(ignored_params, extra=""): + '''Turn current python script into a crontab reboot entry''' + job_args = sanitize(sys.argv, ignored_params) + job = " ".join(job_args) + out_fname = get_outfname() + + short_job = " ".join([common.get_cmd()] + job_args[1:]) + msg = "Job '%s' will write output to '%s'" % (short_job, out_fname) + + sys.stderr.write("%s %d seconds after reboot.\n" % (msg, DELAY)) + + # Create sleep and wall commands which will countdown DELAY seconds + # before executing the job + cmds = ["sleep %d" % DELAY_INTERVAL] + delay_rem = DELAY - DELAY_INTERVAL + while delay_rem > 0: + wmsg = "Restarting experiments in %d seconds. %s" % (delay_rem, extra) + cmds += [wall(wmsg)] + cmds += ["sleep %d" % min(DELAY_INTERVAL, delay_rem)] + delay_rem -= DELAY_INTERVAL + delay_cmd = ";".join(cmds) + + # Create command which will only execute if the same kernel is running + kern = common.kernel() + fail_wall = wall("Need matching kernel '%s' to run!" % kern) + run_cmd = "echo '%s' | grep -q `uname -r` && %s && %s && %s >> %s 2>>%s || %s" %\ + (kern, wall(msg), wall("Starting..."), job, out_fname, out_fname, fail_wall) + + return "@reboot cd %s; %s; %s;" % (os.getcwd(), delay_cmd, run_cmd) + +def set_panic_restart(bool_val): + '''Enable / disable restart on panics''' + if bool_val: + sys.stderr.write("Kernel will reboot after panic.\n") + dur = PANIC_DUR + else: + sys.stderr.write("Kernel will no longer reboot after panic.\n") + dur = 0 + + check_output(['sysctl', '-w', "kernel.panic=%d" % dur, + "kernel.panic_on_oops=%d" % dur]) + +def write_cron_data(data): + '''Write new crontab entry. No blank lines are written''' + + # I don't know why "^\s*$" doesn't match, hence this ugly regex + data = re.sub(r"\n\s*\n", "\n", data, re.M) + + sp = Popen(["crontab", "-"], stdin=PIPE) + stdout, stderr = sp.communicate(input=data) + +def install_path(): + '''Place the current path in the crontab entry''' + data = get_cron_data() + curr_line = re.findall(r"PATH=.*", data) + + if curr_line: + curr_paths = re.findall(r"((?:\/\w+)+)", curr_line[0]) + data = re.sub(curr_line[0], "", data) + else: + curr_paths = [] + curr_paths = set(curr_paths) + + for path in os.environ["PATH"].split(os.pathsep): + curr_paths.add(path) + + data = "PATH=" + os.pathsep.join(curr_paths) + "\n" + data + + write_cron_data(data) + +def install_boot_job(ignored_params, reboot_message): + '''Re-run the current python script on system reboot using crontab''' + remove_boot_job() + + data = get_cron_data() + job = get_boot_cron(ignored_params, reboot_message) + + set_panic_restart(True) + + write_cron_data(data + job + "\n") + + if job not in get_cron_data(): + raise IOError("Failed to write %s into cron!" % job) + else: + install_path() + +def clean_output(): + fname = get_outfname() + if os.path.exists(fname): + os.remove(fname) + +def kill_boot_job(): + remove_boot_job() + + cmd = common.get_cmd() + + procs = check_output("ps -eo pid,args".split(" ")) + pairs = re.findall("(\d+) (.*)", procs) + + for pid, args in pairs: + if re.search(r"/bin/sh -c.*%s"%cmd, args): + sys.stderr.write("Killing job %s\n" % pid) + check_output(("kill -9 %s" % pid).split(" ")) + +def remove_boot_job(): + '''Remove installed reboot job from crontab''' + data = get_cron_data() + regex = re.compile(r".*%s.*" % re.escape(common.get_cmd()), re.M) + + if regex.search(data): + new_cron = regex.sub("", data) + write_cron_data(new_cron) + + set_panic_restart(False) diff --git a/run/experiment.py b/run/experiment.py index 9a70414..da0e32e 100644 --- a/run/experiment.py +++ b/run/experiment.py @@ -35,6 +35,9 @@ class Experiment(object): self.exec_err = None self.tracer_types = tracer_types + self.regular_tracers = [] + self.exact_tracers = [] + def __setup_tracers(self): tracers = [ t(self.working_dir) for t in self.tracer_types ] @@ -55,8 +58,13 @@ class Experiment(object): Experiment.INTERRUPTED_DIR) interrupted = "%s/%s" % (os.path.split(self.working_dir)[0], Experiment.INTERRUPTED_DIR) + old_int = "%s/%s" % (self.working_dir, Experiment.INTERRUPTED_DIR) + if os.path.exists(interrupted): sh.rmtree(interrupted) + if os.path.exists(old_int): + sh.rmtree(old_int) + os.rename(self.working_dir, interrupted) os.mkdir(self.working_dir) diff --git a/run_exps.py b/run_exps.py index 1bad2a3..21666a9 100755 --- a/run_exps.py +++ b/run_exps.py @@ -3,10 +3,12 @@ from __future__ import print_function import common as com import os +import pickle import pprint import re import shutil import sys +import run.crontab as cron import run.tracer as trace from config.config import PARAMS,DEFAULTS,FILES @@ -17,9 +19,6 @@ from run.executable.executable import Executable from run.experiment import Experiment,ExperimentDone,SystemCorrupted from run.proc_entry import ProcEntry -'''Maximum times an experiment will be retried''' -MAX_RETRY = 5 - '''Customizable experiment parameters''' ExpParams = namedtuple('ExpParams', ['scheduler', 'duration', 'tracers', 'kernel', 'config_options', 'file_params', @@ -31,6 +30,11 @@ ExpData = com.recordtype('ExpData', ['name', 'params', 'sched_file', 'out_dir', '''Comparison of requested versus actual kernel compile parameter value''' ConfigResult = namedtuple('ConfigResult', ['param', 'wanted', 'actual']) +'''Maximum times an experiment will be retried''' +MAX_RETRY = 5 +'''Location experiment retry count is stored''' +TRIES_FNAME = ".tries.pkl" + class InvalidKernel(Exception): def __init__(self, kernel): @@ -88,6 +92,9 @@ def parse_args(): action='store_true', default=False, help='use crontab to resume interrupted script after ' 'system restarts. implies --retry') + group.add_option('-k', '--kill-crontab', dest='kill', + action='store_true', default=False, + help='kill existing script crontabs and exit') parser.add_option_group(group) return parser.parse_args() @@ -314,7 +321,7 @@ def run_experiment(data, start_message, ignore, jabber): def make_paths(exp, opts, out_base_dir): '''Translate experiment name to (schedule file, output directory) paths''' - path = "%s/%s" % (os.getcwd(), exp) + path = os.path.abspath(exp) out_dir = "%s/%s" % (out_base_dir, os.path.split(exp.strip('/'))[1]) if not os.path.exists(path): @@ -408,10 +415,35 @@ def setup_email(target): return None +def tries_file(exp): + return "%s/%s" % (os.path.split(exp.sched_file)[0], TRIES_FNAME) + + +def get_tries(exp): + if not os.path.exists(tries_file(exp)): + return 0 + with open(tries_file(exp), 'r') as f: + return int(pickle.load(f)) + + +def set_tries(exp, val): + if not val: + if os.path.exists(tries_file(exp)): + os.remove(tries_file(exp)) + else: + with open(tries_file(exp), 'w') as f: + pickle.dump(str(val), f) + os.system('sync') + + def run_exps(exps, opts): jabber = setup_jabber(opts.jabber) if opts.jabber else None - exps_remaining = list(enumerate(exps)) + # Give each experiment a unique id + exps_remaining = enumerate(exps) + # But run experiments which have failed the most last + exps_remaining = sorted(exps_remaining, key=lambda x: get_tries(x[1])) + while exps_remaining: i, exp = exps_remaining.pop(0) @@ -419,17 +451,26 @@ def run_exps(exps, opts): start_message = "%s experiment %d of %d." % (verb, i+1, len(exps)) try: + set_tries(exp, get_tries(exp) + 1) + if get_tries(exp) > MAX_RETRY: + raise Exception("Hit maximum retries of %d" % MAX_RETRY) + run_experiment(exp, start_message, opts.ignore, jabber) + + set_tries(exp, 0) exp.state = ExpState.Succeeded except KeyboardInterrupt: sys.stderr.write("Keyboard interrupt, quitting\n") + set_tries(exp, get_tries(exp) - 1) break except ExperimentDone: sys.stderr.write("Experiment already completed at '%s'\n" % exp.out_dir) + set_tries(exp, 0) exp.state = ExpState.Done except (InvalidKernel, InvalidConfig) as e: sys.stderr.write("Invalid environment for experiment '%s'\n" % exp.name) sys.stderr.write("%s\n" % e) + set_tries(exp, get_tries(exp) - 1) exp.state = ExpState.Invalid except SystemCorrupted as e: sys.stderr.write("System is corrupted! Fix state before continuing.\n") @@ -445,17 +486,19 @@ def run_exps(exps, opts): exp.state = ExpState.Failed if exp.state is ExpState.Failed and opts.retry: - if exp.retries < MAX_RETRY: - exps_remaining += [(i, exp)] - exp.retries += 1 - else: - sys.stderr.write("Hit maximum retries of %d\n" % MAX_RETRY) + exps_remaining += [(i, exp)] + def main(): opts, args = parse_args() + if opts.kill: + cron.kill_boot_job() + sys.exit(1) + email = setup_email(opts.email) if opts.email else None + # Create base output directory for run data out_base = os.path.abspath(opts.out_dir) created = False if not os.path.exists(out_base): @@ -464,7 +507,24 @@ def main(): exps = get_exps(opts, args, out_base) - run_exps(exps, opts) + if opts.crontab: + # Resume script on startup + opts.retry = True + cron.install_boot_job(['f', '--forced'], + "Stop with %s -k" % com.get_cmd()) + + if opts.force or not opts.retry: + cron.clean_output() + for e in exps: + set_tries(e, 0) + + try: + run_exps(exps, opts) + finally: + # Remove persistent state + for e in exps: + set_tries(e, 0) + cron.remove_boot_job() def state_count(state): return len(filter(lambda x: x.state is state, exps)) -- cgit v1.2.2