From 7545402506aa76261e18d85af585ff0ac1cf05c1 Mon Sep 17 00:00:00 2001
From: Jonathan Herman
Date: Tue, 23 Apr 2013 14:01:35 -0400
Subject: Improved accuracy of sched_trace measurement parsing.

* Measurements from tasks missing > 20% of their scheduling records are
  ignored. This is configurable in config/config.py.
* Measurements which only have zero values are ignored.
* If either of these two situations is encountered, a message is printed
  the first time using the common.log_once() method. See parse_exps.py for
  how this is used with multiple threads.
* Measurements from a task's last job are ignored.
* Miss ratio is calculated as a fraction of only those jobs whose matching
  release and completion records were both found, not of all released jobs.
---
 common.py            |  17 ++++++
 config/config.py     |   4 ++
 parse/point.py       |   4 ++
 parse/sched.py       |  84 +++++++++++++++++++--------
 parse/tuple_table.py |   4 ++
 parse_exps.py        | 159 +++++++++++++++++++++++++++++++--------------------
 6 files changed, 186 insertions(+), 86 deletions(-)

diff --git a/common.py b/common.py
index a2c6224..7abf0f2 100644
--- a/common.py
+++ b/common.py
@@ -193,3 +193,20 @@ def is_device(dev):
         return False
     mode = os.stat(dev)[stat.ST_MODE]
     return not (not mode & stat.S_IFCHR)
+
+__logged = []
+
+def set_logged_list(logged):
+    global __logged
+    __logged = logged
+
+def log_once(id, msg = None, indent = True):
+    global __logged
+
+    msg = msg if msg else id
+
+    if id not in __logged:
+        __logged += [id]
+        if indent:
+            msg = ' ' + msg.strip('\t').replace('\n', '\n\t')
+        sys.stderr.write('\n' + msg.strip('\n') + '\n')
diff --git a/config/config.py b/config/config.py
index b631aa2..5e6f9e3 100644
--- a/config/config.py
+++ b/config/config.py
@@ -56,3 +56,7 @@ OVH_ALL_EVENTS = ["%s_%s" % (e, t) for (e,t) in
 OVH_ALL_EVENTS += ['RELEASE_LATENCY']
 # This event doesn't have a START and END
 OVH_BASE_EVENTS += ['RELEASE_LATENCY']
+
+# If a task is missing more than this many records, its measurements
+# are not included in sched_trace summaries
+MAX_RECORD_LOSS = .2
diff --git a/parse/point.py b/parse/point.py
index ac47c70..b1d9d53 100644
--- a/parse/point.py
+++ b/parse/point.py
@@ -133,6 +133,10 @@ class ExpPoint(object):
     def get_stats(self):
         return self.stats.keys()
 
+    def __bool__(self):
+        return bool(self.stats)
+    __nonzero__ = __bool__
+
 class SummaryPoint(ExpPoint):
     def __init__(self, id="", points=[], typemap = default_typemap):
diff --git a/parse/sched.py b/parse/sched.py
index b56324b..4933037 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -5,35 +5,55 @@ import struct
 import subprocess
 
 from collections import defaultdict,namedtuple
-from common import recordtype
+from common import recordtype,log_once
 from point import Measurement
 from ctypes import *
 
 class TimeTracker:
     '''Store stats for durations of time demarcated by sched_trace records.'''
     def __init__(self):
-        self.begin = self.avg = self.max = self.num = self.job = 0
+        self.begin = self.avg = self.max = self.num = self.next_job = 0
 
-    def store_time(self, record):
+        # Count of times the job in start_time matched that in store_time
+        self.matches = 0
+        # And the times it didn't
+        self.disjoints = 0
+
+        # Measurements are recorded in store_time using the previous matching
+        # record which was passed to store_time. This way, the last record for
+        # any task is always skipped
+        self.last_record = None
+
+    def store_time(self, next_record):
         '''End duration of time.'''
-        dur = record.when - self.begin
+        dur = (self.last_record.when - self.begin) if self.last_record else -1
 
-        if self.job == record.job and dur > 0:
-            self.max = max(self.max, dur)
-            self.avg *= float(self.num / (self.num + 1))
-            self.num += 1
-            self.avg += dur / float(self.num)
+        if self.next_job == next_record.job:
+            self.last_record = next_record
 
-            self.begin = 0
-            self.job = 0
+            if self.last_record:
+                self.matches += 1
+
+            if dur > 0:
+                self.max = max(self.max, dur)
+                self.avg *= float(self.num / (self.num + 1))
+                self.num += 1
+                self.avg += dur / float(self.num)
+
+                self.begin = 0
+                self.next_job = 0
+        else:
+            self.disjoints += 1
 
     def start_time(self, record, time = None):
         '''Start duration of time.'''
-        if not time:
-            self.begin = record.when
-        else:
-            self.begin = time
-        self.job = record.job
+        if self.last_record:
+            if not time:
+                self.begin = self.last_record.when
+            else:
+                self.begin = time
+
+        self.next_job = record.job
 
 # Data stored for each task
 TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
@@ -203,6 +223,12 @@ def create_task_dict(data_dir, work_dir = None):
 
     return task_dict
 
+LOSS_MSG = """Found task missing more than %d%% of its scheduling records.
+These won't be included in scheduling statistics!"""%(100*conf.MAX_RECORD_LOSS)
+SKIP_MSG = """Measurement '%s' has no non-zero values.
+Measurements like these are not included in scheduling statistics.
+If a measurement is missing, this is why."""
+
 def extract_sched_data(result, data_dir, work_dir):
     task_dict = create_task_dict(data_dir, work_dir)
     stat_data = defaultdict(list)
@@ -213,19 +239,29 @@ def extract_sched_data(result, data_dir, work_dir):
             # Currently unknown where these invalid tasks come from...
             continue
 
-        miss_ratio = float(tdata.misses.num) / tdata.jobs
-        stat_data["miss-ratio"].append(float(tdata.misses.num) / tdata.jobs)
+        miss = tdata.misses
+
+        record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints)
+        stat_data["record-loss"].append(record_loss)
+
+        if record_loss > conf.MAX_RECORD_LOSS:
+            log_once(LOSS_MSG)
+            continue
+
+        miss_ratio = float(miss.num) / miss.matches
+        avg_tard = miss.avg * miss_ratio
+
+        stat_data["miss-ratio" ].append(miss_ratio)
 
-        stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet)
-        # Scale average down to account for jobs with 0 tardiness
-        avg_tard = tdata.misses.avg * miss_ratio
-        stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet)
+        stat_data["max-tard"].append(miss.max / tdata.params.period)
+        stat_data["avg-tard"].append(avg_tard / tdata.params.period)
 
-        stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC)
-        stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC)
+        stat_data["avg-block"].append(tdata.blocks.avg / NSEC_PER_MSEC)
+        stat_data["max-block"].append(tdata.blocks.max / NSEC_PER_MSEC)
 
     # Summarize value groups
     for name, data in stat_data.iteritems():
         if not data or not sum(data):
+            log_once(SKIP_MSG, SKIP_MSG % name)
             continue
         result[name] = Measurement(str(name)).from_array(data)
diff --git a/parse/tuple_table.py b/parse/tuple_table.py
index 47fb6b6..320d9dd 100644
--- a/parse/tuple_table.py
+++ b/parse/tuple_table.py
@@ -13,6 +13,10 @@ class TupleTable(object):
     def get_col_map(self):
         return self.col_map
 
+    def __bool__(self):
+        return bool(self.table)
+    __nonzero__ = __bool__
+
     def __getitem__(self, kv):
         key = self.col_map.get_key(kv)
         return self.table[key]
diff --git a/parse_exps.py b/parse_exps.py
index c2cbedb..cc4372a 100755
--- a/parse_exps.py
+++ b/parse_exps.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 from __future__ import print_function
 
+import common as com
+import multiprocessing
 import os
 import parse.ft as ft
 import parse.sched as st
@@ -10,13 +12,12 @@ import sys
 import traceback
 
 from collections import namedtuple
-from common import load_params
 from config.config import DEFAULTS,PARAMS
 from optparse import OptionParser
 from parse.point import ExpPoint
 from parse.tuple_table import TupleTable
 from parse.col_map import ColMapBuilder
-from multiprocessing import Pool, cpu_count
+
 
 def parse_args():
     parser = OptionParser("usage: %prog [options] [data_dir]...")
@@ -33,18 +34,60 @@ def parse_args():
     parser.add_option('-m', '--write-map', action='store_true', default=False,
                       dest='write_map',
                       help='Output map of values instead of csv tree')
-    parser.add_option('-p', '--processors', default=max(cpu_count() - 1, 1),
+    parser.add_option('-p', '--processors',
+                      default=max(multiprocessing.cpu_count() - 1, 1),
                       type='int', dest='processors',
                       help='number of threads for processing')
 
     return parser.parse_args()
 
+
 ExpData = namedtuple('ExpData', ['path', 'params', 'work_dir'])
 
+
+def parse_exp(exp_force):
+    # Tupled for multiprocessing
+    exp, force = exp_force
+
+    result_file = exp.work_dir + "/exp_point.pkl"
+    should_load = not force and os.path.exists(result_file)
+
+    result = None
+    if should_load:
+        with open(result_file, 'rb') as f:
+            try:
+                # No need to go through this work twice
+                result = pickle.load(f)
+            except:
+                pass
+
+    if not result:
+        try:
+            # Create a readable name
+            name = os.path.relpath(exp.path)
+            name = name if name != "." else os.path.split(os.getcwd())[1]
+
+            result = ExpPoint(name)
+
+            # Write overheads into result
+            cycles = exp.params[PARAMS['cycles']]
+            ft.extract_ft_data(result, exp.path, exp.work_dir, cycles)
+
+            # Write scheduling statistics into result
+            st.extract_sched_data(result, exp.path, exp.work_dir)
+
+            with open(result_file, 'wb') as f:
+                pickle.dump(result, f)
+        except:
+            traceback.print_exc()
+
+    return (exp, result)
+
+
 def get_exp_params(data_dir, cm_builder):
     param_file = "%s/%s" % (data_dir, DEFAULTS['params_file'])
     if os.path.isfile(param_file):
-        params = load_params(param_file)
+        params = com.load_params(param_file)
 
         # Store parameters in cm_builder, which will track which parameters change
         # across experiments
@@ -83,41 +126,8 @@ def load_exps(exp_dirs, cm_builder, force):
 
     return exps
 
-def parse_exp(exp_force):
-    # Tupled for multiprocessing
-    exp, force = exp_force
-
-    result_file = exp.work_dir + "/exp_point.pkl"
-    should_load = not force and os.path.exists(result_file)
-
-    result = None
-    if should_load:
-        with open(result_file, 'rb') as f:
-            try:
-                # No need to go through this work twice
-                result = pickle.load(f)
-            except:
-                pass
-    if not result:
-        try:
-            result = ExpPoint(exp.path)
-            cycles = exp.params[PARAMS['cycles']]
-
-            # Write overheads into result
-            ft.extract_ft_data(result, exp.path, exp.work_dir, cycles)
-
-            # Write scheduling statistics into result
-            st.extract_sched_data(result, exp.path, exp.work_dir)
-
-            with open(result_file, 'wb') as f:
-                pickle.dump(result, f)
-        except:
-            traceback.print_exc()
-
-    return (exp, result)
-
-def get_exps(args):
+def get_dirs(args):
     if args:
         return args
     elif os.path.exists(DEFAULTS['out-run']):
@@ -128,38 +138,32 @@ def get_exps(args):
         sys.stderr.write("Reading data from current directory.\n")
         return [os.getcwd()]
 
-def main():
-    opts, args = parse_args()
-    exp_dirs = get_exps(args)
-
-    # Load exp parameters into a ColMap
-    builder = ColMapBuilder()
-    exps = load_exps(exp_dirs, builder, opts.force)
-
-    # Don't track changes in ignored parameters
-    if opts.ignore:
-        for param in opts.ignore.split(","):
-            builder.try_remove(param)
-    builder.try_remove(PARAMS['trial'])  # Always average multiple trials
-    builder.try_remove(PARAMS['cycles']) # Only need for feather-trace parsing
+def fill_table(table, exps, opts):
+    sys.stderr.write("Parsing data...\n")
 
-    col_map = builder.build()
-    result_table = TupleTable(col_map)
+    procs = min(len(exps), opts.processors)
+    logged = multiprocessing.Manager().list()
 
-    sys.stderr.write("Parsing data...\n")
+    pool = multiprocessing.Pool(processes=procs,
+                # Share a list of previously logged messages amongst processes
+                # This is for the com.log_once method to use
+                initializer=com.set_logged_list, initargs=(logged,))
 
-    procs = min(len(exps), opts.processors)
-    pool = Pool(processes=procs)
+
     pool_args = zip(exps, [opts.force]*len(exps))
     enum = pool.imap_unordered(parse_exp, pool_args, 1)
 
     try:
         for i, (exp, result) in enumerate(enum):
+            if not result:
+                continue
+
             if opts.verbose:
                 print(result)
             else:
                 sys.stderr.write('\r {0:.2%}'.format(float(i)/len(exps)))
-            result_table[exp.params] += [result]
+            table[exp.params] += [result]
+
         pool.close()
     except:
         pool.terminate()
@@ -170,16 +174,17 @@ def main():
 
     sys.stderr.write('\n')
 
-    if opts.force and os.path.exists(opts.out):
-        sh.rmtree(opts.out)
 
-    reduced_table = result_table.reduce()
+def write_output(table, opts):
+    reduced_table = table.reduce()
 
     if opts.write_map:
         sys.stderr.write("Writing python map into %s...\n" % opts.out)
-        # Write summarized results into map
         reduced_table.write_map(opts.out)
     else:
+        if opts.force and os.path.exists(opts.out):
+            sh.rmtree(opts.out)
+
         # Write out csv directories for all variable params
         dir_map = reduced_table.to_dir_map()
 
@@ -188,12 +193,42 @@ def main():
             if not opts.verbose:
                 sys.stderr.write("Too little data to make csv files, " +
                                  "printing results.\n")
-                for key, exp in result_table:
+                for key, exp in table:
                     for e in exp:
                         print(e)
         else:
             sys.stderr.write("Writing csvs into %s...\n" % opts.out)
             dir_map.write(opts.out)
 
+
+def main():
+    opts, args = parse_args()
+    exp_dirs = get_dirs(args)
+
+    # Load experiment parameters into a ColMap
+    builder = ColMapBuilder()
+    exps = load_exps(exp_dirs, builder, opts.force)
+
+    # Don't track changes in ignored parameters
+    if opts.ignore:
+        for param in opts.ignore.split(","):
+            builder.try_remove(param)
+
+    # Always average multiple trials
+    builder.try_remove(PARAMS['trial'])
+    # Only need this for feather-trace parsing
+    builder.try_remove(PARAMS['cycles'])
+
+    col_map = builder.build()
+    table = TupleTable(col_map)
+
+    fill_table(table, exps, opts)
+
+    if not table:
+        sys.stderr.write("Found no data to parse!")
+        sys.exit(1)
+
+    write_output(table, opts)
+
 if __name__ == '__main__':
     main()
-- 
cgit v1.2.2
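
The commit message's note about using common.log_once() "with multiple threads" comes down to sharing one Manager-backed list with every worker through the Pool initializer, as fill_table() does above. The following standalone Python sketch illustrates that pattern under the same assumptions; warn_once, set_seen_list and _worker are illustrative names, not part of this patch:

    import multiprocessing
    import sys

    _seen = []                       # per-process view of the shared list

    def set_seen_list(seen):
        # Pool initializer: every worker receives the same Manager().list() proxy
        global _seen
        _seen = seen

    def warn_once(msg):
        # Print a message the first time it is seen across all workers.
        # (A benign race can let two workers each print the same message once.)
        if msg not in _seen:
            _seen.append(msg)
            sys.stderr.write(msg + '\n')

    def _worker(n):
        if n % 2:
            warn_once("odd value encountered")
        return n * n

    if __name__ == '__main__':
        shared = multiprocessing.Manager().list()
        pool = multiprocessing.Pool(processes=2,
                                    initializer=set_seen_list,
                                    initargs=(shared,))
        print(pool.map(_worker, range(8)))
        pool.close()
        pool.join()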
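The per-task statistics themselves fall out of the TimeTracker counters roughly as in the short sketch below. The formulas and the 20% threshold mirror the patch; the numeric values are made up for illustration and are not measured data:

    MAX_RECORD_LOSS = .2                 # MAX_RECORD_LOSS from config/config.py

    matches, disjoints = 10, 2           # release/completion pairs found vs. not found
    num_misses, avg_miss = 3, 1.5e6      # deadline misses and their average tardiness (ns)

    record_loss = float(disjoints) / (matches + disjoints)    # ~0.167
    if record_loss > MAX_RECORD_LOSS:
        print("task skipped: too many lost records")
    else:
        miss_ratio = float(num_misses) / matches               # misses over *matched* jobs
        avg_tard = avg_miss * miss_ratio                        # scaled for jobs with 0 tardiness
        print(record_loss, miss_ratio, avg_tard)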