From e948e4b6a6cf1efd8d1f3d3359b1ad9891b3babc Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Wed, 17 Apr 2013 13:56:55 -0400 Subject: Cleaned up sched_trace output and code. --- parse/sched.py | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) (limited to 'parse/sched.py') diff --git a/parse/sched.py b/parse/sched.py index 1213f0d..13c7ca2 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -2,7 +2,6 @@ import config.config as conf import os import re import struct -import sys import subprocess from collections import defaultdict,namedtuple @@ -92,7 +91,12 @@ def make_iterator(fname): continue obj = rdata.clazz(*values) - yield (obj, rdata.method) + + if obj.job != 1: + yield (obj, rdata.method) + else: + # Results from the first job are nonsense + pass def read_data(task_dict, fnames): '''Read records from @fnames and store per-pid stats in @task_dict.''' @@ -147,32 +151,35 @@ register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when']) register_record('ParamRecord', 2, process_param, 'IIIcc2x', ['wcet','period','phase','partition', 'task_class']) -def extract_sched_data(result, data_dir, work_dir): +def create_task_dict(data_dir, work_dir = None): + '''Parse sched trace files''' bin_files = conf.FILES['sched_data'].format(".*") output_file = "%s/out-st" % work_dir - bins = ["%s/%s" % (data_dir,f) for f in os.listdir(data_dir) if re.match(bin_files, f)] - if not len(bins): - return + task_dict = defaultdict(lambda : + TaskData(None, 1, TimeTracker(), TimeTracker())) + + bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] + if not len(bin_names): + return task_dict # Save an in-english version of the data for debugging # This is optional and will only be done if 'st_show' is in PATH if conf.BINS['st_show']: cmd_arr = [conf.BINS['st_show']] - cmd_arr.extend(bins) + cmd_arr.extend(bin_names) with open(output_file, "w") as f: - print("calling %s" % cmd_arr) subprocess.call(cmd_arr, cwd=data_dir, stdout=f) - task_dict = defaultdict(lambda : - TaskData(0, 0, TimeTracker(), TimeTracker())) - # Gather per-task values - read_data(task_dict, bins) + bin_paths = ["%s/%s" % (data_dir,f) for f in bin_names] + read_data(task_dict, bin_paths) - stat_data = {"avg-tard" : [], "max-tard" : [], - "avg-block" : [], "max-block" : [], - "miss-ratio" : []} + return task_dict + +def extract_sched_data(result, data_dir, work_dir): + task_dict = create_task_dict(data_dir, work_dir) + stat_data = defaultdict(list) # Group per-task values for tdata in task_dict.itervalues(): @@ -181,18 +188,19 @@ def extract_sched_data(result, data_dir, work_dir): continue miss_ratio = float(tdata.misses.num) / tdata.jobs + stat_data["miss-ratio"].append(float(tdata.misses.num) / tdata.jobs) + + stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet) # Scale average down to account for jobs with 0 tardiness avg_tard = tdata.misses.avg * miss_ratio - - stat_data["miss-ratio"].append(miss_ratio) stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet) - stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet) + stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC) stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC) # Summarize value groups for name, data in stat_data.iteritems(): - if not data: + if not data or not sum(data): continue result[name] = Measurement(str(name)).from_array(data) -- cgit v1.2.2 From 2a4b1c11751632dcc1f47c3c13ab2e2a718b883c Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Wed, 17 Apr 2013 16:07:35 -0400 Subject: Fixed calculation of tardiness. --- parse/sched.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'parse/sched.py') diff --git a/parse/sched.py b/parse/sched.py index 13c7ca2..a38c61b 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -26,10 +26,13 @@ class TimeTracker: self.begin = 0 self.job = 0 - def start_time(self, record): + def start_time(self, record, time = None): '''Start duration of time.''' - self.begin = record.when - self.job = record.job + if not time: + self.begin = record.when + else: + self.begin = time + self.job = record.job # Data stored for each task TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) @@ -132,7 +135,8 @@ def process_completion(task_dict, record): def process_release(task_dict, record): data = task_dict[record.pid] data.jobs += 1 - data.misses.start_time(record) + if data.params: + data.misses.start_time(record, record.when + data.params.period) def process_param(task_dict, record): params = TaskParams(record.wcet, record.period, record.partition) @@ -147,7 +151,7 @@ def process_resume(task_dict, record): register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when']) register_record('BlockRecord', 8, process_block, 'Q8x', ['when']) register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when']) -register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when']) +register_record('ReleaseRecord', 3, process_release, 'QQ', ['when', 'release']) register_record('ParamRecord', 2, process_param, 'IIIcc2x', ['wcet','period','phase','partition', 'task_class']) @@ -203,4 +207,3 @@ def extract_sched_data(result, data_dir, work_dir): if not data or not sum(data): continue result[name] = Measurement(str(name)).from_array(data) - -- cgit v1.2.2 From c0405807b7f7f75fa1cf93265e6b2a739e449596 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Thu, 18 Apr 2013 17:42:23 -0400 Subject: Switched sched_trace data to verbose ctypes structs. --- parse/sched.py | 136 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 79 insertions(+), 57 deletions(-) (limited to 'parse/sched.py') diff --git a/parse/sched.py b/parse/sched.py index a38c61b..b56324b 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -7,6 +7,7 @@ import subprocess from collections import defaultdict,namedtuple from common import recordtype from point import Measurement +from ctypes import * class TimeTracker: '''Store stats for durations of time demarcated by sched_trace records.''' @@ -38,33 +39,30 @@ class TimeTracker: TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses']) -# Map of event ids to corresponding class, binary format, and processing methods -RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method']) -record_map = [0]*10 +# Map of event ids to corresponding class and format +record_map = {} -# Common to all records -HEADER_FORMAT = ' arecord.when: + for (i, (brecord, _)) in enumerate(buff): + if get_time(brecord) > get_time(arecord): break - buff.insert(i, (arecord, method, itera)) + buff.insert(i, (arecord, itera)) for fname in fnames: itera = make_iterator(fname) add_record(itera) while buff: - (record, method, itera) = buff.pop(0) + record, itera = buff.pop(0) add_record(itera) - method(task_dict, record) + record.process(task_dict) + +class SchedRecord(object): + # Subclasses will have their FIELDs merged into this one + FIELDS = [('type', c_uint8), ('cpu', c_uint8), + ('pid', c_uint16), ('job', c_uint32)] + + def fill(self, data): + memmove(addressof(self), data, RECORD_SIZE) + + def process(self, task_dict): + raise NotImplementedError() + +class ParamRecord(SchedRecord): + FIELDS = [('wcet', c_uint32), ('period', c_uint32), + ('phase', c_uint32), ('partition', c_uint8)] + + def process(self, task_dict): + params = TaskParams(self.wcet, self.period, self.partition) + task_dict[self.pid].params = params + +class ReleaseRecord(SchedRecord): + FIELDS = [('when', c_uint64), ('release', c_uint64)] + + def process(self, task_dict): + data = task_dict[self.pid] + data.jobs += 1 + if data.params: + data.misses.start_time(self, self.when + data.params.period) + +class CompletionRecord(SchedRecord): + FIELDS = [('when', c_uint64)] -def process_completion(task_dict, record): - task_dict[record.pid].misses.store_time(record) + def process(self, task_dict): + task_dict[self.pid].misses.store_time(self) -def process_release(task_dict, record): - data = task_dict[record.pid] - data.jobs += 1 - if data.params: - data.misses.start_time(record, record.when + data.params.period) +class BlockRecord(SchedRecord): + FIELDS = [('when', c_uint64)] -def process_param(task_dict, record): - params = TaskParams(record.wcet, record.period, record.partition) - task_dict[record.pid].params = params + def process(self, task_dict): + task_dict[self.pid].blocks.start_time(self) -def process_block(task_dict, record): - task_dict[record.pid].blocks.start_time(record) +class ResumeRecord(SchedRecord): + FIELDS = [('when', c_uint64)] -def process_resume(task_dict, record): - task_dict[record.pid].blocks.store_time(record) + def process(self, task_dict): + task_dict[self.pid].blocks.store_time(self) -register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when']) -register_record('BlockRecord', 8, process_block, 'Q8x', ['when']) -register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when']) -register_record('ReleaseRecord', 3, process_release, 'QQ', ['when', 'release']) -register_record('ParamRecord', 2, process_param, 'IIIcc2x', - ['wcet','period','phase','partition', 'task_class']) +# Map records to sched_trace ids (see include/litmus/sched_trace.h +register_record(2, ParamRecord) +register_record(3, ReleaseRecord) +register_record(7, CompletionRecord) +register_record(8, BlockRecord) +register_record(9, ResumeRecord) def create_task_dict(data_dir, work_dir = None): '''Parse sched trace files''' -- cgit v1.2.2 From 7545402506aa76261e18d85af585ff0ac1cf05c1 Mon Sep 17 00:00:00 2001 From: Jonathan Herman Date: Tue, 23 Apr 2013 14:01:35 -0400 Subject: Improved accuracy of sched_trace measurement parsing. * Measurements from tasks missing > 20% of their scheduling records are ignored. This is configurable in config/config.py. * Measurements which only have zero values are ignored. * If either of these 2 situations are encountered print out a message the first time using the common.log_once() method. See parse_exps.py for how this is used with multiple threads. * Measurements from a task's last job are ignored. * Miss ratio is calculated only as a fraction of the number of jobs whose matching release and completion records were found, not just release. --- parse/sched.py | 84 +++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 24 deletions(-) (limited to 'parse/sched.py') diff --git a/parse/sched.py b/parse/sched.py index b56324b..4933037 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -5,35 +5,55 @@ import struct import subprocess from collections import defaultdict,namedtuple -from common import recordtype +from common import recordtype,log_once from point import Measurement from ctypes import * class TimeTracker: '''Store stats for durations of time demarcated by sched_trace records.''' def __init__(self): - self.begin = self.avg = self.max = self.num = self.job = 0 + self.begin = self.avg = self.max = self.num = self.next_job = 0 - def store_time(self, record): + # Count of times the job in start_time matched that in store_time + self.matches = 0 + # And the times it didn't + self.disjoints = 0 + + # Measurements are recorded in store_ time using the previous matching + # record which was passed to store_time. This way, the last record for + # any task is always skipped + self.last_record = None + + def store_time(self, next_record): '''End duration of time.''' - dur = record.when - self.begin + dur = (self.last_record.when - self.begin) if self.last_record else -1 - if self.job == record.job and dur > 0: - self.max = max(self.max, dur) - self.avg *= float(self.num / (self.num + 1)) - self.num += 1 - self.avg += dur / float(self.num) + if self.next_job == next_record.job: + self.last_record = next_record - self.begin = 0 - self.job = 0 + if self.last_record: + self.matches += 1 + + if dur > 0: + self.max = max(self.max, dur) + self.avg *= float(self.num / (self.num + 1)) + self.num += 1 + self.avg += dur / float(self.num) + + self.begin = 0 + self.next_job = 0 + else: + self.disjoints += 1 def start_time(self, record, time = None): '''Start duration of time.''' - if not time: - self.begin = record.when - else: - self.begin = time - self.job = record.job + if self.last_record: + if not time: + self.begin = self.last_record.when + else: + self.begin = time + + self.next_job = record.job # Data stored for each task TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) @@ -203,6 +223,12 @@ def create_task_dict(data_dir, work_dir = None): return task_dict +LOSS_MSG = """Found task missing more than %d%% of its scheduling records. +These won't be included in scheduling statistics!"""%(100*conf.MAX_RECORD_LOSS) +SKIP_MSG = """Measurement '%s' has no non-zero values. +Measurements like these are not included in scheduling statistics. +If a measurement is missing, this is why.""" + def extract_sched_data(result, data_dir, work_dir): task_dict = create_task_dict(data_dir, work_dir) stat_data = defaultdict(list) @@ -213,19 +239,29 @@ def extract_sched_data(result, data_dir, work_dir): # Currently unknown where these invalid tasks come from... continue - miss_ratio = float(tdata.misses.num) / tdata.jobs - stat_data["miss-ratio"].append(float(tdata.misses.num) / tdata.jobs) + miss = tdata.misses + + record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints) + stat_data["record-loss"].append(record_loss) + + if record_loss > conf.MAX_RECORD_LOSS: + log_once(LOSS_MSG) + continue + + miss_ratio = float(miss.num) / miss.matches + avg_tard = miss.avg * miss_ratio + + stat_data["miss-ratio" ].append(miss_ratio) - stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet) - # Scale average down to account for jobs with 0 tardiness - avg_tard = tdata.misses.avg * miss_ratio - stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet) + stat_data["max-tard"].append(miss.max / tdata.params.period) + stat_data["avg-tard"].append(avg_tard / tdata.params.period) - stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC) - stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC) + stat_data["avg-block"].append(tdata.blocks.avg / NSEC_PER_MSEC) + stat_data["max-block"].append(tdata.blocks.max / NSEC_PER_MSEC) # Summarize value groups for name, data in stat_data.iteritems(): if not data or not sum(data): + log_once(SKIP_MSG, SKIP_MSG % name) continue result[name] = Measurement(str(name)).from_array(data) -- cgit v1.2.2