From c8cb14963511d5d1a3eb46624bcc0d2bcdf3b9bc Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Sun, 30 Sep 2012 18:25:38 -0400 Subject: Added more robust error handling inspired by color tests. --- experiment/executable/executable.py | 2 +- experiment/experiment.py | 53 +++++++++++---- experiment/tracer.py | 3 +- parse/sched.py | 125 ++++++++++++++++++++++++++---------- parse/tuple_table.py | 3 + parse_exps.py | 5 ++ run_exps.py | 52 +++++++++------ 7 files changed, 173 insertions(+), 70 deletions(-) diff --git a/experiment/executable/executable.py b/experiment/executable/executable.py index 897c2d9..09b7370 100644 --- a/experiment/executable/executable.py +++ b/experiment/executable/executable.py @@ -71,4 +71,4 @@ class Executable(object): self.sp.wait() if self.sp.returncode != 0: - print >>sys.stderr, "Non-zero return: %s %s" % (self.exec_file, self.extra_args) + print >>sys.stderr, "Non-zero return: %s %s" % (self.exec_file, " ".join(self.extra_args)) diff --git a/experiment/experiment.py b/experiment/experiment.py index a95ca42..a44f798 100644 --- a/experiment/experiment.py +++ b/experiment/experiment.py @@ -91,16 +91,16 @@ class Experiment(object): def __run_tasks(self): exec_pause = 0.3 - self.log("Starting the program in ({0} seconds)".format( + self.log("Starting the programs over ({0} seconds)".format( len(self.executables) * exec_pause)) for e in self.executables: try: e.execute() except: - raise Exception("Executable failed: %s" % e) + raise Exception("Executable failed: %s" % e) time.sleep(exec_pause) - sleep_time = 2 + sleep_time = len(self.executables) / litmus_util.num_cpus() self.log("Sleeping for %d seconds before release" % sleep_time) time.sleep(sleep_time) @@ -117,13 +117,18 @@ class Experiment(object): if released != len(self.executables): # Some tasks failed to release, kill all tasks and fail # Need to re-release non-released tasks before we can kill them though - self.log("Failed to release %d tasks! Re-releasing and killing".format( - len(self.experiments) - released)) + self.log("Failed to release {} tasks! Re-releasing and killing".format( + len(self.executables) - released, len(self.executables))) - time.sleep(10) - litmus_util.release_tasks() + time.sleep(5) - time.sleep(20) + released = litmus_util.release_tasks() + + self.log("Re-released %d tasks" % released) + + time.sleep(5) + + self.log("Killing all tasks") map(methodcaller('kill'), self.executables) ret = False @@ -147,23 +152,46 @@ class Experiment(object): def run_exp(self): self.setup() + + succ = False + try: self.__run_tasks() + self.log("Saving results in %s" % self.finished_dir) + succ = True finally: self.teardown() - def setup(self): - self.log("Switching to %s" % self.scheduler) - litmus_util.switch_scheduler(self.scheduler) + if succ: + self.__save_results() + self.log("Experiment done!") + + def setup(self): self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) + time.sleep(5) + + self.log("Switching to %s" % self.scheduler) + litmus_util.switch_scheduler(self.scheduler) + self.log("Starting %d tracers" % len(self.tracers)) map(methodcaller('start_tracing'), self.tracers) + + self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w') + self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w') + def set_out(executable): + executable.stdout_file = self.exec_out + executable.stderr_file = self.exec_err + map(set_out, self.executables) + time.sleep(4) def teardown(self): + self.exec_out.close() + self.exec_err.close() + sleep_time = 5 self.log("Sleeping %d seconds to allow buffer flushing" % sleep_time) time.sleep(sleep_time) @@ -174,6 +202,3 @@ class Experiment(object): self.log("Switching to Linux scheduler") litmus_util.switch_scheduler("Linux") - self.log("Saving results in %s" % self.finished_dir) - self.__save_results() - self.log("Experiment done!") diff --git a/experiment/tracer.py b/experiment/tracer.py index ad4ebfe..4949927 100644 --- a/experiment/tracer.py +++ b/experiment/tracer.py @@ -27,13 +27,14 @@ class LinuxTracer(Tracer): def __init__(self, output_dir): super(LinuxTracer, self).__init__("trace-cmd", output_dir) - extra_args = ["record", "-e", "sched:sched_switch", + extra_args = ["record", # "-e", "sched:sched_switch", "-e", "litmus:*", "-o", "%s/%s" % (output_dir, conf.FILES['linux_data'])] stdout = open('%s/trace-cmd-stdout.txt' % self.output_dir, 'w') stderr = open('%s/trace-cmd-stderr.txt' % self.output_dir, 'w') execute = Executable(conf.BINS['trace-cmd'], extra_args, stdout, stderr) + execute.cwd = output_dir self.bins.append(execute) @staticmethod diff --git a/parse/sched.py b/parse/sched.py index 5e3ba6b..b84e16e 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -1,6 +1,5 @@ """ -TODO: make regexes indexable by name - +TODO: No longer very pythonic, lot of duplicate code """ import config.config as conf @@ -12,7 +11,27 @@ import subprocess from collections import namedtuple,defaultdict from point import Measurement,Type -TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period']) +PARAM_RECORD = r"(?P" +\ + r"PARAM *?(?P\d+)\/.*?" +\ + r"cost:\s+(?P[\d\.]+)ms.*?" +\ + r"period.*?(?P[\d.]+)ms.*?" +\ + r"part.*?(?P\d+)[, ]*" +\ + r"(?:class=(?P\w+))?[, ]*" +\ + r"(?:level=(?P\w+))?).*$" +EXIT_RECORD = r"(?P" +\ + r"TASK_EXIT *?(?P\d+)/.*?" +\ + r"Avg.*?(?P\d+).*?" +\ + r"Max.*?(?P\d+))" +TARDY_RECORD = r"(?P" +\ + r"TARDY.*?(?P\d+)/(?P\d+).*?" +\ + r"Tot.*?(?P[\d\.]+).*?ms.*?" +\ + r"(?P[\d\.]+).*?ms.*?" +\ + r"(?P[\d\.]+))" +COMPLETION_RECORD = r"(?P" +\ + r"COMPLETION.*?(?P\d+)/.*?" +\ + r"(?P[\d\.]+)ms)" + +TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period','type','level']) Task = namedtuple('Task', ['pid', 'config']) def get_st_output(data_dir, out_dir): @@ -34,18 +53,43 @@ def get_st_output(data_dir, out_dir): return output_file def get_tasks(data): - reg = r"PARAM *?(\d+)\/.*?cost:\s+([\d\.]+)ms.*?period.*?([\d.]+)ms.*?part.*?(\d+)" ret = [] - for match in re.findall(reg, data): - t = Task(match[0], TaskConfig(match[3],match[1],match[2])) - ret += [t] + for match in re.finditer(PARAM_RECORD, data, re.M): + try: + t = Task( int(match.group('PID')), + TaskConfig( int(match.group('CPU')), + float(match.group('WCET')), + float(match.group('PERIOD')), + match.group("CLASS"), + match.group("LEVEL"))) + if not (t.config.period and t.pid): + raise Exception() + ret += [t] + except Exception as e: + raise Exception("Invalid task record: %s\nparsed:\n\t%s\n\t%s" % + (e, match.groupdict(), match.group('RECORD'))) return ret +def get_tasks_dict(data): + tasks_list = get_tasks(data) + tasks_dict = {} + for t in tasks_list: + tasks_dict[t.pid] = t + return tasks_dict + def get_task_exits(data): - reg = r"TASK_EXIT *?(\d+)/.*?Avg.*?(\d+).*?Max.*?(\d+)" ret = [] - for match in re.findall(reg, data): - m = Measurement(match[0], {Type.Max : match[2], Type.Avg : match[1]}) + for match in re.finditer(EXIT_RECORD, data): + try: + m = Measurement( int(match.group('PID')), + {Type.Max : float(match.group('MAX')), + Type.Avg : float(match.group('AVG'))}) + for (type, value) in m: + if not value: raise Exception() + except: + raise Exception("Invalid exit record, parsed:\n\t%s\n\t%s" % + (match.groupdict(), m.group('RECORD'))) + ret += [m] return ret @@ -55,40 +99,51 @@ def extract_tardy_vals(data, exp_point): avg_tards = [] max_tards = [] - for t in get_tasks(data): - reg = r"TARDY.*?" + t.pid + "/(\d+).*?Tot.*?([\d\.]+).*?ms.*?([\d\.]+).*?ms.*?([\d\.]+)" - matches = re.findall(reg, data) - if len(matches) != 0: - jobs = float(matches[0][0]) + tasks = get_tasks_dict(data) - total_tard = float(matches[0][1]) - avg_tard = (total_tard / jobs) / float(t.config.period) - max_tard = float(matches[0][2]) / float(t.config.period) + for match in re.finditer(TARDY_RECORD, data): + try: + pid = int(match.group("PID")) + jobs = int(match.group("JOB")) + misses = int(match.group("MISSES")) + total_tard = float(match.group("TOTAL")) + max_tard = float(match.group("MAX")) - misses = float(matches[0][3]) - if misses != 0: - miss_ratio = (misses / jobs) - else: - miss_ratio = 0 + if not (jobs and pid): raise Exception() + except: + raise Exception("Invalid tardy record:\n\t%s\n\t%s" % + (match.groupdict(), match.group("RECORD"))) - ratios += [miss_ratio] - avg_tards += [avg_tard] - max_tards += [max_tard] + if pid not in tasks: + raise Exception("Invalid pid '%d' in tardy record:\n\t%s" % + match.group("RECORD")) + + t = tasks[pid] + avg_tards += [ total_tard / (jobs * t.config.period) ] + max_tards += [ max_tard / t.config.period ] + ratios += [ misses / jobs ] exp_point["avg-rel-tard"] = Measurement().from_array(avg_tards) exp_point["max-rel-tard"] = Measurement().from_array(max_tards) - exp_point["miss-ratio"] = Measurement().from_array(ratios) + exp_point["miss-ratio"] = Measurement().from_array(ratios) def extract_variance(data, exp_point): varz = [] - for t in get_tasks(data): - reg = r"COMPLETION.*?" + t.pid + r".*?([\d\.]+)ms" - matches = re.findall(reg, data) + completions = defaultdict(lambda: []) + + for match in re.finditer(COMPLETION_RECORD, data): + try: + pid = int(match.group("PID")) + duration = float(match.group("EXEC")) - if len(matches) == 0: - return 0 + if not (duration and pid): raise Exception() + except: + raise Exception("Invalid completion record:\n\t%s\n\t%s" % + (match.groupdict(), match.group("RECORD"))) + completions[pid] += [duration] - job_times = np.array(filter(lambda x: float(x) != 0, matches), dtype=np.float) + for (pid, durations) in completions: + job_times = np.array(durations) # Coefficient of variation cv = job_times.std() / job_times.mean() @@ -127,6 +182,10 @@ def config_exit_stats(file): task_list = sorted(config_dict[config]) # Replace tasks with corresponding exit stats + if not t.pid in exit_dict: + raise Exception("Missing exit record for task '%s' in '%s'" % + (t, file)) + exit_list = [exit_dict[t.pid] for t in task_list] config_dict[config] = exit_list diff --git a/parse/tuple_table.py b/parse/tuple_table.py index b56fa6c..6363b80 100644 --- a/parse/tuple_table.py +++ b/parse/tuple_table.py @@ -27,6 +27,9 @@ class ColMap(object): return key + def __contains__(self, col): + return col in self.rev_map + def get_map(self, tuple): map = {} for i in range(0, len(tuple)): diff --git a/parse_exps.py b/parse_exps.py index 3a1d1b9..c91a654 100755 --- a/parse_exps.py +++ b/parse_exps.py @@ -90,6 +90,10 @@ def main(): (plain_exps, scaling_bases) = gen_exp_data(args, base_conf, col_map) + if base_conf and base_conf.keys()[0] not in col_map: + raise IOError("Base column '%s' not present in any parameters!" % + base_conf.keys()[0]) + base_table = TupleTable(col_map) result_table = TupleTable(col_map) @@ -105,6 +109,7 @@ def main(): ft.extract_ft_data(exp.data_files.ft, result, conf.BASE_EVENTS) if exp.data_files.st: + base = None if base_conf: # Try to find a scaling base base_params = copy.deepcopy(exp.params) diff --git a/run_exps.py b/run_exps.py index 4484952..825ad5b 100755 --- a/run_exps.py +++ b/run_exps.py @@ -38,13 +38,13 @@ def parse_args(): def convert_data(data): """Convert a non-python schedule file into the python format""" regex = re.compile( - - r"(?P^" - r"(?P
/proc/\w+?/)?" - r"(?P[\w\/]+)" - r"\s*{\s*(?P.*?)\s*?}$)|" - r"(?P^(?P\w+?spin)?\s*?" - r"(?P\w[\s\w]*?)?\s*?$)", + r"(?P^" + r"(?P
/proc/\w+?/)?" + r"(?P[\w\/]+)" + r"\s*{\s*(?P.*?)\s*?}$)|" + r"(?P^" + r"(?P\w+?spin)?\s+" + r"(?P[\w\-_\d\. ]+)\s*$)", re.S|re.I|re.M) procs = [] @@ -63,6 +63,15 @@ def convert_data(data): return {'proc' : procs, 'spin' : spins} +def fix_paths(schedule, exp_dir): + for (idx, (spin, args)) in enumerate(schedule['spin']): + # Replace relative paths (if present) with absolute ones + for arg in args.split(" "): + abspath = "%s/%s" % (exp_dir, arg) + if os.path.exists(abspath): + args = args.replace(arg, abspath) + + schedule['spin'][idx] = (spin, args) def get_dirs(sched_file, out_base_dir): sched_leaf_dir = re.findall(r".*/([\w_-]+)/.*?$", sched_file)[0] @@ -88,27 +97,27 @@ def load_experiment(sched_file, scheduler, duration, param_file, out_base): params = {} kernel = "" - if not scheduler or not duration: - param_file = param_file or \ - "%s/%s" % (dirname, conf.DEFAULTS['params_file']) + param_file = param_file or \ + "%s/%s" % (dirname, conf.DEFAULTS['params_file']) - if os.path.isfile(param_file): - params = load_params(param_file) - scheduler = scheduler or params[conf.PARAMS['sched']] - duration = duration or params[conf.PARAMS['dur']] + if os.path.isfile(param_file): + params = load_params(param_file) + scheduler = scheduler or params[conf.PARAMS['sched']] + duration = duration or params[conf.PARAMS['dur']] - # Experiments can specify required kernel name - if conf.PARAMS['kernel'] in params: - kernel = params[conf.PARAMS['kernel']] + # Experiments can specify required kernel name + if conf.PARAMS['kernel'] in params: + kernel = params[conf.PARAMS['kernel']] - duration = duration or conf.DEFAULTS['duration'] + duration = duration or conf.DEFAULTS['duration'] - if not scheduler: - raise IOError("Parameter scheduler not specified in %s" % (param_file)) + if not scheduler: + raise IOError("Parameter scheduler not specified in %s" % (param_file)) # Parse schedule file's intentions schedule = load_schedule(sched_file) (work_dir, out_dir) = get_dirs(sched_file, out_base) + fix_paths(schedule, os.path.split(sched_file)[0]) run_exp(sched_file, schedule, scheduler, kernel, duration, work_dir, out_dir) @@ -170,8 +179,9 @@ def run_exp(name, schedule, scheduler, kernel, duration, work_dir, out_dir): exp = Experiment(name, scheduler, work_dir, out_dir, proc_entries, executables) - exp.run_exp() + exp.run_exp() + def main(): opts, args = parse_args() -- cgit v1.2.2