From 5d97a6baf6166b74355c6e744e010949a46fd625 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Sun, 7 Oct 2012 23:40:12 -0400 Subject: Split scheduling data by task criticality. --- experiment/experiment.py | 32 ++++++------ parse/ft.py | 2 +- parse/sched.py | 125 ++++++++++++++++++++++++++++------------------- parse/tuple_table.py | 8 +-- parse_exps.py | 20 +++----- run_exps.py | 22 ++++++--- 6 files changed, 121 insertions(+), 88 deletions(-) diff --git a/experiment/experiment.py b/experiment/experiment.py index a44f798..e6dc92d 100644 --- a/experiment/experiment.py +++ b/experiment/experiment.py @@ -40,6 +40,8 @@ class Experiment(object): self.finished_dir = finished_dir self.proc_entries = proc_entries self.executables = executables + self.exec_out = None + self.exec_err = None self.__make_dirs() self.__assign_executable_cwds() @@ -151,23 +153,26 @@ class Experiment(object): print "[Exp %s]: %s" % (self.name, msg) def run_exp(self): - self.setup() - succ = False - try: - self.__run_tasks() - self.log("Saving results in %s" % self.finished_dir) - succ = True + self.setup() + + try: + self.__run_tasks() + self.log("Saving results in %s" % self.finished_dir) + succ = True + finally: + self.teardown() finally: - self.teardown() + self.log("Switching to Linux scheduler") + litmus_util.switch_scheduler("Linux") if succ: self.__save_results() self.log("Experiment done!") - def setup(self): + def setup(self): self.log("Writing %d proc entries" % len(self.proc_entries)) map(methodcaller('write_proc'), self.proc_entries) @@ -185,13 +190,13 @@ class Experiment(object): executable.stdout_file = self.exec_out executable.stderr_file = self.exec_err map(set_out, self.executables) - + time.sleep(4) def teardown(self): - self.exec_out.close() - self.exec_err.close() - + self.exec_out and self.exec_out.close() + self.exec_err and self.exec_err.close() + sleep_time = 5 self.log("Sleeping %d seconds to allow buffer flushing" % sleep_time) time.sleep(sleep_time) @@ -199,6 +204,3 @@ class Experiment(object): self.log("Stopping tracers") map(methodcaller('stop_tracing'), self.tracers) - self.log("Switching to Linux scheduler") - litmus_util.switch_scheduler("Linux") - diff --git a/parse/ft.py b/parse/ft.py index 20a430e..127e49f 100644 --- a/parse/ft.py +++ b/parse/ft.py @@ -12,7 +12,7 @@ def get_ft_output(data_dir, out_dir): FT_DATA_NAME = "scheduler=x-ft" output_file = "{}/out-ft".format(out_dir) - + if os.path.isfile(output_file): print("ft-output already exists for %s" % data_dir) return output_file diff --git a/parse/sched.py b/parse/sched.py index b84e16e..300c569 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -34,7 +34,26 @@ COMPLETION_RECORD = r"(?P" +\ TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period','type','level']) Task = namedtuple('Task', ['pid', 'config']) +class LeveledArray(object): + """ + Groups statistics by the level of the task to which they apply + """ + def __init__(self, name): + self.name = name + self.vals = defaultdict(lambda:[]) + + def add(self, task, value): + self.vals[task.config.level] += [value] + + def write_measurements(self, result): + for level, arr in self.vals.iteritems(): + name = "%s%s" % ("%s-" % level if level else "", self.name) + result[name] = Measurement(name).from_array(arr) + def get_st_output(data_dir, out_dir): + """ + Create and return files containing unpacked sched data + """ bin_files = conf.FILES['sched_data'].format(".*") bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] @@ -70,7 +89,7 @@ def get_tasks(data): (e, match.groupdict(), match.group('RECORD'))) return ret -def get_tasks_dict(data): +def get_task_dict(data): tasks_list = get_tasks(data) tasks_dict = {} for t in tasks_list: @@ -89,17 +108,15 @@ def get_task_exits(data): except: raise Exception("Invalid exit record, parsed:\n\t%s\n\t%s" % (match.groupdict(), m.group('RECORD'))) - + ret += [m] return ret - -def extract_tardy_vals(data, exp_point): - ratios = [] - avg_tards = [] - max_tards = [] - tasks = get_tasks_dict(data) +def extract_tardy_vals(task_dict, data, exp_point): + ratios = LeveledArray("miss-ratio") + avg_tards = LeveledArray("avg-rel-tardiness") + max_tards = LeveledArray("max-rel-tardiness") for match in re.finditer(TARDY_RECORD, data): try: @@ -114,35 +131,40 @@ def extract_tardy_vals(data, exp_point): raise Exception("Invalid tardy record:\n\t%s\n\t%s" % (match.groupdict(), match.group("RECORD"))) - if pid not in tasks: + if pid not in task_dict: raise Exception("Invalid pid '%d' in tardy record:\n\t%s" % match.group("RECORD")) - - t = tasks[pid] - avg_tards += [ total_tard / (jobs * t.config.period) ] - max_tards += [ max_tard / t.config.period ] - ratios += [ misses / jobs ] - - exp_point["avg-rel-tard"] = Measurement().from_array(avg_tards) - exp_point["max-rel-tard"] = Measurement().from_array(max_tards) - exp_point["miss-ratio"] = Measurement().from_array(ratios) - -def extract_variance(data, exp_point): - varz = [] + + t = task_dict[pid] + avg_tards.add(t, total_tard / (jobs * t.config.period)) + max_tards.add(t, max_tard / t.config.period) + ratios.add(t, misses / jobs) + + ratios.write_measurements(exp_point) + avg_tards.write_measurements(exp_point) + max_tards.write_measurements(exp_point) + +def extract_variance(task_dict, data, exp_point): + varz = LeveledArray("exec-variance") completions = defaultdict(lambda: []) + missed = defaultdict(lambda: int()) for match in re.finditer(COMPLETION_RECORD, data): try: pid = int(match.group("PID")) duration = float(match.group("EXEC")) - if not (duration and pid): raise Exception() + # Last (exit) record often has exec time of 0 + missed[pid] += not bool(duration) + + if missed[pid] > 1 or not pid: raise Exception() except: - raise Exception("Invalid completion record:\n\t%s\n\t%s" % - (match.groupdict(), match.group("RECORD"))) + raise Exception("Invalid completion record, missed - %d:" + "\n\t%s\n\t%s" % (missed[pid], match.groupdict(), + match.group("RECORD"))) completions[pid] += [duration] - for (pid, durations) in completions: + for pid, durations in completions.iteritems(): job_times = np.array(durations) # Coefficient of variation @@ -150,32 +172,22 @@ def extract_variance(data, exp_point): # Correction, assuming normal distributions corrected = (1 + 1/(4 * len(job_times))) * cv - varz.append(corrected) - - exp_point['exec-var'] = Measurement().from_array(varz) - -def extract_sched_data(data_file, result): - with open(data_file, 'r') as f: - data = f.read() + varz.add(task_dict[pid], corrected) - extract_tardy_vals(data, result) - extract_variance(data, result) + varz.write_measurements(exp_point) -def config_exit_stats(file): +def config_exit_stats(task_dict, file): with open(file, 'r') as f: data = f.read() - - tasks = get_tasks(data) # Dictionary of task exit measurements by pid exits = get_task_exits(data) - exit_dict = dict((e.id, e) for e in exits) # Dictionary where keys are configurations, values are list # of tasks with those configuratino config_dict = defaultdict(lambda: []) - for t in tasks: + for t in task_dict.itervalues(): config_dict[t.config] += [t] for config in config_dict: @@ -185,7 +197,6 @@ def config_exit_stats(file): if not t.pid in exit_dict: raise Exception("Missing exit record for task '%s' in '%s'" % (t, file)) - exit_list = [exit_dict[t.pid] for t in task_list] config_dict[config] = exit_list @@ -195,20 +206,22 @@ saved_stats = {} def get_base_stats(base_file): if base_file in saved_stats: return saved_stats[base_file] - result = config_exit_stats(base_file) + with open(base_file, 'r') as f: + data = f.read() + result = config_exit_stats(data) saved_stats[base_file] = result return result -def extract_scaling_data(data_file, base_file, result): +def extract_scaling_data(task_dict, data, result, base_file): # Generate trees of tasks with matching configurations - data_stats = config_exit_stats(data_file) + data_stats = config_exit_stats(data) base_stats = get_base_stats(base_file) # Scaling factors are calculated by matching groups of tasks with the same # config, then comparing task-to-task exec times in order of PID within # each group - max_scales = [] - avg_scales = [] + max_scales = LeveledArray("max-scaling") + avg_scales = LeveledArray("avg-scaling") for config in data_stats: if len(data_stats[config]) != len(base_stats[config]): @@ -220,8 +233,22 @@ def extract_scaling_data(data_file, base_file, result): avg_scale = float(base_stat[Type.Avg]) / float(base_stat[Type.Avg]) max_scale = float(base_stat[Type.Max]) / float(base_stat[Type.Max]) - avg_scales += [avg_scale] - max_scales += [max_scale] + task = task_dict[data_stat.id] + + avg_scales.add(task, avg_scale) + max_scales.add(task, max_scale) + + avg_scales.write_measurements(result) + max_scales.write_measurements(result) + +def extract_sched_data(data_file, result, base_file): + with open(data_file, 'r') as f: + data = f.read() + + task_dict = get_task_dict(data) + + extract_tardy_vals(task_dict, data, result) + extract_variance(task_dict, data, result) - result['max-scale'] = Measurement().from_array(max_scales) - result['avg-scale'] = Measurement().from_array(avg_scales) + if (base_file): + extract_scaling_data(task_dict, data, result, base_file) diff --git a/parse/tuple_table.py b/parse/tuple_table.py index 6363b80..e6f0cc5 100644 --- a/parse/tuple_table.py +++ b/parse/tuple_table.py @@ -13,7 +13,7 @@ class ColMap(object): def get_key(self, kv): key = () added = 0 - + for col in self.col_list: if col not in kv: key += (None,) @@ -24,7 +24,7 @@ class ColMap(object): if added < len(kv): raise Exception("column map '%s' missed field in map '%s'" % (self.col_list, kv)) - + return key def __contains__(self, col): @@ -43,7 +43,7 @@ class ColMap(object): def __str__(self): return "%s" % (self.rev_map) - + class TupleTable(object): def __init__(self, col_map): self.col_map = col_map @@ -63,7 +63,7 @@ class TupleTable(object): raise Exception("cannot reduce twice!") self.reduced = True for key, values in self.table.iteritems(): - self.table[key] = SummaryPoint(key, values) + self.table[key] = SummaryPoint(str(key), values) def write_result(self, out_dir): dir_map = DirMap(out_dir) diff --git a/parse_exps.py b/parse_exps.py index c91a654..8f98309 100755 --- a/parse_exps.py +++ b/parse_exps.py @@ -42,18 +42,18 @@ def get_exp_params(data_dir, col_map): # Track all changed params for key in params.keys(): col_map.try_add(key) - + return params def gen_exp_data(exp_dirs, base_conf, col_map): plain_exps = [] scaling_bases = [] - + for data_dir in exp_dirs: if not os.path.isdir(data_dir): raise IOError("Invalid experiment '%s'" % os.path.abspath(data_dir)) - + tmp_dir = data_dir + "/tmp" if not os.path.exists(tmp_dir): os.mkdir(tmp_dir) @@ -85,7 +85,7 @@ def main(): # Configuration key for task systems used to calculate task # execution scaling factors base_conf = dict(re.findall("(.*)=(.*)", opts.scale_against)) - + col_map = ColMap() (plain_exps, scaling_bases) = gen_exp_data(args, base_conf, col_map) @@ -103,7 +103,7 @@ def main(): for exp in plain_exps: result = ExpPoint(exp.name) - + if exp.data_files.ft: # Write overheads into result ft.extract_ft_data(exp.data_files.ft, result, conf.BASE_EVENTS) @@ -115,13 +115,9 @@ def main(): base_params = copy.deepcopy(exp.params) base_params.pop(base_conf.keys()[0]) base = base_table.get_exps(base_params)[0] - if base: - # Write scaling factor (vs base) into result - st.extract_scaling_data(exp.data_files.st, - base.data_files.st, - result) # Write deadline misses / tardiness into result - st.extract_sched_data(exp.data_files.st, result) + st.extract_sched_data(exp.data_files.st, result, + base.data_files.st if base else None) result_table.add_exp(exp.params, result) @@ -129,6 +125,6 @@ def main(): result_table.write_result(opts.out_dir) - + if __name__ == '__main__': main() diff --git a/run_exps.py b/run_exps.py index 825ad5b..8f72adb 100755 --- a/run_exps.py +++ b/run_exps.py @@ -1,5 +1,9 @@ #!/usr/bin/env python from __future__ import print_function +""" +TODO: no -f flag, instead allow individual schedules to be passed in. + -f flag now forced, which removes old data directories +""" import config.config as conf import experiment.litmus_util as lu @@ -39,11 +43,11 @@ def convert_data(data): """Convert a non-python schedule file into the python format""" regex = re.compile( r"(?P^" - r"(?P
/proc/\w+?/)?" - r"(?P[\w\/]+)" + r"(?P
/proc/[\w\-]+?/)?" + r"(?P[\w\-\/]+)" r"\s*{\s*(?P.*?)\s*?}$)|" r"(?P^" - r"(?P\w+?spin)?\s+" + r"(?P\w+?spin)?\s*" r"(?P[\w\-_\d\. ]+)\s*$)", re.S|re.I|re.M) @@ -70,6 +74,7 @@ def fix_paths(schedule, exp_dir): abspath = "%s/%s" % (exp_dir, arg) if os.path.exists(abspath): args = args.replace(arg, abspath) + break schedule['spin'][idx] = (spin, args) @@ -96,7 +101,7 @@ def load_experiment(sched_file, scheduler, duration, param_file, out_base): params = {} kernel = "" - + param_file = param_file or \ "%s/%s" % (dirname, conf.DEFAULTS['params_file']) @@ -181,7 +186,6 @@ def run_exp(name, schedule, scheduler, kernel, duration, work_dir, out_dir): proc_entries, executables) exp.run_exp() - def main(): opts, args = parse_args() @@ -193,14 +197,16 @@ def main(): args = args or [opts.sched_file] + created = False if not os.path.exists(out_base): + created = True os.mkdir(out_base) done = 0 succ = 0 failed = 0 invalid = 0 - + for exp in args: path = "%s/%s" % (os.getcwd(), exp) @@ -223,7 +229,9 @@ def main(): traceback.print_exc() failed += 1 - + if not os.listdir(out_base) and created and not succ: + os.rmdir(out_base) + print("Experiments run:\t%d" % len(args)) print(" Successful:\t\t%d" % succ) print(" Failed:\t\t%d" % failed) -- cgit v1.2.2