From 09bc409657606a37346d82ab1e4c44a165bd3541 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Fri, 12 Apr 2013 11:30:27 -0400 Subject: Improved error handling in run_exps.py. --- common.py | 6 +- parse/sched.py | 1 + run/executable/executable.py | 6 +- run/experiment.py | 128 +++++++++++++++++++++++++------------------ run/litmus_util.py | 23 ++++++-- run_exps.py | 18 ++++-- 6 files changed, 114 insertions(+), 68 deletions(-) diff --git a/common.py b/common.py index 4920ec8..a2c6224 100644 --- a/common.py +++ b/common.py @@ -26,15 +26,17 @@ def get_executable_hint(prog, hint, optional=False): '''Search for @prog in system PATH. Print @hint if no binary is found. Die if not @optional.''' try: - prog = get_executable(prog) + full_path = get_executable(prog) except IOError: if not optional: sys.stderr.write(("Cannot find executable '%s' in PATH. This is " +\ "a part of '%s' which should be added to PATH.\n")\ % (prog, hint)) sys.exit(1) + else: + full_path = None - return prog + return full_path def get_config_option(option): '''Search for @option in installed kernel config (if present). diff --git a/parse/sched.py b/parse/sched.py index 2da0149..147a2e5 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -161,6 +161,7 @@ def extract_sched_data(result, data_dir, work_dir): cmd_arr = [conf.BINS['st_show']] cmd_arr.extend(bins) with open(output_file, "w") as f: + print("calling %s" % cmd_arr) subprocess.call(cmd_arr, cwd=data_dir, stdout=f) task_dict = defaultdict(lambda : diff --git a/run/executable/executable.py b/run/executable/executable.py index 02e35ae..263e305 100644 --- a/run/executable/executable.py +++ b/run/executable/executable.py @@ -1,13 +1,13 @@ import sys import subprocess import signal -from common import is_executable +from common import get_executable class Executable(object): '''Parent object that represents an executable for use in task-sets.''' def __init__(self, exec_file, extra_args=None, stdout_file = None, stderr_file = None): - self.exec_file = exec_file + self.exec_file = get_executable(exec_file) self.cwd = None self.stdout_file = stdout_file self.stderr_file = stderr_file @@ -18,7 +18,7 @@ class Executable(object): else: self.extra_args = [str(a) for a in list(extra_args)] # make a duplicate - if not is_executable(self.exec_file): + if not self.exec_file: raise Exception("Not executable ? : %s" % self.exec_file) def __del__(self): diff --git a/run/experiment.py b/run/experiment.py index e5811f8..ff0e9f3 100644 --- a/run/experiment.py +++ b/run/experiment.py @@ -1,7 +1,9 @@ +import common as com import os import time import run.litmus_util as lu import shutil as sh +import traceback from operator import methodcaller class ExperimentException(Exception): @@ -15,17 +17,12 @@ class ExperimentDone(ExperimentException): def __str__(self): return "Experiment finished already: %d" % self.name - -class ExperimentInterrupted(ExperimentException): - '''Raised when an experiment appears to be interrupted (partial results).''' - def __str__(self): - return "Experiment was interrupted in progress: %d" % self.name - - class ExperimentFailed(ExperimentException): def __str__(self): return "Experiment failed during execution: %d" % self.name +class SystemCorrupted(Exception): + pass class Experiment(object): '''Execute one task-set and save the results. Experiments have unique IDs.''' @@ -44,6 +41,8 @@ class Experiment(object): self.exec_out = None self.exec_err = None + self.task_batch = com.num_cpus() + self.__make_dirs() self.__assign_executable_cwds() self.__setup_tracers(tracer_types) @@ -84,65 +83,67 @@ class Experiment(object): executable.cwd = self.working_dir map(assign_cwd, self.executables) - def __run_tasks(self): - already_waiting = lu.waiting_tasks() + def __kill_all(self): + # Give time for whatever failed to finish failing + time.sleep(2) + + released = lu.release_tasks() + self.log("Re-released %d tasks" % released) - if already_waiting: - self.log("Already %d tasks waiting for release!") - self.log("Experiment will fail if any of these tasks are released.") + time.sleep(5) - self.log("Starting the programs") + self.log("Killing all tasks") for e in self.executables: + try: + e.kill() + except: + pass + + time.sleep(2) + + def __run_tasks(self): + self.log("Starting %d tasks" % len(self.executables)) + + for i,e in enumerate(self.executables): try: e.execute() except: raise Exception("Executable failed: %s" % e) self.log("Sleeping until tasks are ready for release...") - start = time.clock() - while (lu.waiting_tasks() - already_waiting) < len(self.executables): - if time.clock() - start > 30.0: - raise Exception("Too much time has passed waiting for tasks!") + start = time.time() + while lu.waiting_tasks() < len(self.executables): + if time.time() - start > 30.0: + s = "waiting: %d, submitted: %d" %\ + (lu.waiting_tasks(), len(self.executables)) + raise Exception("Too much time spent waiting for tasks! %s" % s) time.sleep(1) # Exact tracers (like overheads) must be started right after release or # measurements will be full of irrelevant records self.log("Starting %d released tracers" % len(self.exact_tracers)) map(methodcaller('start_tracing'), self.exact_tracers) + time.sleep(1) - self.log("Releasing %d tasks" % len(self.executables)) - released = lu.release_tasks() - - ret = True - if released != len(self.executables): - # Some tasks failed to release, kill all tasks and fail - # Need to re-release non-released tasks before we can kill them though - self.log("Failed to release {} tasks! Re-releasing and killing".format( - len(self.executables) - released, len(self.executables))) - time.sleep(5) - + try: + self.log("Releasing %d tasks" % len(self.executables)) released = lu.release_tasks() - self.log("Re-released %d tasks" % released) - - time.sleep(5) - - self.log("Killing all tasks") - map(methodcaller('kill'), self.executables) + if released != len(self.executables): + # Some tasks failed to release, kill all tasks and fail + # Need to release non-released tasks before they can be killed + raise Exception("Released %s tasks, expected %s tasks" % + (released, len(self.executables))) - ret = False + self.log("Waiting for program to finish...") + for e in self.executables: + if not e.wait(): + raise Exception("Executable %s failed to complete!" % e) - self.log("Waiting for program to finish...") - for e in self.executables: - if not e.wait(): - ret = False - - # And these must be stopped here for the same reason - self.log("Stopping exact tracers") - map(methodcaller('stop_tracing'), self.exact_tracers) - - if not ret: - raise ExperimentFailed(self.name) + finally: + # And these must be stopped here for the same reason + self.log("Stopping exact tracers") + map(methodcaller('stop_tracing'), self.exact_tracers) def __save_results(self): os.rename(self.working_dir, self.finished_dir) @@ -151,29 +152,48 @@ class Experiment(object): print("[Exp %s]: %s" % (self.name, msg)) def run_exp(self): + self.__check_system() + succ = False try: - self.setup() + self.__setup() try: self.__run_tasks() self.log("Saving results in %s" % self.finished_dir) succ = True + except: + traceback.print_exc() + self.__kill_all() + raise ExperimentFailed(self.name) finally: - self.teardown() + self.__teardown() finally: self.log("Switching to Linux scheduler") - try: - lu.switch_scheduler("Linux") - except: - self.log("Failed to switch back to Linux.") + + # Give the tasks 10 seconds to finish before bailing + start = time.time() + while lu.all_tasks() > 0: + if time.time() - start < 10.0: + raise SystemCorrupted("%d tasks still running!" % + lu.all_tasks()) + + lu.switch_scheduler("Linux") if succ: self.__save_results() self.log("Experiment done!") + def __check_system(self): + running = lu.all_tasks() + if running: + raise SystemCorrupted("%d tasks already running!" % running) + + sched = lu.scheduler() + if sched != "Linux": + raise SystemCorrupted("Scheduler is %s, should be Linux" % sched) - def setup(self): + def __setup(self): self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) @@ -190,7 +210,7 @@ class Experiment(object): executable.stderr_file = self.exec_err map(set_out, self.executables) - def teardown(self): + def __teardown(self): self.exec_out and self.exec_out.close() self.exec_err and self.exec_err.close() diff --git a/run/litmus_util.py b/run/litmus_util.py index 4709b66..70da262 100644 --- a/run/litmus_util.py +++ b/run/litmus_util.py @@ -3,6 +3,11 @@ import time import subprocess import config.config as conf +def scheduler(): + with open('/proc/litmus/active_plugin', 'r') as active_plugin: + cur_plugin = active_plugin.read().strip() + return cur_plugin + def switch_scheduler(switch_to_in): '''Switch the scheduler to whatever is passed in. @@ -18,11 +23,10 @@ def switch_scheduler(switch_to_in): # it takes a bit to do the switch, sleep an arbitrary amount of time time.sleep(2) - with open('/proc/litmus/active_plugin', 'r') as active_plugin: - cur_plugin = active_plugin.read().strip() - + cur_plugin = scheduler() if switch_to != cur_plugin: - raise Exception("Could not switch to '%s' (check dmesg)" % switch_to) + raise Exception("Could not switch to '%s' (check dmesg), current: %s" %\ + (switch_to, cur_plugin)) def waiting_tasks(): reg = re.compile(r'^ready.*?(?P\d+)$', re.M) @@ -35,6 +39,17 @@ def waiting_tasks(): return 0 if not ready else int(ready) +def all_tasks(): + reg = re.compile(r'^real-time.*?(?P\d+)$', re.M) + with open('/proc/litmus/stats', 'r') as f: + data = f.read() + + # Ignore if no tasks are waiting for release + match = re.search(reg, data) + ready = match.group("TASKS") + + return 0 if not ready else int(ready) + def release_tasks(): try: data = subprocess.check_output([conf.BINS['release']]) diff --git a/run_exps.py b/run_exps.py index b9cf88e..6531415 100755 --- a/run_exps.py +++ b/run_exps.py @@ -8,12 +8,11 @@ import re import shutil import sys import run.tracer as trace -import traceback from collections import namedtuple from optparse import OptionParser from run.executable.executable import Executable -from run.experiment import Experiment,ExperimentDone +from run.experiment import Experiment,ExperimentDone,ExperimentFailed,SystemCorrupted from run.proc_entry import ProcEntry '''Customizable experiment parameters''' @@ -330,6 +329,7 @@ def main(): created = True os.mkdir(out_base) + ran = 0 done = 0 succ = 0 failed = 0 @@ -362,15 +362,23 @@ def main(): invalid += 1 print("Invalid environment for experiment '%s'" % exp) print(e) - except: + except KeyboardInterrupt: + print("Keyboard interrupt, quitting") + break + except SystemCorrupted as e: + print("System is corrupted! Fix state before continuing.") + print(e) + break + except ExperimentFailed: print("Failed experiment %s" % exp) - traceback.print_exc() failed += 1 + ran += 1 + if not os.listdir(out_base) and created and not succ: os.rmdir(out_base) - message = "Experiments run:\t%d" % len(args) +\ + message = "Experiments ran:\t%d of %d" % (ran, len(args)) +\ "\n Successful:\t\t%d" % succ +\ "\n Failed:\t\t%d" % failed +\ "\n Already Done:\t\t%d" % done +\ -- cgit v1.2.2