From a0e4b9fe9d7fab9a50a626cfeda3c614a9a6af5d Mon Sep 17 00:00:00 2001
From: Jonathan Herman
Date: Wed, 17 Apr 2013 10:43:53 -0400
Subject: Created infrastructure for calculating scaling factors.

Sched trace parsing now records a level for every task, the load reported
by each completion record, and the time between switch-to and switch-away
records. Parsed task data is cached per data directory by get_task_data().
Statistics are grouped by task level through LeveledArray.

extract_mc_data() pairs tasks of an experiment with the tasks of a base
experiment that share the same parameters (matched in pid order) and
records max/avg scaling factors. The base experiments are chosen in
parse_exps.py with --scale-against PARAM=VALUE.
---
 gen/__init__.py      |   2 +-
 gen/generator.py     |   2 +-
 gen/mc_generators.py |   7 +--
 parse/sched.py       | 133 +++++++++++++++++++++++++++++++++++++++++----------
 parse_exps.py        |  39 +++++++++++++--
 5 files changed, 148 insertions(+), 35 deletions(-)

diff --git a/gen/__init__.py b/gen/__init__.py
index 803bb37..dbc7b12 100644
--- a/gen/__init__.py
+++ b/gen/__init__.py
@@ -6,4 +6,4 @@ gen.register_generator("G-EDF", edf.GedfGenerator)
 gen.register_generator("P-EDF", edf.PedfGenerator)
 gen.register_generator("C-EDF", edf.CedfGenerator)
 gen.register_generator("MC", mc.McGenerator)
-gen.register_generator("Color-MC", mc.ColorMcGenerator)
+gen.register_generator("COLOR-MC", mc.ColorMcGenerator)
diff --git a/gen/generator.py b/gen/generator.py
index 1205490..693e52f 100644
--- a/gen/generator.py
+++ b/gen/generator.py
@@ -77,7 +77,7 @@ class Generator(object):
 
     def __make_options(self, params):
         '''Return generic Litmus options.'''
-        return [GenOption('num_tasks', int,
+        return [GenOption('tasks', int,
                           range(self.cpus, 5*self.cpus, self.cpus),
                           'Number of tasks per experiment.'),
                 GenOption('cpus', int, [self.cpus],
diff --git a/gen/mc_generators.py b/gen/mc_generators.py
index bbb1ab9..704bcc3 100644
--- a/gen/mc_generators.py
+++ b/gen/mc_generators.py
@@ -256,7 +256,7 @@ COLOR_TYPES = ['scheduling', 'locking', 'unmanaged']
 
 class ColorMcGenerator(McGenerator):
     def __init__(self, params = {}):
-        super(ColorMcGenerator, self).__init__("COLOR-MC",
+        super(ColorMcGenerator, self).__init__("MC",
             templates=[TP_TYPE, TP_CHUNK, TP_COLOR_B, TP_COLOR_C],
             options=self.__make_options(),
             params=self.__extend_params(params))
@@ -313,10 +313,7 @@ class ColorMcGenerator(McGenerator):
                           'System page size.'),
                 GenOption('wss', [float, int], [.5],
                           'Task working set sizes. Can be expressed as a fraction ' +
-                          'of the cache.'),
-                GenOption('align_unmanaged', [True, False], [True],
-                          'Place all working sets of unmanaged task systems in '+
-                          'the same location, for maximum interference.')]
+                          'of the cache.')]
 
 
     def __get_wss_pages(self, params):
diff --git a/parse/sched.py b/parse/sched.py
index 1213f0d..1f07751 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -2,8 +2,8 @@ import config.config as conf
 import os
 import re
 import struct
-import sys
 import subprocess
+import sys
 
 from collections import defaultdict,namedtuple
 from common import recordtype
@@ -33,8 +33,9 @@ class TimeTracker:
         self.job = record.job
 
 # Data stored for each task
-TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
-TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses'])
+TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu', 'level'])
+TaskData = recordtype('TaskData', ['params', 'jobs', 'loads',
+                                   'blocks', 'misses', 'execs'])
 
 # Map of event ids to corresponding class, binary format, and processing methods
 RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method'])
@@ -124,6 +125,7 @@ def read_data(task_dict, fnames):
 
 def process_completion(task_dict, record):
     task_dict[record.pid].misses.store_time(record)
+    task_dict[record.pid].loads += [record.load]
 
 def process_release(task_dict, record):
     data = task_dict[record.pid]
@@ -131,7 +133,9 @@ def process_release(task_dict, record):
     data.misses.start_time(record)
 
 def process_param(task_dict, record):
-    params = TaskParams(record.wcet, record.period, record.partition)
+    level = chr(97 + record.level)
+    params = TaskParams(record.wcet, record.period,
+                        record.partition, level)
     task_dict[record.pid].params = params
 
 def process_block(task_dict, record):
@@ -140,14 +144,27 @@ def process_block(task_dict, record):
 def process_resume(task_dict, record):
     task_dict[record.pid].blocks.store_time(record)
 
+def process_switch_to(task_dict, record):
+    task_dict[record.pid].execs.start_time(record)
+
+def process_switch_away(task_dict, record):
+    task_dict[record.pid].execs.store_time(record)
+
 register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when'])
 register_record('BlockRecord', 8, process_block, 'Q8x', ['when'])
-register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when'])
+register_record('CompletionRecord', 7, process_completion, 'QQ', ['when', 'load'])
 register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when'])
-register_record('ParamRecord', 2, process_param, 'IIIcc2x',
-                ['wcet','period','phase','partition', 'task_class'])
+register_record('SwitchToRecord', 5, process_switch_to, 'Q8x', ['when'])
+register_record('SwitchAwayRecord', 6, process_switch_away, 'Q8x', ['when'])
+register_record('ParamRecord', 2, process_param, 'IIIcccx',
+                ['wcet','period','phase','partition', 'task_class', 'level'])
+
+saved_stats = {}
+def get_task_data(data_dir, work_dir = None):
+    '''Parse sched trace files'''
+    if data_dir in saved_stats:
+        return saved_stats[data_dir]
 
-def extract_sched_data(result, data_dir, work_dir):
     bin_files = conf.FILES['sched_data'].format(".*")
     output_file = "%s/out-st" % work_dir
 
@@ -157,24 +174,46 @@ def extract_sched_data(result, data_dir, work_dir):
 
     # Save an in-english version of the data for debugging
     # This is optional and will only be done if 'st_show' is in PATH
-    if conf.BINS['st_show']:
+    if work_dir and conf.BINS['st_show']:
         cmd_arr = [conf.BINS['st_show']]
         cmd_arr.extend(bins)
         with open(output_file, "w") as f:
             print("calling %s" % cmd_arr)
             subprocess.call(cmd_arr, cwd=data_dir, stdout=f)
 
-    task_dict = defaultdict(lambda :
-                            TaskData(0, 0, TimeTracker(), TimeTracker()))
+    task_dict = defaultdict(lambda :TaskData(0, 0, [], TimeTracker(),
+                                             TimeTracker(), TimeTracker()))
 
     # Gather per-task values
     read_data(task_dict, bins)
 
-    stat_data = {"avg-tard" : [], "max-tard" : [],
-                 "avg-block" : [], "max-block" : [],
-                 "miss-ratio" : []}
+    saved_stats[data_dir] = task_dict
+    return task_dict
+
+class LeveledArray(object):
+    """Groups statistics by the level of the task to which they apply"""
+    def __init__(self):
+        # Maps stat name -> task level -> list of recorded values
+        self.vals = defaultdict(lambda: defaultdict(lambda:[]))
+
+    def add(self, name, level, value):
+        if not isinstance(value, list):
+            value = [value]
+        self.vals[name][level] += value
 
-    # Group per-task values
+    def write_measurements(self, result):
+        for stat_name, stat_data in self.vals.iteritems():
+            for level, values in stat_data.iteritems():
+                if not values:
+                    continue
+
+                name = "%s%s" % ("%s-" % level if level else "", stat_name)
+                result[name] = Measurement(name).from_array(values)
+
+def extract_sched_data(result, data_dir, work_dir):
+    task_dict = get_task_data(data_dir, work_dir)
+
+    stat_data = LeveledArray()
     for tdata in task_dict.itervalues():
         if not tdata.params:
             # Currently unknown where these invalid tasks come from...
@@ -184,15 +223,59 @@ def extract_sched_data(result, data_dir, work_dir):
         # Scale average down to account for jobs with 0 tardiness
         avg_tard = tdata.misses.avg * miss_ratio
 
-        stat_data["miss-ratio"].append(miss_ratio)
-        stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet)
-        stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet)
-        stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC)
-        stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC)
-
-    # Summarize value groups
-    for name, data in stat_data.iteritems():
-        if not data:
+        level = tdata.params.level
+        stat_data.add("miss-ratio", level, miss_ratio)
+        stat_data.add("avg-tard", level, avg_tard / tdata.params.wcet)
+        stat_data.add("max-tard", level, tdata.misses.max / tdata.params.wcet)
+        stat_data.add("avg-block", level, tdata.blocks.avg / NSEC_PER_MSEC)
+        stat_data.add("max-block", level, tdata.blocks.max / NSEC_PER_MSEC)
+
+    stat_data.write_measurements(result)
+
+ScaleData = namedtuple('ScaleData', ['reg_tasks', 'base_tasks'])
+def extract_mc_data(result, data_dir, base_dir):
+    task_dict = get_task_data(data_dir)
+    base_dict = get_task_data(base_dir)
+
+    stat_data = LeveledArray()
+
+    for tdata in task_dict.itervalues(): # Only level B loads are measured
+        if tdata.params and tdata.params.level == 'b':
+            stat_data.add('load', tdata.params.level, tdata.loads)
+
+    tasks_by_config = defaultdict(lambda: ScaleData([], []))
+
+    # Add tasks in order of pid to tasks_by_config
+    # Tasks must be ordered by pid or we can't make 1 to 1 comparisons
+    # when multiple tasks have the same config in each task set
+    for tasks, field in ((task_dict, 'reg_tasks'), (base_dict, 'base_tasks')):
+        # Tasks which never logged parameters cannot be matched up
+        for pid in sorted(p for p in tasks if tasks[p].params):
+            tlist = getattr(tasks_by_config[tasks[pid].params], field)
+            tlist += [tasks[pid].execs]
+
+    # Write scaling factors
+    for config, scale_data in tasks_by_config.iteritems():
+        if len(scale_data.reg_tasks) != len(scale_data.base_tasks):
+            # Can't make comparison if different numbers of tasks!
             continue
 
-        result[name] = Measurement(str(name)).from_array(data)
+        all_pairs = zip(scale_data.reg_tasks, scale_data.base_tasks)
+        for reg_execs, base_execs in all_pairs:
+            if not reg_execs.max or not reg_execs.avg or\
+               not base_execs.max or not base_execs.avg:
+                # This was an issue at some point, not sure if it still is
+                continue
+
+            max_scale = float(base_execs.max) / reg_execs.max
+            avg_scale = float(base_execs.avg) / reg_execs.avg
+
+            if (avg_scale < 1 or max_scale < 1) and config.level == "b":
+                sys.stderr.write("Task in {} with config {} has <1.0 scale!\n"
+                                 .format(data_dir, config))
+                continue
+
+            stat_data.add('max-scale', config.level, max_scale)
+            stat_data.add('avg-scale', config.level, avg_scale)
+
+    stat_data.write_measurements(result)
diff --git a/parse_exps.py b/parse_exps.py
index c254536..9475cfc 100755
--- a/parse_exps.py
+++ b/parse_exps.py
@@ -2,10 +2,12 @@
 from __future__ import print_function
 
 import config.config as conf
+import copy
 import os
 import parse.ft as ft
 import parse.sched as st
 import pickle
+import re
 import shutil as sh
 import sys
 import traceback
@@ -35,6 +37,9 @@ def parse_args():
     parser.add_option('-p', '--processors', default=max(cpu_count() - 1, 1),
                       type='int', dest='processors',
                       help='number of threads for processing')
+    parser.add_option('-s', '--scale-against', dest='scale_against',
+                      metavar='PARAM=VALUE', default="type=unmanaged",
+                      help='calculate task scaling factors against these configs')
 
     return parser.parse_args()
 
@@ -82,9 +87,9 @@ def load_exps(exp_dirs, cm_builder, force):
 
     return exps
 
-def parse_exp(exp_force):
+def parse_exp(exp_force_base):
     # Tupled for multiprocessing
-    exp, force = exp_force
+    exp, force, base_table = exp_force_base
 
     result_file = exp.work_dir + "/exp_point.pkl"
     should_load = not force and os.path.exists(result_file)
@@ -109,6 +114,11 @@ def parse_exp(exp_force):
             # Write scheduling statistics into result
             st.extract_sched_data(result, exp.path, exp.work_dir)
 
+            if base_table and exp.params in base_table:
+                base_exp = base_table[exp.params]
+                if base_exp != exp:
+                    st.extract_mc_data(result, exp.path, base_exp.path)
+
             with open(result_file, 'wb') as f:
                 pickle.dump(result, f)
         except:
@@ -116,6 +126,27 @@ def parse_exp(exp_force):
 
     return (exp, result)
 
+def make_base_table(cmd_scale, col_map, exps):
+    if not cmd_scale:
+        return None
+
+    # Configuration key for task systems used to calculate task
+    # execution scaling factors
+    [(param, value)] = re.findall("(.*)=(.*)", cmd_scale)
+
+    if param not in col_map:
+        raise IOError("Base column '%s' not present in any parameters!" % param)
+
+    base_map = copy.deepcopy(col_map)
+    base_table = TupleTable(base_map)
+
+    # Fill table with exps who we will scale against
+    for exp in exps:
+        if exp.params[param] == value:
+            base_table[exp.params] = exp
+
+    return base_table
+
 def main():
     opts, args = parse_args()
 
@@ -135,11 +166,13 @@ def main():
     col_map = builder.build()
     result_table = TupleTable(col_map)
 
+    base_table = make_base_table(opts.scale_against, col_map, exps)
+
     sys.stderr.write("Parsing data...\n")
 
     procs = min(len(exps), opts.processors)
     pool = Pool(processes=procs)
-    pool_args = zip(exps, [opts.force]*len(exps))
+    pool_args = zip(exps, [opts.force]*len(exps), [base_table]*len(exps))
     enum = pool.imap_unordered(parse_exp, pool_args, 1)
 
     try:
-- 
cgit v1.2.2