From cb8db5d30ee769304c2c2b00f2a7d9bcb3c4098f Mon Sep 17 00:00:00 2001
From: Jonathan Herman
Date: Mon, 26 Nov 2012 16:02:48 -0500
Subject: Removed 2-step parse for scheduling statistics.

---
 parse/ft.py    |  47 +++---
 parse/point.py |   4 +-
 parse/sched.py | 445 +++++++++++++++++++++------------------------------------
 3 files changed, 186 insertions(+), 310 deletions(-)

diff --git a/parse/ft.py b/parse/ft.py
index c5f1522..fea246a 100644
--- a/parse/ft.py
+++ b/parse/ft.py
@@ -11,27 +11,8 @@ FT_SPLIT_NAME = "overhead={}.bin"
 FT_SORTED_NAME = "sorted-ft.bin"
 FT_ERR_NAME = "err-ft"

-def extract_ft_data(result, data_dir, cycles, tmp_dir):
-    freg = conf.FILES['ft_data'] + "$"
-    bins = [f for f in os.listdir(data_dir) if re.match(freg, f)]
-
-    if not len(bins):
-        return False
-
-    bin_file = "{}/{}".format(data_dir, bins[0])
-
-    with open("%s/%s" % (tmp_dir, FT_ERR_NAME), 'w') as err_file:
-        sorted_bin = sort_ft(bin_file, err_file, tmp_dir)
-
-        for event in conf.BASE_EVENTS:
-            parse_overhead(result, sorted_bin, event, cycles,
-                           tmp_dir, err_file)
-
-        os.remove(sorted_bin)
-
-    return True
-
 def parse_overhead(result, overhead_bin, overhead, cycles, out_dir, err_file):
+    '''Store statistics for @overhead in @overhead_bin into @result.'''
     ovh_fname = "{}/{}".format(out_dir, FT_SPLIT_NAME).format(overhead)

     if os.path.exists(ovh_fname):
@@ -39,7 +20,7 @@
     ovh_file = open(ovh_fname, 'w')

     # Extract matching overhead events into a separate file
-    cmd = [conf.BINS["split"], "-r", "-b", overhead, overhead_bin]
+    cmd = [conf.BINS["ftsplit"], "-r", "-b", overhead, overhead_bin]
     ret = subprocess.call(cmd, cwd=out_dir, stderr=err_file, stdout=ovh_file)

     size = os.stat(ovh_fname).st_size
@@ -65,9 +46,7 @@
         os.remove(ovh_fname)

 def sort_ft(ft_file, err_file, out_dir):
-    """
-    Create and return file with sorted overheads from @ft_file.
- """ + '''Create and return file with sorted overheads from @ft_file.''' out_fname = "{}/{}".format(out_dir, FT_SORTED_NAME) # Sort happens in-place @@ -79,3 +58,23 @@ def sort_ft(ft_file, err_file, out_dir): raise Exception("Sort failed with command: %s" % " ".join(cmd)) return out_fname + +def extract_ft_data(result, data_dir, work_dir, cycles): + freg = conf.FILES['ft_data'] + "$" + bins = [f for f in os.listdir(data_dir) if re.match(freg, f)] + + if not len(bins): + return False + + bin_file = "{}/{}".format(data_dir, bins[0]) + + with open("%s/%s" % (work_dir, FT_ERR_NAME), 'w') as err_file: + sorted_bin = sort_ft(bin_file, err_file, work_dir) + + for event in conf.BASE_EVENTS: + parse_overhead(result, sorted_bin, event, cycles, + work_dir, err_file) + + os.remove(sorted_bin) + + return True diff --git a/parse/point.py b/parse/point.py index d5f4a5e..8e27869 100644 --- a/parse/point.py +++ b/parse/point.py @@ -1,6 +1,6 @@ -""" +''' Too much duplicate code in this file -""" +''' import copy import numpy as np diff --git a/parse/sched.py b/parse/sched.py index 3e30880..ffc6224 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -1,306 +1,183 @@ -""" -TODO: No longer very pythonic, lot of duplicate code -print out task execution times or something -get miss ratio and tardiness directly from schedule OR -email list about turning on optional summary statistics OR -set up run exps to only get release and completions to get these stats -""" - import config.config as conf import os import re -import numpy as np +import struct import subprocess -from collections import namedtuple,defaultdict -from operator import methodcaller -from point import Measurement,Type - -PARAM_RECORD = r"(?P" +\ - r"PARAM *?(?P\d+)\/.*?" +\ - r"cost.*?(?P[\d\.]+)ms.*?" +\ - r"period.*?(?P[\d.]+)ms.*?" +\ - r"part.*?(?P\d+)[, ]*" +\ - r"(?:class=(?P\w+))?[, ]*" +\ - r"(?:level=(?P\w+))?).*$" -EXIT_RECORD = r"(?P" +\ - r"TASK_EXIT *?(?P\d+)/.*?" +\ - r"Avg.*?(?P\d+).*?" +\ - r"Max.*?(?P\d+))" -TARDY_RECORD = r"(?P" +\ - r"TASK_TARDY.*?(?P\d+)/(?P\d+).*?" +\ - r"Tot.*?(?P[\d\.]+).*?ms.*?" +\ - r"(?P[\d\.]+).*?ms.*?" +\ - r"(?P[\d\.]+))" -COMPLETION_RECORD = r"(?P" +\ - r"COMPLETION.*?(?P\d+)/.*?" +\ - r"exec:.*?(?P[\d\.]+)ms.*?" +\ - r"flush:.*?(?P[\d\.]+)ms.*?" +\ - r"flush_work:.*?(?P[\d]+).*?" +\ - r"load:.*?(?P[\d\.]+)ms.*?" 
+\ - r"load_work:.*?(?P[\d]+))" - -TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period','type','level']) -Task = namedtuple('Task', ['pid', 'config', 'run']) - -class LeveledArray(object): - """ - Groups statistics by the level of the task to which they apply - """ - def __init__(self, name): - self.name = name - self.vals = defaultdict(lambda:[]) - - def add(self, task, value): - self.vals[task.config.level] += [value] - - - def write_measurements(self, result): - for level, arr in self.vals.iteritems(): - name = "%s%s" % ("%s-" % level if level else "", self.name) - result[name] = Measurement(name).from_array(arr) - -def get_st_output(data_dir, out_dir, force=False): - """ - Create and return files containing unpacked sched data - """ - bin_files = conf.FILES['sched_data'].format(".*") - bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] +from collections import defaultdict,namedtuple +from common import recordtype +from point import Measurement - output_file = "%s/out-st" % out_dir - - if os.path.isfile(output_file): - if force: - os.remove(output_file) - else: - return output_file - - if len(bins) != 0: - cmd_arr = [conf.BINS['st_show']] - cmd_arr.extend(bins) - with open(output_file, "w") as f: - subprocess.call(cmd_arr, cwd=data_dir, stdout=f) - else: - return None - return output_file - -def get_tasks(data): - ret = [] - for match in re.finditer(PARAM_RECORD, data, re.M): - try: - t = Task( int(match.group('PID')), - TaskConfig( int(match.group('CPU')), - float(match.group('WCET')), - float(match.group('PERIOD')), - match.group("CLASS"), - match.group("LEVEL")), []) - if not (t.config.period and t.pid): - raise Exception() - ret += [t] - except Exception as e: - raise Exception("Invalid task record: %s\nparsed:\n\t%s\n\t%s" % - (e, match.groupdict(), match.group('RECORD'))) - return ret - -def get_task_dict(data): - tasks_list = get_tasks(data) - tasks_dict = {} - for t in tasks_list: - tasks_dict[t.pid] = t - return tasks_dict - -def get_task_exits(data): - ret = [] - for match in re.finditer(EXIT_RECORD, data): - try: - m = Measurement( int(match.group('PID')), - {Type.Max : float(match.group('MAX')), - Type.Avg : float(match.group('AVG'))}) - except: - raise Exception("Invalid exit record, parsed:\n\t%s\n\t%s" % - (match.groupdict(), match.group('RECORD'))) +class TimeTracker: + '''Store stats for durations of time demarcated by sched_trace records.''' + def __init__(self): + self.begin = self.avg = self.max = self.num = self.job = 0 - ret += [m] - return ret + def store_time(self, record): + '''End duration of time.''' + dur = record.when - self.begin + if self.job == record.job and dur > 0: + self.max = max(self.max, dur) + self.avg *= float(self.num / (self.num + 1)) + self.num += 1 + self.avg += dur / float(self.num) -def extract_tardy_vals(task_dict, data, exp_point): - ratios = LeveledArray("miss-ratio") - avg_tards = LeveledArray("avg-rel-tardiness") - max_tards = LeveledArray("max-rel-tardiness") + self.begin = 0 + self.job = 0 + + def start_time(self, record): + '''Start duration of time.''' + self.begin = record.when + self.job = record.job + +# Data stored for each task +TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) +TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses']) + +# Map of event ids to corresponding class, binary format, and processing methods +RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method']) +record_map = [0]*10 + +# Common to all records +HEADER_FORMAT = ' 1 or not pid: #TODO: 
-                continue
-        except:
-            raise Exception("Invalid completion record, missed: %d:"
-                            "\n\t%s\n\t%s" % (missed[pid], match.groupdict(),
-                                              match.group("RECORD")))
-        completions[pid] += [duration]
-
-    for pid, durations in completions.iteritems():
-        m = Measurement(pid).from_array(durations)
-
-        # TODO: not this, please
-        if not task_dict[pid].run:
-            task_dict[pid].run.append(m)
-
-        job_times = np.array(durations)
-        mean = job_times.mean()
-
-        if not mean or not durations:
+            type_num = struct.unpack_from('b', data)[0]
+        except struct.error:
+            break
+
+        rdata = record_map[type_num] if type_num < max_type else 0
+        if not rdata:
             continue
-        # Coefficient of variation
-        cv = job_times.std() / job_times.mean()
-        # Correction, assuming normal distributions
-        corrected = (1 + 1/(4 * len(job_times))) * cv
-
-        varz.add(task_dict[pid], corrected)
-        # varz.add(task_dict[pid], m[Type.Var])
-
-    if exp_point:
-        map(methodcaller('write_measurements', exp_point),
-            [varz, flushes, loads, fworks, lworks])
-
-def config_exit_stats(task_dict, data):
-    # # Dictionary of task exit measurements by pid
-    # exits = get_task_exits(data)
-    # exit_dict = dict((e.id, e) for e in exits)
-    extract_variance(task_dict, data, None)
-
-    # Dictionary where keys are configurations, values are lists
-    # of tasks with those configurations
-    config_dict = defaultdict(lambda: [])
-    for t in task_dict.itervalues():
-        config_dict[t.config] += [t]
-
-    for config in config_dict:
-        task_list = sorted(config_dict[config])
-
-        # # Replace tasks with corresponding exit stats
-        # if not t.pid in exit_dict:
-        #     raise Exception("Missing exit record for task '%s' in '%s'" %
-        #                     (t, file.name))
-        # exit_list = [exit_dict[t.pid] for t in task_list]
-        exit_list = [t.run[0] for t in task_list]
-        config_dict[config] = exit_list
-
-    return config_dict
-
-saved_stats = {}
-def get_base_stats(base_file):
-    if base_file in saved_stats:
-        return saved_stats[base_file]
-    with open(base_file, 'r') as f:
-        data = f.read()
-    task_dict = get_task_dict(data)
-
-    result = config_exit_stats(task_dict, data)
-    saved_stats[base_file] = result
-    return result
-
-def extract_scaling_data(task_dict, data, result, base_file):
-    # Generate trees of tasks with matching configurations
-    data_stats = config_exit_stats(task_dict, data)
-    base_stats = get_base_stats(base_file)
-
-    # Scaling factors are calculated by matching groups of tasks with the same
-    # config, then comparing task-to-task exec times in order of PID within
-    # each group
-    max_scales = LeveledArray("max-scaling")
-    avg_scales = LeveledArray("avg-scaling")
-
-    for config in data_stats:
-        if len(data_stats[config]) != len(base_stats[config]):
-            # Quit, we are missing a record and can't guarantee
-            # a task-to-task comparison
+        try:
+            values = struct.unpack_from(rdata.fmt, data)
+        except struct.error:
             continue
-        for data_stat, base_stat in zip(data_stats[config],base_stats[config]):
-            if not base_stat[Type.Avg] or not base_stat[Type.Max] or \
-               not data_stat[Type.Avg] or not data_stat[Type.Max]:
-                continue
-            # How much larger is their exec stat than ours?
-            avg_scale = float(base_stat[Type.Avg]) / float(data_stat[Type.Avg])
-            max_scale = float(base_stat[Type.Max]) / float(data_stat[Type.Max])
+        obj = rdata.clazz(*values)
+        yield (obj, rdata.method)
+
+def read_data(task_dict, fnames):
+    '''Read records from @fnames and store per-pid stats in @task_dict.'''
+    buff = []
+
+    def add_record(itera):
+        # Ordered insertion into buff
+        try:
+            next_ret = itera.next()
+        except StopIteration:
+            return
-            task = task_dict[data_stat.id]
+        arecord, method = next_ret
+        i = 0
+        for (i, (brecord, m, t)) in enumerate(buff):
+            if brecord.when > arecord.when:
+                break
+        else:
+            i = len(buff)
+        buff.insert(i, (arecord, method, itera))
-            avg_scales.add(task, avg_scale)
-            max_scales.add(task, max_scale)
+    for fname in fnames:
+        itera = make_iterator(fname)
+        add_record(itera)

-    avg_scales.write_measurements(result)
-    max_scales.write_measurements(result)
+    while buff:
+        (record, method, itera) = buff.pop(0)

-def extract_sched_data(data_file, result, base_file):
-    with open(data_file, 'r') as f:
-        data = f.read()
+        add_record(itera)
+        method(task_dict, record)

-    task_dict = get_task_dict(data)
+def process_completion(task_dict, record):
+    task_dict[record.pid].misses.store_time(record)

-    try:
-        extract_tardy_vals(task_dict, data, result)
-        extract_variance(task_dict, data, result)
-    except Exception as e:
-        print("Error in %s" % data_file)
-        raise e
+def process_release(task_dict, record):
+    data = task_dict[record.pid]
+    data.jobs += 1
+    data.misses.start_time(record)

-    if (base_file):
-        extract_scaling_data(task_dict, data, result, base_file)
+def process_param(task_dict, record):
+    params = TaskParams(record.wcet, record.period, record.partition)
+    task_dict[record.pid].params = params
+
+def process_block(task_dict, record):
+    task_dict[record.pid].blocks.start_time(record)
+
+def process_resume(task_dict, record):
+    task_dict[record.pid].blocks.store_time(record)
+
+register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when'])
+register_record('BlockRecord', 8, process_block, 'Q8x', ['when'])
+register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when'])
+register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when'])
+register_record('ParamRecord', 2, process_param, 'IIIcc2x',
+                ['wcet','period','phase','partition', 'task_class'])
+
+def extract_sched_data(result, data_dir, work_dir):
+    bin_files = conf.FILES['sched_data'].format(".*")
+    output_file = "%s/out-st" % work_dir
+
+    bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
+    if not len(bins):
+        return
+
+    # Save a human-readable version of the data for debugging
+    cmd_arr = [conf.BINS['st_show']]
+    cmd_arr.extend(bins)
+    with open(output_file, "w") as f:
+        subprocess.call(cmd_arr, cwd=data_dir, stdout=f)
+
+    task_dict = defaultdict(lambda :
+                            TaskData(0, 0, TimeTracker(), TimeTracker()))
+
+    # Gather per-task values
+    read_data(task_dict, bins)
+
+    stat_data = {"avg-tard"   : [], "max-tard"  : [],
+                 "avg-block"  : [], "max-block" : [],
+                 "miss-ratio" : []}
+
+    # Group per-task values
+    for tdata in task_dict.itervalues():
+        miss_ratio = float(tdata.misses.num) / tdata.jobs
+        # Scale average down to account for jobs with 0 tardiness
+        avg_tard = tdata.misses.avg * miss_ratio
+
+        stat_data["miss-ratio"].append(miss_ratio)
+        stat_data["avg-tard"  ].append(avg_tard / tdata.params.wcet)
+        stat_data["max-tard"  ].append(tdata.misses.max / tdata.params.wcet)
+        stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC)
+        stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC)
+
+    # Summarize value groups
+    for name, data in stat_data.iteritems():
+        result[name] = Measurement(str(name)).from_array(data)
--
cgit v1.2.2
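
Note on the new parsing scheme: instead of dumping text with st_show and regex-matching PARAM/TASK_EXIT/TASK_TARDY/COMPLETION lines in a second pass, sched.py now unpacks the binary sched_trace records directly and dispatches each one to a handler through record_map. The standalone sketch below illustrates that register/dispatch pattern in isolation. It is not code from the patch: the '<bbHI' header layout, the example handler, and the synthetic record are assumptions chosen to mirror the patch's structure.

import struct
from collections import namedtuple

RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method'])

HEADER_FORMAT = '<bbHI'   # type, cpu, pid, job (assumed 8-byte header layout)
HEADER_FIELDS = ['type', 'cpu', 'pid', 'job']
RECORD_SIZE   = 24        # 8-byte header + 16-byte payload

record_map = {}

def register_record(name, rid, method, fmt, fields):
    # Map a record id to (record class, full struct format, handler)
    clazz = namedtuple(name, HEADER_FIELDS + fields)
    record_map[rid] = RecordInfo(clazz, HEADER_FORMAT + fmt, method)

def process_completion(record):
    # Hypothetical handler: just report the parsed fields
    print("pid %d completed job %d at time %d" %
          (record.pid, record.job, record.when))

# Completion payload: an 8-byte 'when' timestamp plus 8 bytes of padding
register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when'])

def parse(buff):
    # Yield (record, handler) for every registered record in a binary buffer
    for off in range(0, len(buff) - RECORD_SIZE + 1, RECORD_SIZE):
        type_num = struct.unpack_from('b', buff, off)[0]
        rdata = record_map.get(type_num)
        if not rdata:
            continue  # skip record types nobody registered
        values = struct.unpack_from(rdata.fmt, buff, off)
        yield rdata.clazz(*values), rdata.method

# Synthetic record: type=7, cpu=0, pid=1234, job=3, when=1000000
raw = struct.pack('<bbHIQ8x', 7, 0, 1234, 3, 1000000)
for record, method in parse(raw):
    method(record)

Keeping each record's struct format and field names next to its id means a new event type is one register_record() call; the parse loop never changes. The real parser additionally merge-sorts records from the per-CPU trace files by their 'when' timestamp (read_data's ordered buffer) before dispatching them.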