From d90a8d25b1026ea41e4cd3041ad2ba03732b99a3 Mon Sep 17 00:00:00 2001 From: Glenn Elliott Date: Fri, 17 Jan 2014 16:57:20 -0500 Subject: Support PGM-based sched trace analysis --- parse/sched.py | 211 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 192 insertions(+), 19 deletions(-) (limited to 'parse/sched.py') diff --git a/parse/sched.py b/parse/sched.py index a25ec15..652378b 100644 --- a/parse/sched.py +++ b/parse/sched.py @@ -11,7 +11,7 @@ from ctypes import * class TimeTracker: '''Store stats for durations of time demarcated by sched_trace records.''' - def __init__(self): + def __init__(self, allow_negative = False): self.begin = self.avg = self.max = self.num = self.next_job = 0 # Count of times the job in start_time matched that in store_time @@ -19,6 +19,8 @@ class TimeTracker: # And the times it didn't self.disjoints = 0 + self.allow_negative = allow_negative + # Measurements are recorded in store_ time using the previous matching # record which was passed to store_time. 
This way, the last record for # any task is always skipped @@ -34,7 +36,7 @@ class TimeTracker: if self.last_record: self.matches += 1 - if dur > 0: + if dur > 0 or self.allow_negative: self.max = max(self.max, dur) self.avg *= float(self.num / (self.num + 1)) self.num += 1 @@ -56,8 +58,19 @@ class TimeTracker: self.next_job = record.job # Data stored for each task -TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) -TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses']) +TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) +PgmTaskParams = namedtuple('PgmTaskParams', ['node_type', 'gid']) +TaskData = recordtype('TaskData', ['params', 'pgm_params', 'jobs', 'blocks', + 'response', 'lateness', 'misses', + 'pgm_response', 'pgm_lateness', 'pgm_misses']) +GraphParams = recordtype('GraphParams', ['gid', 'src', 'sink']) +GraphData = recordtype('GraphData', ['params', 'jobs', 'response']) + +PGM_NOT_A_NODE = 0 +PGM_SRC = 1 +PGM_SINK = 2 +PGM_SRC_SINK = 3 +PGM_INTERNAL = 4 # Map of event ids to corresponding class and format record_map = {} @@ -137,7 +150,7 @@ def make_iterator(fname): # Results from the first job are nonsense pass -def read_data(task_dict, fnames): +def read_data(task_dict, graph_dict, fnames): '''Read records from @fnames and store per-pid stats in @task_dict.''' buff = [] @@ -166,6 +179,7 @@ def read_data(task_dict, fnames): add_record(itera) record.process(task_dict) + record.process_pgm(task_dict, graph_dict) class SchedRecord(object): # Subclasses will have their FIELDs merged into this one @@ -178,28 +192,66 @@ class SchedRecord(object): def process(self, task_dict): raise NotImplementedError() + def process_pgm(self, task_dict, graph_dict): + pass + class ParamRecord(SchedRecord): FIELDS = [('wcet', c_uint32), ('period', c_uint32), - ('phase', c_uint32), ('partition', c_uint8)] + ('phase', c_uint32), ('partition', c_uint8), + ('class', c_uint8)] def process(self, task_dict): params = TaskParams(self.wcet, 
self.period, self.partition) task_dict[self.pid].params = params class ReleaseRecord(SchedRecord): - FIELDS = [('when', c_uint64), ('release', c_uint64)] + # renames the 'release' field to 'when' + FIELDS = [('when', c_uint64), ('deadline', c_uint64)] def process(self, task_dict): data = task_dict[self.pid] data.jobs += 1 - if data.params: - data.misses.start_time(self, self.when + data.params.period) + data.response.start_time(self, self.when) + data.misses.start_time(self, self.deadline) + data.lateness.start_time(self, self.deadline) + + print ' release %d: r=%d d=%d' % (self.pid, self.when, self.deadline) + + def process_pgm(self, task_dict, graph_dict): + data = task_dict[self.pid] + data.pgm_response.start_time(self, self.when) + data.pgm_misses.start_time(self, self.deadline) + data.pgm_lateness.start_time(self, self.deadline) + + return + ntype = task_dict[self.pid].pgm_params.node_type + if ntype == PGM_SRC or ntype == PGM_SRC_SINK: + gid = task_dict[self.pid].pgm_params.graph_pid + gdata = graph_dict[gid] + gdata.jobs += 1 + gdata.response.start_time(self, self.when) class CompletionRecord(SchedRecord): FIELDS = [('when', c_uint64)] def process(self, task_dict): - task_dict[self.pid].misses.store_time(self) + data = task_dict[self.pid] + data.response.store_time(self) + data.misses.store_time(self) + data.lateness.store_time(self) + + def process_pgm(self, task_dict, graph_dict): + data = task_dict[self.pid] + data.pgm_response.store_time(self) + data.pgm_misses.store_time(self) + data.pgm_lateness.store_time(self) + + return + ntype = data.pgm_params.node_type + if ntype == PGM_SINK or ntype == PGM_SRC_SINK: + gid = data.pgm_params.graph_pid + gdata = graph_dict[gid] + gdata.response.store_time(self) class BlockRecord(SchedRecord): FIELDS = [('when', c_uint64)] @@ -213,20 +265,85 @@ class ResumeRecord(SchedRecord): def process(self, task_dict): task_dict[self.pid].blocks.store_time(self) +class SysReleaseRecord(SchedRecord): + FIELDS = [('when', 
c_uint64), ('release', c_uint64)] + + def process(self, task_dict): + pass + +class PgmParamRecord(SchedRecord): + FIELDS = [('node_type', c_uint32), ('graph_pid', c_uint16)] + + def process(self, task_dict): + pass + + def process_pgm(self, task_dict, graph_dict): + + pgm_params = PgmTaskParams(self.node_type, self.graph_pid) + task_dict[self.pid].pgm_params = pgm_params + + print '%d: graph id = %d, node type = %d' % (self.pid, self.graph_pid, self.node_type) + + if self.node_type == PGM_SRC or self.node_type == PGM_SINK or self.node_type == PGM_SRC_SINK: + graph_data = graph_dict[self.graph_pid] + if not graph_data.params: + graph_data.params = GraphParams(self.graph_pid, 0, 0) + if self.node_type == PGM_SRC: + assert graph_data.params.src == 0 + graph_data.params.src = self.pid + elif self.node_type == PGM_SINK: + assert graph_data.params.sink == 0 + graph_data.params.sink = self.pid + else: + assert graph_data.params.src == 0 + assert graph_data.params.sink == 0 + graph_data.params.src = self.pid + graph_data.params.sink = self.pid + +class PgmReleaseRecord(SchedRecord): + # renames the 'release' field to 'when' + FIELDS = [('when', c_uint64), ('deadline', c_uint64)] + + def process(self, task_dict): + pass + + def process_pgm(self, task_dict, graph_dict): + data = task_dict[self.pid] + data.pgm_response.start_time(self, self.when) + data.pgm_misses.start_time(self, self.deadline) + data.pgm_lateness.start_time(self, self.deadline) + + print 'pgm_release %d: r=%d d=%d' % (self.pid, self.when, self.deadline) + + return + + ntype = data.pgm_params.node_type + if ntype == PGM_SRC or ntype == PGM_SRC_SINK: + gid = data.pgm_params.graph_pid + gdata = graph_dict[gid] + gdata.response.start_time(self, self.when) + # Map records to sched_trace ids (see include/litmus/sched_trace.h register_record(2, ParamRecord) register_record(3, ReleaseRecord) register_record(7, CompletionRecord) register_record(8, BlockRecord) register_record(9, ResumeRecord) +register_record(11, 
SysReleaseRecord) +register_record(12, PgmParamRecord) +register_record(13, PgmReleaseRecord) -def create_task_dict(data_dir, work_dir = None): +def create_trace_dict(data_dir, work_dir = None): '''Parse sched trace files''' bin_files = conf.FILES['sched_data'].format(".*") output_file = "%s/out-st" % work_dir task_dict = defaultdict(lambda : - TaskData(None, 1, TimeTracker(), TimeTracker())) + TaskData(None, None, 1, TimeTracker(), + TimeTracker(), TimeTracker(True), TimeTracker(), + TimeTracker(), TimeTracker(True), TimeTracker())) + graph_dict = defaultdict(lambda: + GraphData(None, 1, TimeTracker())) bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] if not len(bin_names): @@ -242,9 +359,9 @@ def create_task_dict(data_dir, work_dir = None): # Gather per-task values bin_paths = ["%s/%s" % (data_dir,f) for f in bin_names] - read_data(task_dict, bin_paths) + read_data(task_dict, graph_dict, bin_paths) - return task_dict + return task_dict, graph_dict LOSS_MSG = """Found task missing more than %d%% of its scheduling records. These won't be included in scheduling statistics!"""%(100*conf.MAX_RECORD_LOSS) @@ -253,8 +370,9 @@ Measurements like these are not included in scheduling statistics. If a measurement is missing, this is why.""" def extract_sched_data(result, data_dir, work_dir): - task_dict = create_task_dict(data_dir, work_dir) + task_dict, graph_dict = create_trace_dict(data_dir, work_dir) stat_data = defaultdict(list) + gstat_data = defaultdict(list) # Group per-task values for tdata in task_dict.itervalues(): @@ -262,7 +380,12 @@ def extract_sched_data(result, data_dir, work_dir): # Currently unknown where these invalid tasks come from... 
continue + lateness = tdata.lateness + response = tdata.response miss = tdata.misses + pgm_response = tdata.pgm_response + pgm_lateness = tdata.pgm_lateness + pgm_miss = tdata.pgm_misses record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints) stat_data["record-loss"].append(record_loss) @@ -272,15 +395,62 @@ def extract_sched_data(result, data_dir, work_dir): continue miss_ratio = float(miss.num) / miss.matches + pgm_miss_ratio = float(pgm_miss.num) / pgm_miss.matches + + # average job tardy by: avg_tard = miss.avg * miss_ratio + pgm_avg_tard = pgm_miss.avg * pgm_miss_ratio + # start with basic task information stat_data["miss-ratio" ].append(miss_ratio) - stat_data["max-tard"].append(miss.max / tdata.params.period) - stat_data["avg-tard"].append(avg_tard / tdata.params.period) + stat_data["max-response"].append(response.max) + stat_data["avg-response"].append(response.avg) + stat_data["max-response-prop"].append(response.max / tdata.params.period) + stat_data["avg-response-prop"].append(response.avg / tdata.params.period) + + stat_data["max-tard"].append(miss.max) + stat_data["avg-tard"].append(avg_tard) + stat_data["max-tard-prop"].append(miss.max / tdata.params.period) + stat_data["avg-tard-prop"].append(avg_tard / tdata.params.period) + + stat_data["max-lateness"].append(lateness.max) + stat_data["avg-lateness"].append(lateness.avg) + stat_data["max-lateness-prop"].append(lateness.max / tdata.params.period) + stat_data["avg-lateness-prop"].append(lateness.avg / tdata.params.period) + + # same data, but with PGM-adjusted release times (shifted deadlines) + stat_data["pgm-miss-ratio" ].append(pgm_miss_ratio) + + stat_data["pgm-max-response"].append(pgm_response.max) + stat_data["pgm-avg-response"].append(pgm_response.avg) + stat_data["pgm-max-response-prop"].append(pgm_response.max / tdata.params.period) + stat_data["pgm-avg-response-prop"].append(pgm_response.avg / tdata.params.period) + + stat_data["pgm-max-tard"].append(pgm_miss.max) + 
stat_data["pgm-avg-tard"].append(pgm_avg_tard) + stat_data["pgm-max-tard-prop"].append(pgm_miss.max / tdata.params.period) + stat_data["pgm-avg-tard-prop"].append(pgm_avg_tard / tdata.params.period) + + stat_data["pgm-max-lateness"].append(pgm_lateness.max) + stat_data["pgm-avg-lateness"].append(pgm_lateness.avg) + stat_data["pgm-max-lateness-prop"].append(pgm_lateness.max / tdata.params.period) + stat_data["pgm-avg-lateness-prop"].append(pgm_lateness.avg / tdata.params.period) + + for gdata in graph_dict.itervalues(): + if not gdata.params: + continue + + response = gdata.response + record_loss = float(response.disjoints)/(response.matches + response.disjoints) + gstat_data["graph-record-loss"].append(record_loss) - stat_data["avg-block"].append(tdata.blocks.avg / NSEC_PER_MSEC) - stat_data["max-block"].append(tdata.blocks.max / NSEC_PER_MSEC) + if record_loss > conf.MAX_RECORD_LOSS: + log_once(LOSS_MSG) + continue + + gstat_data["graph-max-response"].append(response.max) + gstat_data["graph-avg-response"].append(response.avg) # Summarize value groups for name, data in stat_data.iteritems(): @@ -288,3 +458,6 @@ def extract_sched_data(result, data_dir, work_dir): continue result[name] = Measurement(str(name)).from_array(data) + + for name, data in gstat_data.iteritems(): + result[name] = Measurement(str(name)).from_array(data) -- cgit v1.2.2