From 69bdd9a6095fbd85cc90447862cc898c3391b3a7 Mon Sep 17 00:00:00 2001
From: Glenn Elliott
Date: Fri, 17 Jan 2014 20:18:08 -0500
Subject: parse_exps.py: Make data analysis more accurate.

This patch makes the data analysis of parse_exps.py more accurate. The
old code suffered from fragility---logs that were out of order led to
many dropped records, because the old code had a look-ahead buffer of
only one record.

This patch:
1) Maintains buffers of unmatched records.
2) Preloads 500 records from every sched stream and processes them in
   sorted order.

This patch does increase runtime, but the prior results could be
useless for many trace logs.
---
 parse/sched.py | 236 +++++++++++++++++++++++++++++++--------------------------
 1 file changed, 128 insertions(+), 108 deletions(-)

(limited to 'parse/sched.py')

diff --git a/parse/sched.py b/parse/sched.py
index 652378b..0ab16ce 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -9,53 +9,76 @@ from common import recordtype,log_once
 from point import Measurement
 from ctypes import *

+from heapq import *
+
 class TimeTracker:
     '''Store stats for durations of time demarcated by sched_trace records.'''
-    def __init__(self, allow_negative = False):
-        self.begin = self.avg = self.max = self.num = self.next_job = 0
+    def __init__(self, is_valid_duration = lambda x: True, delay_buffer_size = 1, max_pending = 100):
+        self.validator = is_valid_duration
+        self.avg = self.max = self.num = 0

-        # Count of times the job in start_time matched that in store_time
         self.matches = 0
-        # And the times it didn't
-        self.disjoints = 0
-
-        self.allow_negative = allow_negative
-
-        # Measurements are recorded in store_ time using the previous matching
-        # record which was passed to store_time. This way, the last record for
-        # any task is always skipped
-        self.last_record = None
-
-    def store_time(self, next_record):
-        '''End duration of time.'''
-        dur = (self.last_record.when - self.begin) if self.last_record else -1
-
-        if self.next_job == next_record.job:
-            self.last_record = next_record
-            if self.last_record:
-                self.matches += 1
-
-            if dur > 0 or self.allow_negative:
-                self.max = max(self.max, dur)
+        self.max_pending = max_pending
+        self.discarded = 0
+
+        self.delay_buffer_size = delay_buffer_size
+        self.start_delay_buffer = []
+        self.end_delay_buffer = []
+        self.start_records = {}
+        self.end_records = {}
+
+    def disjoints(self):
+        unmatched = len(self.start_records) + len(self.end_records)
+        return self.discarded + unmatched
+
+    def process_completed(self):
+        completed = self.start_records.viewkeys() & self.end_records.viewkeys()
+        self.matches += len(completed)
+        for c in completed:
+            s, stime = self.start_records[c]
+            e, etime = self.end_records[c]
+            del self.start_records[c]
+            del self.end_records[c]
+
+            dur = etime - stime
+            if self.validator(dur):
+                self.max = max(self.max, dur)
                 self.avg *= float(self.num / (self.num + 1))
                 self.num += 1
                 self.avg += dur / float(self.num)

-            self.begin = 0
-            self.next_job = 0
-        else:
-            self.disjoints += 1
+        # Give up on some jobs if they've been hanging around too long.
+        # While not strictly needed, it helps improve performance and
+        # it is unlikely to cause too much trouble.
+        if(len(self.start_records) > self.max_pending):
+            to_discard = len(self.start_records) - self.max_pending
+            for i in range(to_discard):
+                # pop off the oldest jobs
+                del self.start_records[self.start_records.iterkeys().next()]
+            self.discarded += to_discard
+        if(len(self.end_records) > self.max_pending):
+            to_discard = len(self.end_records) - self.max_pending
+            for i in range(to_discard):
+                # pop off the oldest jobs
+                del self.end_records[self.end_records.iterkeys().next()]
+            self.discarded += to_discard
+
+    def end_time(self, record, time):
+        '''End duration of time.'''
+        if len(self.end_delay_buffer) == self.delay_buffer_size:
+            to_queue = self.end_delay_buffer.pop(0)
+            self.end_records[to_queue[0].job] = to_queue
+        self.end_delay_buffer.append((record, time))
+        self.process_completed()

-    def start_time(self, record, time = None):
+    def start_time(self, record, time):
         '''Start duration of time.'''
-        if self.last_record:
-            if not time:
-                self.begin = self.last_record.when
-            else:
-                self.begin = time
-
-        self.next_job = record.job
+        if len(self.start_delay_buffer) == self.delay_buffer_size:
+            to_queue = self.start_delay_buffer.pop(0)
+            self.start_records[to_queue[0].job] = to_queue
+        self.start_delay_buffer.append((record, time))
+        self.process_completed()


 # Data stored for each task
 TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
@@ -152,7 +175,12 @@ def make_iterator(fname):

 def read_data(task_dict, graph_dict, fnames):
     '''Read records from @fnames and store per-pid stats in @task_dict.'''
-    buff = []
+    q = []
+
+    # Number of trace records to buffer from each stream/file.
+    # A sorted window is maintained in order to deal with
+    # events that were recorded out-of-order.
+    window_size = 500

     def get_time(record):
         return record.when if hasattr(record, 'when') else 0
@@ -164,19 +192,18 @@ def read_data(task_dict, graph_dict, fnames):
         except StopIteration:
             return

-        i = 0
-        for (i, (brecord, _)) in enumerate(buff):
-            if get_time(brecord) > get_time(arecord):
-                break
-        buff.insert(i, (arecord, itera))
+        sort_key = (get_time(arecord), arecord.job, arecord.pid)
+        heappush(q, (sort_key, arecord, itera))

     for fname in fnames:
         itera = make_iterator(fname)
-        add_record(itera)
-
-    while buff:
-        record, itera = buff.pop(0)
+        for w in range(window_size):
+            add_record(itera)

+    sys_released = False
+    while q:
+        sort_key, record, itera = heappop(q)
+        # fetch another record
         add_record(itera)
         record.process(task_dict)
         record.process_pgm(task_dict, graph_dict)
@@ -205,7 +232,7 @@ class ParamRecord(SchedRecord):
         task_dict[self.pid].params = params

 class ReleaseRecord(SchedRecord):
-    # renames the 'release' field to 'when'
+    # renames the 'release' field to 'when' to enable sorting
     FIELDS = [('when', c_uint64), ('deadline', c_uint64)]

     def process(self, task_dict):
@@ -215,18 +242,15 @@ class ReleaseRecord(SchedRecord):
         data.misses.start_time(self, self.deadline)
         data.lateness.start_time(self, self.deadline)

-        print ' release %d: r=%d d=%d' % (self.pid, self.when, self.deadline)
-
     def process_pgm(self, task_dict, graph_dict):
         data = task_dict[self.pid]
         data.pgm_response.start_time(self, self.when)
         data.pgm_misses.start_time(self, self.deadline)
         data.pgm_lateness.start_time(self, self.deadline)

-        return
         ntype = task_dict[self.pid].pgm_params.node_type
         if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
-            gid = task_dict[self.pid].pgm_params.graph_pid
+            gid = task_dict[self.pid].pgm_params.gid
             gdata = graph_dict[gid]
             gdata.jobs += 1
             gdata.response.start_time(self, self.when)
@@ -236,34 +260,34 @@ class CompletionRecord(SchedRecord):

     def process(self, task_dict):
         data = task_dict[self.pid]
-        data.response.store_time(self)
-        data.misses.store_time(self)
-        data.lateness.store_time(self)
+        data.response.end_time(self, self.when)
+        data.misses.end_time(self, self.when)
+        data.lateness.end_time(self, self.when)

     def process_pgm(self, task_dict, graph_dict):
         data = task_dict[self.pid]
-        data.pgm_response.store_time(self)
-        data.pgm_misses.store_time(self)
-        data.pgm_lateness.store_time(self)
+        data.pgm_response.end_time(self, self.when)
+        data.pgm_misses.end_time(self, self.when)
+        data.pgm_lateness.end_time(self, self.when)

-        return
-        ntype = data.pgm_params.node_type
-        if ntype == PGM_SINK or ntype == PGM_SRC_SINK:
-            gid = data.pgm_params.graph_pid
-            gdata = graph_dict[gid]
-            gdata.response.store_time(self)
+        if data.pgm_params:
+            ntype = data.pgm_params.node_type
+            if ntype == PGM_SINK or ntype == PGM_SRC_SINK:
+                gid = data.pgm_params.gid
+                gdata = graph_dict[gid]
+                gdata.response.end_time(self, self.when)

 class BlockRecord(SchedRecord):
     FIELDS = [('when', c_uint64)]

     def process(self, task_dict):
-        task_dict[self.pid].blocks.start_time(self)
+        task_dict[self.pid].blocks.start_time(self, self.when)

 class ResumeRecord(SchedRecord):
     FIELDS = [('when', c_uint64)]

     def process(self, task_dict):
-        task_dict[self.pid].blocks.store_time(self)
+        task_dict[self.pid].blocks.end_time(self, self.when)

 class SysReleaseRecord(SchedRecord):
     FIELDS = [('when', c_uint64), ('release', c_uint64)]
@@ -278,12 +302,9 @@ class PgmParamRecord(SchedRecord):
         pass

     def process_pgm(self, task_dict, graph_dict):
-
         pgm_params = PgmTaskParams(self.node_type, self.graph_pid)
         task_dict[self.pid].pgm_params = pgm_params

-        print '%d: graph id = %d, node type = %d' % (self.pid, self.graph_pid, self.node_type)
-
         if self.node_type == PGM_SRC or self.node_type == PGM_SINK or self.node_type == PGM_SRC_SINK:
             graph_data = graph_dict[self.graph_pid]
             if not graph_data.params:
@@ -313,15 +334,12 @@ class PgmReleaseRecord(SchedRecord):
         data.pgm_misses.start_time(self, self.deadline)
         data.pgm_lateness.start_time(self, self.deadline)

-        print 'pgm_release %d: r=%d d=%d' % (self.pid, self.when, self.deadline)
-
-        return
-
-        ntype = data.pgm_params.node_type
-        if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
-            gid = data.pgm_params.graph_pid
-            gdata = graph_dict[gid]
-            gdata.response.start_time(self, self.when)
+        if data.pgm_params:
+            ntype = data.pgm_params.node_type
+            if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
+                gid = data.pgm_params.graph_pid
+                gdata = graph_dict[gid]
+                gdata.response.start_time(self, self.when)

 # Map records to sched_trace ids (see include/litmus/sched_trace.h
 register_record(2, ParamRecord)
@@ -339,11 +357,11 @@ def create_trace_dict(data_dir, work_dir = None):
     output_file = "%s/out-st" % work_dir

     task_dict = defaultdict(lambda :
-                            TaskData(None, None, 1, TimeTracker(),
-                                     TimeTracker(), TimeTracker(True), TimeTracker(),
-                                     TimeTracker(), TimeTracker(True), TimeTracker()))
+                            TaskData(None, None, 1, TimeTracker(is_valid_duration = lambda x: x > 0),
+                                     TimeTracker(), TimeTracker(), TimeTracker(is_valid_duration = lambda x: x > 0),
+                                     TimeTracker(), TimeTracker(), TimeTracker(is_valid_duration = lambda x: x > 0)))
     graph_dict = defaultdict(lambda:
-                             GraphData(None, 1, TimeTracker()))
+                             GraphData(None, 1, TimeTracker(is_valid_duration = lambda x: x > 0)))

     bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
     if not len(bin_names):
@@ -375,7 +393,7 @@ def extract_sched_data(result, data_dir, work_dir):
     gstat_data = defaultdict(list)

     # Group per-task values
-    for tdata in task_dict.itervalues():
+    for task, tdata in task_dict.iteritems():
         if not tdata.params:
             # Currently unknown where these invalid tasks come from...
             continue
@@ -387,7 +405,7 @@ def extract_sched_data(result, data_dir, work_dir):
         pgm_lateness = tdata.pgm_lateness
         pgm_miss = tdata.pgm_misses

-        record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints)
+        record_loss = float(miss.disjoints())/(miss.matches + miss.disjoints())
         stat_data["record-loss"].append(record_loss)

         if record_loss > conf.MAX_RECORD_LOSS:
@@ -404,57 +422,59 @@ def extract_sched_data(result, data_dir, work_dir):

         # start with basic task information
         stat_data["miss-ratio" ].append(miss_ratio)
-        stat_data["max-response"].append(response.max)
-        stat_data["avg-response"].append(response.avg)
-        stat_data["max-response-prop"].append(response.max / tdata.params.period)
+        stat_data["max-response"].append(float(response.max)/NSEC_PER_MSEC)
+        stat_data["avg-response"].append(response.avg/NSEC_PER_MSEC)
+        stat_data["max-response-prop"].append(float(response.max) / tdata.params.period)
         stat_data["avg-response-prop"].append(response.avg / tdata.params.period)

-        stat_data["max-tard"].append(miss.max)
-        stat_data["avg-tard"].append(avg_tard)
-        stat_data["max-tard-prop"].append(miss.max / tdata.params.period)
+        stat_data["max-tard"].append(float(miss.max)/NSEC_PER_MSEC)
+        stat_data["avg-tard"].append(avg_tard/NSEC_PER_MSEC)
+        stat_data["max-tard-prop"].append(float(miss.max) / tdata.params.period)
         stat_data["avg-tard-prop"].append(avg_tard / tdata.params.period)

-        stat_data["max-response"].append(lateness.max)
-        stat_data["avg-response"].append(lateness.avg)
-        stat_data["max-response-prop"].append(lateness.max / tdata.params.period)
+        stat_data["max-response"].append(float(lateness.max)/NSEC_PER_MSEC)
+        stat_data["avg-response"].append(lateness.avg/NSEC_PER_MSEC)
+        stat_data["max-response-prop"].append(float(lateness.max) / tdata.params.period)
         stat_data["avg-response-prop"].append(lateness.avg / tdata.params.period)

         # same data, but with PGM-adjusted release times (shifted deadlines)
         stat_data["pgm-miss-ratio" ].append(pgm_miss_ratio)
-        stat_data["pgm-max-response"].append(pgm_response.max)
-        stat_data["pgm-avg-response"].append(pgm_response.avg)
-        stat_data["pgm-max-response-prop"].append(pgm_response.max / tdata.params.period)
+        stat_data["pgm-max-response"].append(float(pgm_response.max)/NSEC_PER_MSEC)
+        stat_data["pgm-avg-response"].append(pgm_response.avg/NSEC_PER_MSEC)
+        stat_data["pgm-max-response-prop"].append(float(pgm_response.max) / tdata.params.period)
         stat_data["pgm-avg-response-prop"].append(pgm_response.avg / tdata.params.period)

-        stat_data["pgm-max-tard"].append(pgm_miss.max)
-        stat_data["pgm-avg-tard"].append(pgm_avg_tard)
-        stat_data["pgm-max-tard-prop"].append(pgm_miss.max / tdata.params.period)
+        stat_data["pgm-max-tard"].append(float(pgm_miss.max)/NSEC_PER_MSEC)
+        stat_data["pgm-avg-tard"].append(pgm_avg_tard/NSEC_PER_MSEC)
+        stat_data["pgm-max-tard-prop"].append(float(pgm_miss.max) / tdata.params.period)
         stat_data["pgm-avg-tard-prop"].append(pgm_avg_tard / tdata.params.period)

-        stat_data["pgm-max-response"].append(pgm_lateness.max)
-        stat_data["pgm-avg-response"].append(pgm_lateness.avg)
-        stat_data["pgm-max-response-prop"].append(pgm_lateness.max / tdata.params.period)
+        stat_data["pgm-max-response"].append(float(pgm_lateness.max)/NSEC_PER_MSEC)
+        stat_data["pgm-avg-response"].append(pgm_lateness.avg/NSEC_PER_MSEC)
+        stat_data["pgm-max-response-prop"].append(float(pgm_lateness.max) / tdata.params.period)
         stat_data["pgm-avg-response-prop"].append(pgm_lateness.avg / tdata.params.period)

-    for gdata in graph_dict.itervalues():
+    for gid, gdata in graph_dict.iteritems():
         if not gdata.params:
             continue
-
         response = gdata.response
-        record_loss = float(response.disjoints)/(response.matches + response.disjoints)
+        if response.matches + response.disjoints() == 0:
+            record_loss = 0
+        else:
+            record_loss = float(response.disjoints())/(response.matches + response.disjoints())
         gstat_data["graph-record-loss"].append(record_loss)

-        if record_los > conf.MAX_RECORD_LOSS:
+        if record_loss > conf.MAX_RECORD_LOSS:
             log_once(LOSS_MSG)
             continue

-        gstat_data["graph-max-response"].append(response.max)
-        gstat_data["graph-avg-response"].append(response.avg)
+        gstat_data["graph-max-response"].append(float(response.max)/NSEC_PER_MSEC)
+        gstat_data["graph-avg-response"].append(response.avg/NSEC_PER_MSEC)

     # Summarize value groups
     for name, data in stat_data.iteritems():
-        if not data or not sum(data):
+        if not data:
             log_once(SKIP_MSG, SKIP_MSG % name)
             continue
         result[name] = Measurement(str(name)).from_array(data)
--
cgit v1.2.2