From cb8db5d30ee769304c2c2b00f2a7d9bcb3c4098f Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Mon, 26 Nov 2012 16:02:48 -0500 Subject: Removed 2-step parse for scheduling statistics. --- common.py | 74 ++++++ config/config.example.py | 39 ++-- experiment/executable/executable.py | 10 +- experiment/executable/ftcat.py | 4 +- experiment/experiment.py | 10 +- experiment/litmus_util.py | 12 +- parse/ft.py | 47 ++-- parse/point.py | 4 +- parse/sched.py | 445 +++++++++++++----------------------- parse_exps.py | 122 +++------- run_exps.py | 4 +- 11 files changed, 327 insertions(+), 444 deletions(-) diff --git a/common.py b/common.py index a09ef7c..984f584 100644 --- a/common.py +++ b/common.py @@ -1,4 +1,78 @@ +import sys from collections import defaultdict +from textwrap import dedent + +def recordtype(typename, field_names, default=0): + ''' Mutable namedtuple. Recipe from George Sakkis of MIT.''' + field_names = tuple(map(str, field_names)) + # Create and fill-in the class template + numfields = len(field_names) + argtxt = ', '.join(field_names) + reprtxt = ', '.join('%s=%%r' % f for f in field_names) + dicttxt = ', '.join('%r: self.%s' % (f,f) for f in field_names) + tupletxt = repr(tuple('self.%s' % f for f in field_names)).replace("'",'') + inittxt = '; '.join('self.%s=%s' % (f,f) for f in field_names) + itertxt = '; '.join('yield self.%s' % f for f in field_names) + eqtxt = ' and '.join('self.%s==other.%s' % (f,f) for f in field_names) + template = dedent(''' + class %(typename)s(object): + '%(typename)s(%(argtxt)s)' + + __slots__ = %(field_names)r + + def __init__(self, %(argtxt)s): + %(inittxt)s + + def __len__(self): + return %(numfields)d + + def __iter__(self): + %(itertxt)s + + def __getitem__(self, index): + return getattr(self, self.__slots__[index]) + + def __setitem__(self, index, value): + return setattr(self, self.__slots__[index], value) + + def todict(self): + 'Return a new dict which maps field names to their values' + return {%(dicttxt)s} + + def __repr__(self): + return '%(typename)s(%(reprtxt)s)' %% %(tupletxt)s + + def __eq__(self, other): + return isinstance(other, self.__class__) and %(eqtxt)s + + def __ne__(self, other): + return not self==other + + def __getstate__(self): + return %(tupletxt)s + + def __setstate__(self, state): + %(tupletxt)s = state + ''') % locals() + # Execute the template string in a temporary namespace + namespace = {} + try: + exec template in namespace + except SyntaxError, e: + raise SyntaxError(e.message + ':\n' + template) + cls = namespace[typename] + + # Setup defaults + init_defaults = tuple(default for f in field_names) + cls.__init__.im_func.func_defaults = init_defaults + + # For pickling to work, the __module__ variable needs to be set to the frame + # where the named tuple is created. Bypass this step in environments where + # sys._getframe is not defined (Jython for example). + if hasattr(sys, '_getframe') and sys.platform != 'cli': + cls.__module__ = sys._getframe(1).f_globals['__name__'] + + return cls def load_params(fname): params = defaultdict(int) diff --git a/config/config.example.py b/config/config.example.py index 50d30ba..9f24097 100644 --- a/config/config.example.py +++ b/config/config.example.py @@ -3,56 +3,43 @@ import os import sys import itertools -""" +''' These are paths to repository directories. -""" +''' REPOS = {'liblitmus' : '/home/hermanjl/git/liblitmus', 'sched_trace' : '/home/hermanjl/git/sched_trace', - 'analysis' : '/home/hermanjl/git/overhead-analysis-cjk', 'ft_tools' : '/home/hermanjl/git/feather-trace-tools', 'trace-cmd' : '/home/hermanjl/git/trace-cmd'} -BINS = {'bespin' : '{}/bespin'.format(REPOS['liblitmus']), - 'colorspin' : '{}/colorspin'.format(REPOS['liblitmus']), - 'rtspin' : '{}/rtspin'.format(REPOS['liblitmus']), +BINS = {'rtspin' : '{}/rtspin'.format(REPOS['liblitmus']), 'release' : '{}/release_ts'.format(REPOS['liblitmus']), 'ftcat' : '{}/ftcat'.format(REPOS['ft_tools']), + 'ftsplit' : '{}/ft2csv'.format(REPOS['ft_tools']), + 'ftsort' : '{}/ftsort'.format(REPOS['ft_tools']), 'st_trace' : '{}/st_trace'.format(REPOS['ft_tools']), - 'split' : '{}/split'.format(REPOS['analysis']), - 'sort' : '{}/sort-all'.format(REPOS['analysis']), - 'analyze' : '{}/analyze'.format(REPOS['analysis']), 'trace-cmd' : '{}/trace-cmd'.format(REPOS['trace-cmd']), 'st_show' : '{}/st_show'.format(REPOS['sched_trace'])} DEFAULTS = {'params_file' : 'params.py', 'sched_file' : 'sched.py', 'exps_file' : 'exps.py', - 'duration' : '10', - 'spin' : 'rtspin'} + 'duration' : 10, + 'spin' : 'rtspin', + 'cycles' : 2000} FILES = {'ft_data' : 'ft.bin', 'linux_data' : 'trace.dat', 'sched_data' : 'st-{}.bin', 'log_data' : 'trace.slog',} -PARAMS = {'sched' : 'scheduler', - 'dur' : 'duration', - 'kernel' : 'uname'} +PARAMS = {'sched' : 'scheduler', + 'dur' : 'duration', + 'kernel': 'uname', + 'cycles' : 'cpu-frequency'} SCHED_EVENTS = range(501, 513) -BASE_EVENTS = ['SCHED', 'RELEASE', 'SCHED2', 'TICK', 'CXS', 'SEND_RESCHED'] -BASE_EVENTS += ['CQ_ENQUEUE_READ', 'CQ_ENQUEUE_FLUSH', 'CQ_SUBMIT_WORK', - 'CQ_LOOP_WORK_CHECK', 'CQ_LOOP_PEACE_OUT', 'CQ_LOOP_BRANCH', - 'CQ_WORK_DO_WORK', 'CQ_WORK_NOTIFY', 'CQ_PHASE_WAIT'] - -# Expand for mixed-crit -# TODO don't use split -CRIT_EVENTS = ['LVL{}_SCHED', 'LVL{}_RELEASE'] -CRIT_LEVELS = ['A', 'B', 'C'] -BASE_EVENTS += [s.format(l) for (l,s) in - itertools.product(CRIT_LEVELS, CRIT_EVENTS)] - +BASE_EVENTS = ['SCHED', 'RELEASE', 'SCHED2', 'TICK', 'CXS'] ALL_EVENTS = ["%s_%s" % (e, t) for (e,t) in itertools.product(BASE_EVENTS, ["START","END"])] ALL_EVENTS += ['RELEASE_LATENCY'] diff --git a/experiment/executable/executable.py b/experiment/executable/executable.py index b964699..628f711 100644 --- a/experiment/executable/executable.py +++ b/experiment/executable/executable.py @@ -4,7 +4,7 @@ import signal from ..litmus_util import is_executable class Executable(object): - """Parent object that represents an executable for use in task-sets.""" + '''Parent object that represents an executable for use in task-sets.''' def __init__(self, exec_file, extra_args=None, stdout_file = None, stderr_file = None): self.exec_file = exec_file @@ -47,7 +47,7 @@ class Executable(object): return " ".join(self.__get_full_command()) def execute(self): - """Execute the binary.""" + '''Execute the binary.''' full_command = self.__get_full_command() self.sp = subprocess.Popen(full_command, stdout=self.stdout_file, stderr=self.stderr_file, cwd=self.cwd) @@ -59,15 +59,15 @@ class Executable(object): self.sp.send_signal(signal.SIGINT) def terminate(self): - """Send the terminate signal to the binary.""" + '''Send the terminate signal to the binary.''' self.sp.terminate() def wait(self): - """Wait until the executable is finished, checking return code. + '''Wait until the executable is finished, checking return code. If the exit status is non-zero, raise an exception. - """ + ''' self.sp.wait() if self.sp.returncode != 0: diff --git a/experiment/executable/ftcat.py b/experiment/executable/ftcat.py index 9966312..5da8fa7 100644 --- a/experiment/executable/ftcat.py +++ b/experiment/executable/ftcat.py @@ -4,10 +4,10 @@ import stat from executable import Executable class FTcat(Executable): - """Used to wrap the ftcat binary in the Experiment object.""" + '''Used to wrap the ftcat binary in the Experiment object.''' def __init__(self, ft_cat_bin, stdout_file, stderr_file, dev, events, cpu=None): - """Extends the Executable initializer method with ftcat attributes.""" + '''Extends the Executable initializer method with ftcat attributes.''' # hack to run FTCat at higher priority chrt_bin = '/usr/bin/chrt' diff --git a/experiment/experiment.py b/experiment/experiment.py index deb4ff2..4bd47c6 100644 --- a/experiment/experiment.py +++ b/experiment/experiment.py @@ -5,19 +5,19 @@ from operator import methodcaller from tracer import SchedTracer, LogTracer, PerfTracer, LinuxTracer, OverheadTracer class ExperimentException(Exception): - """Used to indicate when there are problems with an experiment.""" + '''Used to indicate when there are problems with an experiment.''' def __init__(self, name): self.name = name class ExperimentDone(ExperimentException): - """Raised when an experiment looks like it's been run already.""" + '''Raised when an experiment looks like it's been run already.''' def __str__(self): return "Experiment finished already: %d" % self.name class ExperimentInterrupted(ExperimentException): - """Raised when an experiment appears to be interrupted (partial results).""" + '''Raised when an experiment appears to be interrupted (partial results).''' def __str__(self): return "Experiment was interrupted in progress: %d" % self.name @@ -28,11 +28,11 @@ class ExperimentFailed(ExperimentException): class Experiment(object): - """Execute one task-set and save the results. Experiments have unique IDs.""" + '''Execute one task-set and save the results. Experiments have unique IDs.''' INTERRUPTED_DIR = ".interrupted" def __init__(self, name, scheduler, working_dir, finished_dir, proc_entries, executables): - """Run an experiment, optionally wrapped in tracing.""" + '''Run an experiment, optionally wrapped in tracing.''' self.name = name self.scheduler = scheduler diff --git a/experiment/litmus_util.py b/experiment/litmus_util.py index 42d3e5f..fb2b341 100644 --- a/experiment/litmus_util.py +++ b/experiment/litmus_util.py @@ -6,7 +6,7 @@ import stat import config.config as conf def num_cpus(): - """Return the number of CPUs in the system.""" + '''Return the number of CPUs in the system.''' lnx_re = re.compile(r'^(processor|online)') cpus = 0 @@ -18,9 +18,9 @@ def num_cpus(): return cpus def cpu_freq(): - """ + ''' The frequency (in MHz) of the CPU. - """ + ''' reg = re.compile(r'^cpu MHz\s*:\s*(\d+)', re.M) with open('/proc/cpuinfo', 'r') as f: data = f.read() @@ -31,12 +31,12 @@ def cpu_freq(): return int(match.group(1)) def switch_scheduler(switch_to_in): - """Switch the scheduler to whatever is passed in. + '''Switch the scheduler to whatever is passed in. This methods sleeps for two seconds to give Linux the chance to execute schedule switching code. Raises an exception if the switch does not work. - """ + ''' switch_to = str(switch_to_in).strip() @@ -57,7 +57,7 @@ def uname_matches(reg): return bool( re.match(reg, data) ) def is_executable(fname): - """Return whether the file passed in is executable""" + '''Return whether the file passed in is executable''' mode = os.stat(fname)[stat.ST_MODE] return mode & stat.S_IXUSR and mode & stat.S_IRUSR diff --git a/parse/ft.py b/parse/ft.py index c5f1522..fea246a 100644 --- a/parse/ft.py +++ b/parse/ft.py @@ -11,27 +11,8 @@ FT_SPLIT_NAME = "overhead={}.bin" FT_SORTED_NAME = "sorted-ft.bin" FT_ERR_NAME = "err-ft" -def extract_ft_data(result, data_dir, cycles, tmp_dir): - freg = conf.FILES['ft_data'] + "$" - bins = [f for f in os.listdir(data_dir) if re.match(freg, f)] - - if not len(bins): - return False - - bin_file = "{}/{}".format(data_dir, bins[0]) - - with open("%s/%s" % (tmp_dir, FT_ERR_NAME), 'w') as err_file: - sorted_bin = sort_ft(bin_file, err_file, tmp_dir) - - for event in conf.BASE_EVENTS: - parse_overhead(result, sorted_bin, event, cycles, - tmp_dir, err_file) - - os.remove(sorted_bin) - - return True - def parse_overhead(result, overhead_bin, overhead, cycles, out_dir, err_file): + '''Store statistics for @overhead in @overhead_bin into @result.''' ovh_fname = "{}/{}".format(out_dir, FT_SPLIT_NAME).format(overhead) if os.path.exists(ovh_fname): @@ -39,7 +20,7 @@ def parse_overhead(result, overhead_bin, overhead, cycles, out_dir, err_file): ovh_file = open(ovh_fname, 'w') # Extract matching overhead events into a seperate file - cmd = [conf.BINS["split"], "-r", "-b", overhead, overhead_bin] + cmd = [conf.BINS["ftsplit"], "-r", "-b", overhead, overhead_bin] ret = subprocess.call(cmd, cwd=out_dir, stderr=err_file, stdout=ovh_file) size = os.stat(ovh_fname).st_size @@ -65,9 +46,7 @@ def parse_overhead(result, overhead_bin, overhead, cycles, out_dir, err_file): os.remove(ovh_fname) def sort_ft(ft_file, err_file, out_dir): - """ - Create and return file with sorted overheads from @ft_file. - """ + '''Create and return file with sorted overheads from @ft_file.''' out_fname = "{}/{}".format(out_dir, FT_SORTED_NAME) # Sort happens in-place @@ -79,3 +58,23 @@ def sort_ft(ft_file, err_file, out_dir): raise Exception("Sort failed with command: %s" % " ".join(cmd)) return out_fname + +def extract_ft_data(result, data_dir, work_dir, cycles): + freg = conf.FILES['ft_data'] + "$" + bins = [f for f in os.listdir(data_dir) if re.match(freg, f)] + + if not len(bins): + return False + + bin_file = "{}/{}".format(data_dir, bins[0]) + + with open("%s/%s" % (work_dir, FT_ERR_NAME), 'w') as err_file: + sorted_bin = sort_ft(bin_file, err_file, work_dir) + + for event in conf.BASE_EVENTS: + parse_overhead(result, sorted_bin, event, cycles, + work_dir, err_file) + + os.remove(sorted_bin) + + return True diff --git a/parse/point.py b/parse/point.py index d5f4a5e..8e27869 100644 --- a/parse/point.py +++ b/parse/point.py @@ -1,6 +1,6 @@ -""" +''' Too much duplicate code in this file -""" +''' import copy import numpy as np diff --git a/parse/sched.py b/parse/sched.py index 3e30880..ffc6224 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -1,306 +1,183 @@ -""" -TODO: No longer very pythonic, lot of duplicate code -print out task execution times or something -get miss ratio and tardiness directly from schedule OR -email list about turning on optional summary statistics OR -set up run exps to only get release and completions to get these stats -""" - import config.config as conf import os import re -import numpy as np +import struct import subprocess -from collections import namedtuple,defaultdict -from operator import methodcaller -from point import Measurement,Type - -PARAM_RECORD = r"(?P" +\ - r"PARAM *?(?P\d+)\/.*?" +\ - r"cost.*?(?P[\d\.]+)ms.*?" +\ - r"period.*?(?P[\d.]+)ms.*?" +\ - r"part.*?(?P\d+)[, ]*" +\ - r"(?:class=(?P\w+))?[, ]*" +\ - r"(?:level=(?P\w+))?).*$" -EXIT_RECORD = r"(?P" +\ - r"TASK_EXIT *?(?P\d+)/.*?" +\ - r"Avg.*?(?P\d+).*?" +\ - r"Max.*?(?P\d+))" -TARDY_RECORD = r"(?P" +\ - r"TASK_TARDY.*?(?P\d+)/(?P\d+).*?" +\ - r"Tot.*?(?P[\d\.]+).*?ms.*?" +\ - r"(?P[\d\.]+).*?ms.*?" +\ - r"(?P[\d\.]+))" -COMPLETION_RECORD = r"(?P" +\ - r"COMPLETION.*?(?P\d+)/.*?" +\ - r"exec:.*?(?P[\d\.]+)ms.*?" +\ - r"flush:.*?(?P[\d\.]+)ms.*?" +\ - r"flush_work:.*?(?P[\d]+).*?" +\ - r"load:.*?(?P[\d\.]+)ms.*?" +\ - r"load_work:.*?(?P[\d]+))" - -TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period','type','level']) -Task = namedtuple('Task', ['pid', 'config', 'run']) - -class LeveledArray(object): - """ - Groups statistics by the level of the task to which they apply - """ - def __init__(self, name): - self.name = name - self.vals = defaultdict(lambda:[]) - - def add(self, task, value): - self.vals[task.config.level] += [value] - - - def write_measurements(self, result): - for level, arr in self.vals.iteritems(): - name = "%s%s" % ("%s-" % level if level else "", self.name) - result[name] = Measurement(name).from_array(arr) - -def get_st_output(data_dir, out_dir, force=False): - """ - Create and return files containing unpacked sched data - """ - bin_files = conf.FILES['sched_data'].format(".*") - bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] +from collections import defaultdict,namedtuple +from common import recordtype +from point import Measurement - output_file = "%s/out-st" % out_dir - - if os.path.isfile(output_file): - if force: - os.remove(output_file) - else: - return output_file - - if len(bins) != 0: - cmd_arr = [conf.BINS['st_show']] - cmd_arr.extend(bins) - with open(output_file, "w") as f: - subprocess.call(cmd_arr, cwd=data_dir, stdout=f) - else: - return None - return output_file - -def get_tasks(data): - ret = [] - for match in re.finditer(PARAM_RECORD, data, re.M): - try: - t = Task( int(match.group('PID')), - TaskConfig( int(match.group('CPU')), - float(match.group('WCET')), - float(match.group('PERIOD')), - match.group("CLASS"), - match.group("LEVEL")), []) - if not (t.config.period and t.pid): - raise Exception() - ret += [t] - except Exception as e: - raise Exception("Invalid task record: %s\nparsed:\n\t%s\n\t%s" % - (e, match.groupdict(), match.group('RECORD'))) - return ret - -def get_task_dict(data): - tasks_list = get_tasks(data) - tasks_dict = {} - for t in tasks_list: - tasks_dict[t.pid] = t - return tasks_dict - -def get_task_exits(data): - ret = [] - for match in re.finditer(EXIT_RECORD, data): - try: - m = Measurement( int(match.group('PID')), - {Type.Max : float(match.group('MAX')), - Type.Avg : float(match.group('AVG'))}) - except: - raise Exception("Invalid exit record, parsed:\n\t%s\n\t%s" % - (match.groupdict(), match.group('RECORD'))) +class TimeTracker: + '''Store stats for durations of time demarcated by sched_trace records.''' + def __init__(self): + self.begin = self.avg = self.max = self.num = self.job = 0 - ret += [m] - return ret + def store_time(self, record): + '''End duration of time.''' + dur = record.when - self.begin + if self.job == record.job and dur > 0: + self.max = max(self.max, dur) + self.avg *= float(self.num / (self.num + 1)) + self.num += 1 + self.avg += dur / float(self.num) -def extract_tardy_vals(task_dict, data, exp_point): - ratios = LeveledArray("miss-ratio") - avg_tards = LeveledArray("avg-rel-tardiness") - max_tards = LeveledArray("max-rel-tardiness") + self.begin = 0 + self.job = 0 + + def start_time(self, record): + '''Start duration of time.''' + self.begin = record.when + self.job = record.job + +# Data stored for each task +TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) +TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses']) + +# Map of event ids to corresponding class, binary format, and processing methods +RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method']) +record_map = [0]*10 + +# Common to all records +HEADER_FORMAT = ' 1 or not pid: #TODO: fix, raise Exception() - continue - except: - raise Exception("Invalid completion record, missed: %d:" - "\n\t%s\n\t%s" % (missed[pid], match.groupdict(), - match.group("RECORD"))) - completions[pid] += [duration] - - for pid, durations in completions.iteritems(): - m = Measurement(pid).from_array(durations) - - # TODO: not this, please - if not task_dict[pid].run: - task_dict[pid].run.append(m) - - job_times = np.array(durations) - mean = job_times.mean() - - if not mean or not durations: + type_num = struct.unpack_from('b',data)[0] + except struct.error: + break + + rdata = record_map[type_num] if type_num <= max_type else 0 + if not rdata: continue - # Coefficient of variation - cv = job_times.std() / job_times.mean() - # Correction, assuming normal distributions - corrected = (1 + 1/(4 * len(job_times))) * cv - - varz.add(task_dict[pid], corrected) - # varz.add(task_dict[pid], m[Type.Var]) - - if exp_point: - map(methodcaller('write_measurements', exp_point), - [varz, flushes, loads, fworks, lworks]) - -def config_exit_stats(task_dict, data): - # # Dictionary of task exit measurements by pid - # exits = get_task_exits(data) - # exit_dict = dict((e.id, e) for e in exits) - extract_variance(task_dict, data, None) - - # Dictionary where keys are configurations, values are list - # of tasks with those configuratino - config_dict = defaultdict(lambda: []) - for t in task_dict.itervalues(): - config_dict[t.config] += [t] - - for config in config_dict: - task_list = sorted(config_dict[config]) - - # # Replace tasks with corresponding exit stats - # if not t.pid in exit_dict: - # raise Exception("Missing exit record for task '%s' in '%s'" % - # (t, file.name)) - # exit_list = [exit_dict[t.pid] for t in task_list] - exit_list = [t.run[0] for t in task_list] - config_dict[config] = exit_list - - return config_dict - -saved_stats = {} -def get_base_stats(base_file): - if base_file in saved_stats: - return saved_stats[base_file] - with open(base_file, 'r') as f: - data = f.read() - task_dict = get_task_dict(data) - - result = config_exit_stats(task_dict, data) - saved_stats[base_file] = result - return result - -def extract_scaling_data(task_dict, data, result, base_file): - # Generate trees of tasks with matching configurations - data_stats = config_exit_stats(task_dict, data) - base_stats = get_base_stats(base_file) - - # Scaling factors are calculated by matching groups of tasks with the same - # config, then comparing task-to-task exec times in order of PID within - # each group - max_scales = LeveledArray("max-scaling") - avg_scales = LeveledArray("avg-scaling") - - for config in data_stats: - if len(data_stats[config]) != len(base_stats[config]): - # Quit, we are missing a record and can't guarantee - # a task-to-task comparison + try: + values = struct.unpack_from(rdata.fmt, data) + except struct.error: continue - for data_stat, base_stat in zip(data_stats[config],base_stats[config]): - if not base_stat[Type.Avg] or not base_stat[Type.Max] or \ - not data_stat[Type.Avg] or not data_stat[Type.Max]: - continue - # How much larger is their exec stat than ours? - avg_scale = float(base_stat[Type.Avg]) / float(data_stat[Type.Avg]) - max_scale = float(base_stat[Type.Max]) / float(data_stat[Type.Max]) + obj = rdata.clazz(*values) + yield (obj, rdata.method) + +def read_data(task_dict, fnames): + '''Read records from @fnames and store per-pid stats in @task_dict.''' + buff = [] + + def add_record(itera): + # Ordered insertion into buff + try: + next_ret = itera.next() + except StopIteration: + return - task = task_dict[data_stat.id] + arecord, method = next_ret + i = 0 + for (i, (brecord, m, t)) in enumerate(buff): + if brecord.when > arecord.when: + break + buff.insert(i, (arecord, method, itera)) - avg_scales.add(task, avg_scale) - max_scales.add(task, max_scale) + for fname in fnames: + itera = make_iterator(fname) + add_record(itera) - avg_scales.write_measurements(result) - max_scales.write_measurements(result) + while buff: + (record, method, itera) = buff.pop(0) -def extract_sched_data(data_file, result, base_file): - with open(data_file, 'r') as f: - data = f.read() + add_record(itera) + method(task_dict, record) - task_dict = get_task_dict(data) +def process_completion(task_dict, record): + task_dict[record.pid].misses.store_time(record) - try: - extract_tardy_vals(task_dict, data, result) - extract_variance(task_dict, data, result) - except Exception as e: - print("Error in %s" % data_file) - raise e +def process_release(task_dict, record): + data = task_dict[record.pid] + data.jobs += 1 + data.misses.start_time(record) - if (base_file): - extract_scaling_data(task_dict, data, result, base_file) +def process_param(task_dict, record): + params = TaskParams(record.wcet, record.period, record.partition) + task_dict[record.pid].params = params + +def process_block(task_dict, record): + task_dict[record.pid].blocks.start_time(record) + +def process_resume(task_dict, record): + task_dict[record.pid].blocks.store_time(record) + +register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when']) +register_record('BlockRecord', 8, process_block, 'Q8x', ['when']) +register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when']) +register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when']) +register_record('ParamRecord', 2, process_param, 'IIIcc2x', + ['wcet','period','phase','partition', 'task_class']) + +def extract_sched_data(result, data_dir, work_dir): + bin_files = conf.FILES['sched_data'].format(".*") + output_file = "%s/out-st" % work_dir + + bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] + if not len(bins): + return + + # Save an in-english version of the data for debugging + cmd_arr = [conf.BINS['st_show']] + cmd_arr.extend(bins) + with open(output_file, "w") as f: + subprocess.call(cmd_arr, cwd=data_dir, stdout=f) + + task_dict = defaultdict(lambda : + TaskData(0, 0, TimeTracker(), TimeTracker())) + + # Gather per-task values + read_data(task_dict, bins) + + stat_data = {"avg-tard" : [], "max-tard" : [], + "avg-block" : [], "max-block" : [], + "miss-ratio" : []} + + # Group per-task values + for tdata in task_dict.itervalues(): + miss_ratio = float(tdata.misses.num) / tdata.jobs + # Scale average down to account for jobs with 0 tardiness + avg_tard = tdata.misses.avg * miss_ratio + + stat_data["miss-ratio"].append(miss_ratio) + stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet) + stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet) + stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC) + stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC) + + # Summarize value groups + for name, data in stat_data.iteritems(): + result[name] = Measurement(str(name)).from_array(data) diff --git a/parse_exps.py b/parse_exps.py index d932b0d..c8cd8b1 100755 --- a/parse_exps.py +++ b/parse_exps.py @@ -2,11 +2,9 @@ from __future__ import print_function import config.config as conf -import copy import os import parse.ft as ft import parse.sched as st -import re import shutil as sh import sys @@ -22,13 +20,8 @@ def parse_args(): parser.add_option('-o', '--out', dest='out', help='file or directory for data output', default='parse-data') - - # TODO: this means nothing, also remove dests parser.add_option('-c', '--clean', action='store_true', default=False, dest='clean', help='do not output single-point csvs') - parser.add_option('-s', '--scale-against', dest='scale_against', - metavar='PARAM=VALUE', default="", - help='calculate task scaling factors against these configs') parser.add_option('-i', '--ignore', metavar='[PARAM...]', default="", help='ignore changing parameter values') parser.add_option('-f', '--force', action='store_true', default=False, @@ -41,136 +34,89 @@ def parse_args(): return parser.parse_args() -ExpData = namedtuple('ExpData', ['name', 'params', 'data_files', 'is_base']) -DataFiles = namedtuple('DataFiles', ['st']) +ExpData = namedtuple('ExpData', ['path', 'params', 'work_dir']) def get_exp_params(data_dir, col_map): param_file = "%s/%s" % (data_dir, conf.DEFAULTS['params_file']) if not os.path.isfile: raise Exception("No param file '%s' exists!" % param_file) - # Keep only params that uniquely identify the experiment + # Ignore 'magic' parameters used by these scripts params = load_params(param_file) for ignored in conf.PARAMS.itervalues(): - # Always include cycles or overhead parsing fails + # With the exception of cycles which is used by overhead parsing if ignored in params and ignored != conf.PARAMS['cycles']: params.pop(ignored) - # Track all changed params + # Store parameters in col_map, which will track which parameters change + # across experiments for key, value in params.iteritems(): col_map.try_add(key, value) + # Cycles must be present if conf.PARAMS['cycles'] not in params: params[conf.PARAMS['cycles']] = conf.DEFAULTS['cycles'] return params -def gen_exp_data(exp_dirs, base_conf, col_map, force): - plain_exps = [] - scaling_bases = [] +def load_exps(exp_dirs, col_map, clean): + exps = [] - sys.stderr.write("Generating data...\n") + sys.stderr.write("Loading experiments...\n") - for i, data_dir in enumerate(exp_dirs): + for data_dir in exp_dirs: if not os.path.isdir(data_dir): raise IOError("Invalid experiment '%s'" % os.path.abspath(data_dir)) - tmp_dir = data_dir + "/tmp" - if not os.path.exists(tmp_dir): - os.mkdir(tmp_dir) - - # Read and translate exp output files - params = get_exp_params(data_dir, col_map) - st_output = st.get_st_output(data_dir, tmp_dir, force) - - if base_conf and base_conf.viewitems() & params.viewitems(): - if not st_output: - raise Exception("Scaling base '%s' useless without sched data!" - % data_dir) - is_base = True - - base_params = copy.deepcopy(params) - base_params.pop(base_conf.keys()[0]) + # Used to store error output and debugging info + work_dir = data_dir + "/tmp" - base_exp = ExpData(data_dir, base_params, - DataFiles(st_output), True) - scaling_bases += [base_exp] - else: - is_base = False + if not os.path.exists(work_dir): + os.mkdir(work_dir) + elif clean: + sh.rmtree(work_dir) - # Create experiment named after the data dir - exp_data = ExpData(data_dir, params, - DataFiles(st_output), is_base) + params = get_exp_params(data_dir, col_map) - plain_exps += [exp_data] + exps += [ ExpData(data_dir, params, work_dir) ] - sys.stderr.write('\r {0:.2%}'.format(float(i)/len(exp_dirs))) - sys.stderr.write('\n') - return (plain_exps, scaling_bases) + return exps def main(): opts, args = parse_args() args = args or [os.getcwd()] - # Configuration key for task systems used to calculate task - # execution scaling factors - base_conf = dict(re.findall("(.*)=(.*)", opts.scale_against)) - + # Load exp parameters into col_map col_map = ColMap() + exps = load_exps(args, col_map, opts.force) - (plain_exps, scaling_bases) = gen_exp_data(args, base_conf, col_map, opts.force) - - if base_conf and base_conf.keys()[0] not in col_map: - raise IOError("Base column '%s' not present in any parameters!" % - base_conf.keys()[0]) - - base_map = copy.deepcopy(col_map) + # Don't track changes in ignored parameters if opts.ignore: for param in opts.ignore.split(","): col_map.try_remove(param) - base_table = TupleTable(base_map) # For tracking 'base' experiments - result_table = TupleTable(col_map) # For generating output - - # Used to find matching scaling_base for each experiment - for base in scaling_bases: - base_table.add_exp(base.params, base) + result_table = TupleTable(col_map) sys.stderr.write("Parsing data...\n") - for exp in args: - result = ExpPoint(exp) - params = get_exp_params(exp, col_map) - # Write overheads into result - ft.extract_ft_data(result, exp, - params[conf.PARAMS['cycles']], - exp + "/tmp") - - if opts.verbose: - print(result) - - for i,exp in enumerate(plain_exps): - result = ExpPoint(exp.name) - - if exp.data_files.st: - base = None - if base_conf and not exp.is_base: - # Try to find a scaling base - base_params = copy.deepcopy(exp.params) - base_params.pop(base_conf.keys()[0]) - base = base_table.get_exps(base_params)[0] + for i,exp in enumerate(exps): + result = ExpPoint(exp.path) + cycles = exp.params[conf.PARAMS['cycles']] - # Write deadline misses / tardiness into result - st.extract_sched_data(exp.data_files.st, result, - base.data_files.st if base else None) + # Write overheads into result + ft.extract_ft_data(result, exp.path, exp.work_dir, cycles) - result_table.add_exp(exp.params, result) + # Write scheduling statistics into result + st.extract_sched_data(result, exp.path, exp.work_dir) if opts.verbose: print(result) else: - sys.stderr.write('\r {0:.2%}'.format(float(i)/len(plain_exps))) + sys.stderr.write('\r {0:.2%}'.format(float(i)/len(exps))) + + result_table.add_exp(exp.params, result) + sys.stderr.write('\n') if opts.force and os.path.exists(opts.out): diff --git a/run_exps.py b/run_exps.py index 1d2cc2e..24f71e4 100755 --- a/run_exps.py +++ b/run_exps.py @@ -39,7 +39,7 @@ def parse_args(): def convert_data(data): - """Convert a non-python schedule file into the python format""" + '''Convert a non-python schedule file into the python format''' regex = re.compile( r"(?P^" r"(?P
/proc/[\w\-]+?/)?" @@ -67,7 +67,7 @@ def convert_data(data): return {'proc' : procs, 'spin' : spins} def fix_paths(schedule, exp_dir, sched_file): - """Replace relative paths of command line arguments with absolute ones.""" + '''Replace relative paths of command line arguments with absolute ones.''' for (idx, (spin, args)) in enumerate(schedule['spin']): for arg in re.split(" +", args): abspath = "%s/%s" % (exp_dir, arg) -- cgit v1.2.2