From 3f706cd240d039365cbb8f9975ad97480d8b6145 Mon Sep 17 00:00:00 2001
From: Jonathan Herman
Date: Sat, 27 Apr 2013 15:34:13 -0400
Subject: Fixes.

---
 parse/col_map.py |  5 +++-
 parse/sched.py   | 85 ++++++++++++++++++++++++++++++++------------------------
 parse_exps.py    | 50 +++++++++++++++++++++------------
 plot/style.py    | 16 +++++++----
 4 files changed, 95 insertions(+), 61 deletions(-)

diff --git a/parse/col_map.py b/parse/col_map.py
index 15e1d64..ceb8867 100644
--- a/parse/col_map.py
+++ b/parse/col_map.py
@@ -11,12 +11,15 @@ class ColMapBuilder(object):
         return ColMap(col_list, self.value_map)
 
     def try_add(self, column, value):
-        self.value_map[column].add( value )
+        self.value_map[column].add( str(value) )
 
     def try_remove(self, column):
         if column in self.value_map:
             del(self.value_map[column])
 
+    def __contains__(self, col):
+        return col in self.value_map
+
 class ColMap(object):
     def __init__(self, col_list, values = None):
         self.col_list = col_list
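Note: the try_add change stores str(value) so that parameter values of mixed
types land in one comparable namespace, and the new __contains__ is what lets
parse_exps.py below ask "if param not in builder" directly on a ColMapBuilder.
A minimal runnable sketch of that protocol (MiniBuilder is a hypothetical
stand-in for illustration, not code from the repository):

    from collections import defaultdict

    class MiniBuilder(object):
        def __init__(self):
            self.value_map = defaultdict(set)

        def try_add(self, column, value):
            # Stringifying keeps ints, floats and strings comparable
            # when a column's values are later sorted together.
            self.value_map[column].add(str(value))

        def __contains__(self, col):
            # Python routes "col in builder" through this method.
            return col in self.value_map

    builder = MiniBuilder()
    builder.try_add('scheduler', 'MC')
    builder.try_add('cpus', 4)
    assert 'scheduler' in builder and 'wcet' not in builder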
diff --git a/parse/sched.py b/parse/sched.py
index 5a36da9..6e1fbe6 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -41,29 +41,23 @@ class TimeTracker:
         # any task is always skipped
         self.last_record = None
 
-        self.stored_dur = 0
-
     def store_time(self, next_record):
         '''End duration of time.'''
         dur = (self.last_record.when - self.begin) if self.last_record else -1
-        dur += self.stored_dur
 
         if self.next_job == next_record.job:
-            self.last_record = next_record
-
             if self.last_record:
                 self.matches += 1
 
-            if self.join_job and self.next_job == self.last_record.job:
-                self.stored_dur += dur
-            elif dur > 0:
+            self.last_record = next_record
+
+            if dur > 0:
                 self.max = max(self.max, dur)
                 self.avg *= float(self.num / (self.num + 1))
                 self.num += 1
                 self.avg += dur / float(self.num)
 
             self.begin = 0
-            self.stored_dur = 0
             self.next_job = 0
         else:
             self.disjoints += 1
@@ -81,7 +75,6 @@ class TimeTracker:
 class LeveledArray(object):
     """Groups statistics by the level of the task to which they apply"""
     def __init__(self):
-        self.name = name
         self.vals = defaultdict(lambda: defaultdict(lambda:[]))
 
     def add(self, name, level, value):
@@ -92,12 +85,12 @@
     def write_measurements(self, result):
         for stat_name, stat_data in self.vals.iteritems():
             for level, values in stat_data.iteritems():
-                if not values or not sum(values):
-                    log_once(SKIP_MSG, SKIP_MSG % stat_name)
-                    continue
+                # if not values or not sum(values):
+                #     log_once(SKIP_MSG, SKIP_MSG % stat_name)
+                #     continue
 
-                name = "%s%s" % ("%s-" % level if level else "", stat_name)
-                result[name] = Measurement(name).from_array(arr)
+                name = "%s%s" % ("%s-" % level.capitalize() if level else "", stat_name)
+                result[name] = Measurement(name).from_array(values)
 
 # Map of event ids to corresponding class and format
 record_map = {}
@@ -201,8 +194,9 @@ class ParamRecord(SchedRecord):
               ('class', c_uint8), ('level', c_uint8)]
 
     def process(self, task_dict):
+        level = chr(97 + self.level)
         params = TaskParams(self.wcet, self.period,
-                            self.partition, self.level)
+                            self.partition, level)
         task_dict[self.pid].params = params
 
 class ReleaseRecord(SchedRecord):
@@ -214,11 +208,13 @@
         if data.params:
             data.misses.start_time(self, self.when + data.params.period)
 
+NSEC_PER_USEC = 1000
 class CompletionRecord(SchedRecord):
-    FIELDS = [('when', c_uint64)]
+    FIELDS = [('when', c_uint64), ('load', c_uint64)]
 
     def process(self, task_dict):
         task_dict[self.pid].misses.store_time(self)
+        task_dict[self.pid].loads += [float(self.load) / NSEC_PER_USEC ]
 
 class BlockRecord(SchedRecord):
     FIELDS = [('when', c_uint64)]
@@ -232,9 +228,24 @@ class ResumeRecord(SchedRecord):
     def process(self, task_dict):
         task_dict[self.pid].blocks.store_time(self)
 
+class SwitchToRecord(SchedRecord):
+    FIELDS = [('when', c_uint64)]
+
+    def process(self, task_dict):
+        task_dict[self.pid].execs.start_time(self)
+
+class SwitchAwayRecord(SchedRecord):
+    FIELDS = [('when', c_uint64)]
+
+    def process(self, task_dict):
+        task_dict[self.pid].execs.store_time(self)
+
+
 # Map records to sched_trace ids (see include/litmus/sched_trace.h
 register_record(2, ParamRecord)
 register_record(3, ReleaseRecord)
+register_record(5, SwitchToRecord)
+register_record(6, SwitchAwayRecord)
 register_record(7, CompletionRecord)
 register_record(8, BlockRecord)
 register_record(9, ResumeRecord)
@@ -250,7 +261,8 @@ def create_task_dict(data_dir, work_dir = None):
     output_file = "%s/out-st" % work_dir
 
     task_dict = defaultdict(lambda :
-                            TaskData(None, 1, TimeTracker(), TimeTracker()))
+                            TaskData(None, 1, [], TimeTracker(),
+                                     TimeTracker(), TimeTracker(True)))
 
     bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
     if not len(bin_names):
@@ -281,11 +293,11 @@
             # Currently unknown where these invalid tasks come from...
             continue
 
-        level = tdata.config.level
+        level = tdata.params.level
         miss = tdata.misses
 
         record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints)
-        stat_data("record-loss", level, record_loss)
+        stat_data.add("record-loss", level, record_loss)
 
         if record_loss > conf.MAX_RECORD_LOSS:
             log_once(LOSS_MSG)
@@ -294,26 +306,27 @@
         miss_ratio = float(miss.num) / miss.matches
         avg_tard = miss.avg * miss_ratio
 
-        stat_data("miss-ratio", level, miss_ratio)
+        stat_data.add("miss-ratio", level, miss_ratio)
 
-        stat_data("max-tard", level, miss.max / tdata.params.period)
-        stat_data("avg-tard", level, avg_tard / tdata.params.period)
+        stat_data.add("max-tard", level, miss.max / tdata.params.period)
+        stat_data.add("avg-tard", level, avg_tard / tdata.params.period)
 
-        stat_data("avg-block", level, tdata.blocks.avg / NSEC_PER_MSEC)
-        stat_data("max-block", level, tdata.blocks.max / NSEC_PER_MSEC)
+        stat_data.add("avg-block", level, tdata.blocks.avg / NSEC_PER_MSEC)
+        stat_data.add("max-block", level, tdata.blocks.max / NSEC_PER_MSEC)
 
-    stat_data.write_measurements(result)
+        if tdata.params.level == 'b':
+            stat_data.add('LOAD', tdata.params.level, tdata.loads)
 
-def extract_mc_data(result, data_dir, base_dir):
-    task_dict = get_task_data(data_dir)
-    base_dict = get_task_data(base_dir)
+    stat_data.write_measurements(result)
 
-    stat_data = LeveledArray()
+def extract_scaling_data(result, data_dir, base_dir):
+    log_once("Scaling factor extraction currently broken, disabled.")
+    return
 
-    # Only level B loads are measured
-    for tdata in filter(task_dict.iteritems(), lambda x: x.level == 'b'):
-        stat_data.add('load', tdata.config.level, tdata.loads)
+    task_dict = create_task_dict(data_dir)
+    base_dict = create_task_dict(base_dir)
 
+    stat_data = LeveledArray()
     tasks_by_config = defaultdict(lambda: ScaleData([], []))
 
     # Add task execution times in order of pid to tasks_by_config
@@ -324,11 +337,11 @@
         for pid in sorted(tasks.keys()):
             tdata = tasks[pid]
 
-        tlist = getattr(tasks_by_config[tdata.params], field)
-        tlist += [tdata.execs]
+            tlist = getattr(tasks_by_config[tdata.params], field)
+            tlist += [tdata.execs]
 
     # Write scaling factors
-    for config, scale_data in tasks_by_config:
+    for config, scale_data in tasks_by_config.iteritems():
         if len(scale_data.reg_tasks) != len(scale_data.base_tasks):
             # Can't make comparison if different numbers of tasks!
             continue
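Note: the new SwitchToRecord and SwitchAwayRecord (sched_trace event ids 5
and 6) feed a third per-task TimeTracker, execs, through the same
start_time/store_time protocol the miss and block trackers already use, and
ParamRecord's chr(97 + self.level) turns numeric criticality levels 0, 1, 2
into the letters 'a', 'b', 'c' that extract_sched_data later compares
against. A simplified sketch of the interval pairing, assuming records carry
only a timestamp (the repository's TimeTracker additionally matches job
numbers and keeps avg/max/disjoint counts; MiniTracker below is illustrative
only):

    from collections import namedtuple

    Rec = namedtuple('Rec', ['when'])

    class MiniTracker(object):
        def __init__(self):
            self.begin = 0
            self.durations = []

        def start_time(self, record):
            # A switch-to record opens an execution interval.
            self.begin = record.when

        def store_time(self, record):
            # A switch-away record closes it.
            if self.begin:
                self.durations.append(record.when - self.begin)
            self.begin = 0

    execs = MiniTracker()
    execs.start_time(Rec(when=1000))   # event id 5: scheduled in
    execs.store_time(Rec(when=4500))   # event id 6: scheduled out
    assert execs.durations == [3500]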
diff --git a/parse_exps.py b/parse_exps.py
index 7a99d8a..98f95df 100755
--- a/parse_exps.py
+++ b/parse_exps.py
@@ -52,7 +52,7 @@ ExpData = namedtuple('ExpData', ['path', 'params', 'work_dir'])
 
 def parse_exp(exp_force_base):
     # Tupled for multiprocessing
-    exp, force, base_table = exp_force_base
+    exp, force, base_exp = exp_force_base
 
     result_file = exp.work_dir + "/exp_point.pkl"
     should_load = not force and os.path.exists(result_file)
@@ -81,11 +81,9 @@
         # Write scheduling statistics into result
         st.extract_sched_data(result, exp.path, exp.work_dir)
 
-        # Write scaling factors into result
-        if base_table and exp.params in base_table:
-            base_exp = base_table[exp.params]
-            if base_exp != exp:
-                st.extract_mc_data(result, exp.path, base_exp.path)
+        if base_exp:
+            # Write scaling factors into result
+            st.extract_scaling_data(result, exp.path, base_exp.path)
 
         with open(result_file, 'wb') as f:
             pickle.dump(result, f)
@@ -138,18 +136,22 @@
 
     return exps
 
-def make_base_table(cmd_scale, col_map, exps):
+def make_base_table(cmd_scale, builder, exps):
     if not cmd_scale:
         return None
 
     # Configuration key for task systems used to calculate task
    # execution scaling factors
-    [(param, value)] = dict(re.findall("(.*)=(.*)", cmd_scale))
+    [(param, value)] = re.findall("(.*)=(.*)", cmd_scale)
 
-    if param not in col_map:
-        raise IOError("Base column '%s' not present in any parameters!" % param)
-    base_table = TupleTable(copy.deepcopy(col_map))
+    if param not in builder:
+        com.log_once("Base column '%s' not present in any parameters!" % param)
+        com.log_once("Scaling factors will not be included.")
+
+    builder.try_remove(param)
+    base_map = builder.build()
+    base_table = TupleTable(base_map)
 
 
     # Fill table with exps who we will scale against
     for exp in exps:
@@ -171,23 +173,35 @@
 
     return [os.getcwd()]
 
-def fill_table(table, exps, opts):
-    sys.stderr.write("Parsing data...\n")
+def get_bases(builder, exps, opts):
+    '''Return a matching array of experiments against which scaling factors
+    will be calculated.'''
+    bases = [None]*len(exps)
+
+    base_table = make_base_table(opts.scale_against, builder, exps)
+    if not base_table:
+        return bases
+
+    for i,exp in enumerate(exps):
+        if exp.params in base_table and base_table[exp.params] != exp:
+            bases[i] = base_table[exp.params]
+    return bases
+
+
+def fill_table(table, builder, exps, opts):
     procs = min(len(exps), opts.processors)
     logged = multiprocessing.Manager().list()
+    bases = get_bases(builder, exps, opts)
 
     sys.stderr.write("Parsing data...\n")
 
-    base_table = make_base_table(opts.scale_against,
-                                 table.get_col_map(), exps)
-
     pool = multiprocessing.Pool(processes=procs,
                                 # Share a list of previously logged messages amongst processes
                                 # This is for the com.log_once method to use
                                 initializer=com.set_logged_list,
                                 initargs=(logged,))
 
-    pool_args = zip(exps, [opts.force]*len(exps), [base_table]*len(exps))
+    pool_args = zip(exps, [opts.force]*len(exps), bases)
 
     enum = pool.imap_unordered(parse_exp, pool_args, 1)
     try:
@@ -259,7 +273,7 @@ def main():
     col_map = builder.build()
     table = TupleTable(col_map)
 
-    fill_table(table, exps, opts)
+    fill_table(table, builder, exps, opts)
 
     if not table:
         sys.stderr.write("Found no data to parse!")
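Note: get_bases() precomputes, per experiment, the base experiment whose
parameters match once the scaling column is removed (make_base_table drops
that column from the builder before keying the table), so each worker in the
multiprocessing pool receives exactly one optional base instead of the whole
table. A hedged sketch of the matching idea, with a plain dict standing in
for TupleTable and all names below chosen for illustration:

    def get_bases_sketch(exps, scale_param, scale_value):
        # Key each experiment by its parameters minus the scaling column.
        def key(params):
            return tuple(sorted((k, v) for k, v in params.items()
                                if k != scale_param))

        base_table = dict((key(e['params']), e) for e in exps
                          if e['params'].get(scale_param) == scale_value)

        # One (possibly None) base per experiment, in matching order.
        bases = []
        for e in exps:
            base = base_table.get(key(e['params']))
            bases.append(base if base is not e else None)
        return bases

    exps = [{'name': 'base',  'params': {'load': 'min', 'cpus': 4}},
            {'name': 'heavy', 'params': {'load': 'max', 'cpus': 4}}]
    assert get_bases_sketch(exps, 'load', 'min')[1]['name'] == 'base'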
diff --git a/plot/style.py b/plot/style.py
index 4e2057f..5c2d661 100644
--- a/plot/style.py
+++ b/plot/style.py
@@ -1,7 +1,7 @@
 from collections import namedtuple
 import matplotlib.pyplot as plot
 
-class Style(namedtuple('SS', ['marker', 'line', 'color'])):
+class Style(namedtuple('SS', ['marker', 'color', 'line'])):
     def fmt(self):
         return self.marker + self.line + self.color
 
@@ -24,11 +24,12 @@ class StyleMap(object):
                 t = float if float(value) % 1.0 else int
             except:
                 t = bool if value in ['True','False'] else str
-            return StyleMap.ORDER.index(t)
-        col_list = sorted(col_list, key=type_priority)
+            # return StyleMap.ORDER.index(t)
+            return len(col_values[column])
+        col_list = sorted(col_list, key=type_priority, reverse=True)
 
         # TODO: undo this, switch to popping mechanism
-        for field, values in reversed([x for x in self.__get_all()._asdict().iteritems()]):
+        for field, values in [x for x in self.__get_all()._asdict().iteritems()]:
             if not col_list:
                 break
 
@@ -36,7 +37,10 @@
             value_dict = {}
 
             for value in sorted(col_values[next_column]):
-                value_dict[value] = values.pop(0)
+                try:
+                    value_dict[value] = values.pop(0)
+                except Exception as e:
+                    raise e
 
             self.value_map[next_column] = value_dict
             self.field_map[next_column] = field
@@ -44,7 +48,7 @@
     def __get_all(self):
         '''A Style holding all possible values for each property.'''
         return Style(marker=list('.,ov^<>1234sp*hH+xDd|_'),
-                     line=['-', ':', '--'],
+                     line=['-', ':', '--', '_'],
                      color=list('bgrcmyk'))
 
     def get_style(self, kv):
-- 
cgit v1.2.2
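Postscript on plot/style.py: StyleMap hands out Style fields to experiment
columns in namedtuple field order (via _asdict()), so moving 'color' ahead of
'line' means the second most varied column is now distinguished by color
rather than line pattern, and type_priority now ranks columns by their number
of distinct values, with reverse=True putting the most varied column first. A
small sketch of that assignment under those assumptions, with hypothetical
column data:

    from collections import namedtuple

    Style = namedtuple('SS', ['marker', 'color', 'line'])

    columns = {'scheduler': ['MC', 'PSN-EDF', 'GSN-EDF'],  # 3 values
               'cpus': ['4', '8']}                         # 2 values

    # Most varied column first, mirroring sorted(..., reverse=True).
    ordered = sorted(columns, key=lambda c: len(columns[c]), reverse=True)
    assignment = dict(zip(ordered, Style._fields))
    assert assignment == {'scheduler': 'marker', 'cpus': 'color'}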