import config.config as conf
import os
import re
import struct
import subprocess

from collections import defaultdict, namedtuple
from common import recordtype, log_once
from point import Measurement
from ctypes import *
from heapq import *

class TimeTracker:
    '''Store stats for durations of time demarcated by sched_trace records.'''
    def __init__(self, is_valid_duration = lambda x: True,
                 delay_buffer_size = 1, max_pending = -1):
        self.validator = is_valid_duration
        self.avg = self.max = self.num = 0
        self.matches = 0
        self.max_pending = max_pending
        self.discarded = 0

        self.delay_buffer_size = delay_buffer_size
        self.start_delay_buffer = []
        self.end_delay_buffer = []

        self.start_records = {}
        self.end_records = {}

    def disjoints(self):
        '''Number of records that could not be matched.'''
        unmatched = len(self.start_records) + len(self.end_records)
        return self.discarded + unmatched

    def process_completed(self):
        '''Match pending start records against end records by job number.'''
        completed = self.start_records.viewkeys() & self.end_records.viewkeys()
        self.matches += len(completed)

        for c in completed:
            s, stime = self.start_records[c]
            e, etime = self.end_records[c]
            del self.start_records[c]
            del self.end_records[c]

            dur = etime - stime
            if self.validator(dur):
                self.max = max(self.max, dur)
                old_avg = self.avg * self.num
                self.num += 1
                self.avg = (old_avg + dur) / float(self.num)

        # Give up on some jobs if they've been hanging around too long.
        # While not strictly needed, it helps improve performance and
        # it is unlikely to cause too much trouble.
        if self.max_pending >= 0 and len(self.start_records) > self.max_pending:
            to_discard = len(self.start_records) - self.max_pending
            for i in range(to_discard):
                # pop off the oldest jobs
                del self.start_records[self.start_records.iterkeys().next()]
            self.discarded += to_discard
        if self.max_pending >= 0 and len(self.end_records) > self.max_pending:
            to_discard = len(self.end_records) - self.max_pending
            for i in range(to_discard):
                # pop off the oldest jobs
                del self.end_records[self.end_records.iterkeys().next()]
            self.discarded += to_discard

    def end_time(self, record, time):
        '''End a duration of time.'''
        if len(self.end_delay_buffer) == self.delay_buffer_size:
            to_queue = self.end_delay_buffer.pop(0)
            self.end_records[to_queue[0].job] = to_queue
        self.end_delay_buffer.append((record, time))
        self.process_completed()

    def start_time(self, record, time):
        '''Start a duration of time.'''
        if len(self.start_delay_buffer) == self.delay_buffer_size:
            to_queue = self.start_delay_buffer.pop(0)
            self.start_records[to_queue[0].job] = to_queue
        self.start_delay_buffer.append((record, time))
        self.process_completed()
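# Illustrative sketch (not called by the parser): with the default
# delay_buffer_size of 1, each start/end record is held back until the next
# record of the same kind arrives; matching by job number then happens in
# process_completed(). The record type and timestamps below are hypothetical.
def _timetracker_example():
    Rec = namedtuple('Rec', ['job'])  # stand-in for a parsed trace record
    t = TimeTracker()
    t.start_time(Rec(1), 100)         # job 1 starts at t=100
    t.end_time(Rec(1), 250)           # job 1 ends at t=250
    t.start_time(Rec(2), 300)         # flushes job 1 out of the delay buffer
    t.end_time(Rec(2), 420)           # job 1 now matches: duration 150
    assert t.matches == 1 and t.max == 150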
# Data stored for each task
TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
PgmTaskParams = namedtuple('PgmTaskParams', ['node_type', 'gid'])
TaskData = recordtype('TaskData', ['params', 'pgm_params', 'jobs', 'blocks',
                                   'response', 'lateness', 'misses',
                                   'pgm_response', 'pgm_lateness',
                                   'pgm_misses'])

# Data stored for each PGM graph
GraphParams = recordtype('GraphParams', ['gid', 'src', 'sink'])
GraphData = recordtype('GraphData', ['params', 'jobs', 'response'])

# PGM node types
PGM_NOT_A_NODE = 0
PGM_SRC = 1
PGM_SINK = 2
PGM_SRC_SINK = 3
PGM_INTERNAL = 4

# Map of event ids to corresponding class and format
record_map = {}

RECORD_SIZE = 24
NSEC_PER_MSEC = 1000000

def bits_to_bytes(bits):
    '''Bytes needed to hold @bits, rounded up (includes padding).'''
    return bits / 8 + (1 if bits % 8 else 0)

def field_bytes(fields):
    '''Size, in bytes, of a ctypes field list, with bitfield padding.'''
    fbytes = 0
    fbits = 0
    for f in fields:
        flist = list(f)

        if len(flist) > 2:
            # Specified a bitfield
            fbits += flist[2]
        else:
            # Only specified a type; use the type's size
            fbytes += sizeof(list(f)[1])

            # Bitfields followed by a byte will cause any incomplete
            # bytes to be turned into full bytes
            fbytes += bits_to_bytes(fbits)
            fbits = 0

    # Round any trailing bits up to whole bytes
    fbytes += bits_to_bytes(fbits)

    return fbytes
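# Illustrative sketch (not called by the parser): field_bytes() adds sizeof()
# for plain ctypes fields and accumulates bitfield widths, rounding partial
# bits up to whole bytes whenever a plain field (or the end of the list)
# follows. The field list below is hypothetical; no registered record
# currently uses bitfields.
def _field_bytes_example():
    fields = [('flags', c_uint8, 3),  # 3-bit bitfield
              ('mode', c_uint8, 6),   # 6 more bits: 9 bits pad to 2 bytes
              ('when', c_uint64)]     # plain 8-byte field
    assert field_bytes(fields) == 10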
def register_record(id, clazz):
    '''Register @clazz as the parser for sched_trace records of type @id.'''
    fields = clazz.FIELDS

    diff = RECORD_SIZE - field_bytes(SchedRecord.FIELDS) - field_bytes(fields)

    # Create extra padding fields to make the record the proper size.
    # Creating one big field of c_uint64 and giving it a size of 8*diff
    # _should_ work, but doesn't. This is an uglier way of accomplishing
    # the same goal
    for d in range(diff):
        fields += [("extra%d" % d, c_char)]

    # Create structure with fields and methods of clazz
    clazz2 = type("Dummy%d" % id, (LittleEndianStructure, clazz),
                  {'_fields_': SchedRecord.FIELDS + fields,
                   '_pack_': 1})
    record_map[id] = clazz2

def make_iterator(fname):
    '''Iterate over parsed records in a sched-trace file.'''
    if not os.path.getsize(fname):
        # Likely a release master CPU
        return

    f = open(fname, 'rb')

    while True:
        data = f.read(RECORD_SIZE)

        try:
            type_num = struct.unpack_from('b', data)[0]
        except struct.error:
            break

        if type_num not in record_map:
            continue

        clazz = record_map[type_num]
        obj = clazz()
        obj.fill(data)

        if obj.job != 1:
            yield obj
        else:
            # Results from the first job are nonsense
            pass

def read_data(task_dict, graph_dict, fnames):
    '''Read records from @fnames and store per-pid stats in @task_dict.'''
    q = []

    # Number of trace records to buffer from each stream/file.
    # A sorted window is maintained in order to deal with
    # events that were recorded out-of-order.
    window_size = 500

    def get_time(record):
        return record.when if hasattr(record, 'when') else 0

    def add_record(itera):
        # Ordered insertion into q
        try:
            arecord = itera.next()
        except StopIteration:
            return

        sort_key = (get_time(arecord), arecord.job, arecord.pid)
        heappush(q, (sort_key, arecord, itera))

    for fname in fnames:
        itera = make_iterator(fname)
        for w in range(window_size):
            add_record(itera)

    while q:
        sort_key, record, itera = heappop(q)

        # Fetch another record from the stream the popped record came from
        add_record(itera)

        record.process(task_dict)
        record.process_pgm(task_dict, graph_dict)

class SchedRecord(object):
    # Subclasses will have their FIELDS merged into this one
    FIELDS = [('type', c_uint8), ('cpu', c_uint8),
              ('pid', c_uint16), ('job', c_uint32)]

    def fill(self, data):
        memmove(addressof(self), data, RECORD_SIZE)

    def process(self, task_dict):
        raise NotImplementedError()

    def process_pgm(self, task_dict, graph_dict):
        pass

class ParamRecord(SchedRecord):
    FIELDS = [('wcet', c_uint32), ('period', c_uint32),
              ('phase', c_uint32), ('partition', c_uint8),
              ('class', c_uint8)]

    def process(self, task_dict):
        params = TaskParams(self.wcet, self.period, self.partition)
        task_dict[self.pid].params = params

class ReleaseRecord(SchedRecord):
    # Renames the 'release' field to 'when' to enable sorting
    FIELDS = [('when', c_uint64), ('deadline', c_uint64)]

    def process(self, task_dict):
        data = task_dict[self.pid]
        data.jobs += 1
        data.response.start_time(self, self.when)
        data.misses.start_time(self, self.deadline)
        data.lateness.start_time(self, self.deadline)

    def process_pgm(self, task_dict, graph_dict):
        data = task_dict[self.pid]
        data.pgm_response.start_time(self, self.when)
        data.pgm_misses.start_time(self, self.deadline)
        data.pgm_lateness.start_time(self, self.deadline)
        if data.pgm_params:
            ntype = data.pgm_params.node_type
            if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
                gid = data.pgm_params.gid
                gdata = graph_dict[gid]
                gdata.jobs += 1
                gdata.response.start_time(self, self.when)

class CompletionRecord(SchedRecord):
    FIELDS = [('when', c_uint64)]

    def process(self, task_dict):
        data = task_dict[self.pid]
        data.response.end_time(self, self.when)
        data.misses.end_time(self, self.when)
        data.lateness.end_time(self, self.when)

    def process_pgm(self, task_dict, graph_dict):
        data = task_dict[self.pid]
        data.pgm_response.end_time(self, self.when)
        data.pgm_misses.end_time(self, self.when)
        data.pgm_lateness.end_time(self, self.when)
        if data.pgm_params:
            ntype = data.pgm_params.node_type
            if ntype == PGM_SINK or ntype == PGM_SRC_SINK:
                gid = data.pgm_params.gid
                gdata = graph_dict[gid]
                gdata.response.end_time(self, self.when)

class BlockRecord(SchedRecord):
    FIELDS = [('when', c_uint64)]

    def process(self, task_dict):
        task_dict[self.pid].blocks.start_time(self, self.when)

class ResumeRecord(SchedRecord):
    FIELDS = [('when', c_uint64)]

    def process(self, task_dict):
        task_dict[self.pid].blocks.end_time(self, self.when)

class SysReleaseRecord(SchedRecord):
    FIELDS = [('when', c_uint64), ('release', c_uint64)]

    def process(self, task_dict):
        pass

class PgmParamRecord(SchedRecord):
    FIELDS = [('node_type', c_uint32), ('graph_pid', c_uint16)]

    def process(self, task_dict):
        pass

    def process_pgm(self, task_dict, graph_dict):
        pgm_params = PgmTaskParams(self.node_type, self.graph_pid)
        task_dict[self.pid].pgm_params = pgm_params

        if self.node_type in (PGM_SRC, PGM_SINK, PGM_SRC_SINK):
            graph_data = graph_dict[self.graph_pid]
            if not graph_data.params:
                graph_data.params = GraphParams(self.graph_pid, 0, 0)
            if self.node_type == PGM_SRC:
                assert graph_data.params.src == 0
                graph_data.params.src = self.pid
            elif self.node_type == PGM_SINK:
                assert graph_data.params.sink == 0
                graph_data.params.sink = self.pid
            else:
                # A PGM_SRC_SINK node is both ends of its graph
                assert graph_data.params.src == 0
                assert graph_data.params.sink == 0
                graph_data.params.src = self.pid
                graph_data.params.sink = self.pid

class PgmReleaseRecord(SchedRecord):
    # Renames the 'release' field to 'when' to enable sorting
    FIELDS = [('when', c_uint64), ('deadline', c_uint64)]

    def process(self, task_dict):
        pass

    def process_pgm(self, task_dict, graph_dict):
        data = task_dict[self.pid]
        data.pgm_response.start_time(self, self.when)
        data.pgm_misses.start_time(self, self.deadline)
        data.pgm_lateness.start_time(self, self.deadline)
        if data.pgm_params:
            ntype = data.pgm_params.node_type
            if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
                gid = data.pgm_params.gid
                gdata = graph_dict[gid]
                gdata.response.start_time(self, self.when)

# Map records to sched_trace ids (see include/litmus/sched_trace.h)
register_record(2, ParamRecord)
register_record(3, ReleaseRecord)
register_record(7, CompletionRecord)
register_record(8, BlockRecord)
register_record(9, ResumeRecord)
register_record(11, SysReleaseRecord)
register_record(12, PgmParamRecord)
register_record(13, PgmReleaseRecord)
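# Illustrative decoding sketch (hand-built bytes, hypothetical values): every
# on-disk record is RECORD_SIZE bytes and starts with the common header
# (type, cpu, pid, job). Packing a CompletionRecord (type 7) by hand shows
# the fill() round trip that make_iterator() performs on real trace data.
def _decode_example():
    raw = struct.pack('<BBHIQ', 7, 0, 1234, 2, 15 * NSEC_PER_MSEC)
    raw += '\x00' * (RECORD_SIZE - len(raw))  # zero-fill the padding bytes
    rec = record_map[7]()
    rec.fill(raw)
    assert (rec.pid, rec.job, rec.when) == (1234, 2, 15 * NSEC_PER_MSEC)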
def create_trace_dict(data_dir, work_dir = None):
    '''Parse sched trace files into per-task and per-graph dictionaries.'''
    bin_files = conf.FILES['sched_data'].format(".*")
    output_file = "%s/out-st" % work_dir

    task_dict = defaultdict(lambda:
        TaskData(None, None, 1,
                 TimeTracker(is_valid_duration = lambda x: x > 0),  # blocks
                 TimeTracker(),                                     # response
                 TimeTracker(),                                     # lateness
                 TimeTracker(is_valid_duration = lambda x: x > 0),  # misses
                 TimeTracker(),                                     # pgm_response
                 TimeTracker(),                                     # pgm_lateness
                 TimeTracker(is_valid_duration = lambda x: x > 0))) # pgm_misses
    graph_dict = defaultdict(lambda:
        GraphData(None, 1, TimeTracker(is_valid_duration = lambda x: x > 0)))

    bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
    if not len(bin_names):
        return task_dict, graph_dict

    # Save an in-English version of the data for debugging.
    # This is optional and will only be done if 'st_show' is in PATH
    if conf.BINS['st_show']:
        cmd_arr = [conf.BINS['st_show']]
        cmd_arr.extend(bin_names)
        with open(output_file, "w") as f:
            subprocess.call(cmd_arr, cwd=data_dir, stdout=f)

    # Gather per-task values
    bin_paths = ["%s/%s" % (data_dir, f) for f in bin_names]
    read_data(task_dict, graph_dict, bin_paths)

    return task_dict, graph_dict

LOSS_MSG = """Found task missing more than %d%% of its scheduling records.
These won't be included in scheduling statistics!""" % (100 * conf.MAX_RECORD_LOSS)
SKIP_MSG = """Measurement '%s' has no non-zero values.
Measurements like these are not included in scheduling statistics.
If a measurement is missing, this is why."""
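# Illustrative usage sketch (directory names are hypothetical):
#
#   task_dict, graph_dict = create_trace_dict('run-data', '/tmp/work')
#   for pid, tdata in task_dict.iteritems():
#       if tdata.params:
#           print pid, tdata.response.max / float(NSEC_PER_MSEC)
#
# extract_sched_data() below performs this kind of aggregation for every
# tracked statistic and summarizes the values into @result.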
def extract_sched_data(result, data_dir, work_dir):
    task_dict, graph_dict = create_trace_dict(data_dir, work_dir)
    stat_data = defaultdict(list)
    gstat_data = defaultdict(list)

    # Group per-task values
    for task, tdata in task_dict.iteritems():
        if not tdata.params:
            # Currently unknown where these invalid tasks come from...
            continue

        lateness = tdata.lateness
        response = tdata.response
        miss = tdata.misses
        pgm_response = tdata.pgm_response
        pgm_lateness = tdata.pgm_lateness
        pgm_miss = tdata.pgm_misses

        record_loss = float(miss.disjoints()) / (miss.matches + miss.disjoints())
        stat_data["record-loss"].append(record_loss)

        if record_loss > conf.MAX_RECORD_LOSS:
            log_once(LOSS_MSG)
            continue

        miss_ratio = float(miss.num) / miss.matches
        pgm_miss_ratio = float(pgm_miss.num) / pgm_miss.matches

        # miss.avg only averages jobs that actually missed; scale by the
        # miss ratio to get the average tardiness across all jobs
        avg_tard = miss.avg * miss_ratio
        pgm_avg_tard = pgm_miss.avg * pgm_miss_ratio

        # Start with basic task information
        stat_data["miss-ratio"].append(miss_ratio)

        stat_data["response-max"].append(float(response.max) / NSEC_PER_MSEC)
        stat_data["response-avg"].append(response.avg / NSEC_PER_MSEC)
        stat_data["response-prop-max"].append(float(response.max) / tdata.params.period)
        stat_data["response-prop-avg"].append(response.avg / tdata.params.period)

        stat_data["tard-max"].append(float(miss.max) / NSEC_PER_MSEC)
        stat_data["tard-avg"].append(avg_tard / NSEC_PER_MSEC)
        stat_data["tard-prop-max"].append(float(miss.max) / tdata.params.period)
        stat_data["tard-prop-avg"].append(avg_tard / tdata.params.period)

        stat_data["lateness-max"].append(float(lateness.max) / NSEC_PER_MSEC)
        stat_data["lateness-avg"].append(lateness.avg / NSEC_PER_MSEC)
        stat_data["lateness-prop-max"].append(float(lateness.max) / tdata.params.period)
        stat_data["lateness-prop-avg"].append(lateness.avg / tdata.params.period)

        # Same data, but with PGM-adjusted release times (shifted deadlines)
        stat_data["pgm-miss-ratio"].append(pgm_miss_ratio)

        stat_data["pgm-response-max"].append(float(pgm_response.max) / NSEC_PER_MSEC)
        stat_data["pgm-response-avg"].append(pgm_response.avg / NSEC_PER_MSEC)
        stat_data["pgm-response-prop-max"].append(float(pgm_response.max) / tdata.params.period)
        stat_data["pgm-response-prop-avg"].append(pgm_response.avg / tdata.params.period)

        stat_data["pgm-tard-max"].append(float(pgm_miss.max) / NSEC_PER_MSEC)
        stat_data["pgm-tard-avg"].append(pgm_avg_tard / NSEC_PER_MSEC)
        stat_data["pgm-tard-prop-max"].append(float(pgm_miss.max) / tdata.params.period)
        stat_data["pgm-tard-prop-avg"].append(pgm_avg_tard / tdata.params.period)

        stat_data["pgm-lateness-max"].append(float(pgm_lateness.max) / NSEC_PER_MSEC)
        stat_data["pgm-lateness-avg"].append(pgm_lateness.avg / NSEC_PER_MSEC)
        stat_data["pgm-lateness-prop-max"].append(float(pgm_lateness.max) / tdata.params.period)
        stat_data["pgm-lateness-prop-avg"].append(pgm_lateness.avg / tdata.params.period)

    # Group per-graph values
    for gid, gdata in graph_dict.iteritems():
        if not gdata.params:
            continue

        response = gdata.response

        if response.matches + response.disjoints() == 0:
            record_loss = 0
        else:
            record_loss = float(response.disjoints()) / (response.matches + response.disjoints())
        gstat_data["graph-record-loss"].append(record_loss)

        if record_loss > conf.MAX_RECORD_LOSS:
            log_once(LOSS_MSG)
            continue

        gstat_data["graph-response-max"].append(float(response.max) / NSEC_PER_MSEC)
        gstat_data["graph-response-avg"].append(response.avg / NSEC_PER_MSEC)

    # Summarize value groups
    for name, data in stat_data.iteritems():
        if not data:
            log_once(SKIP_MSG, SKIP_MSG % name)
            continue
        result[name] = Measurement(str(name)).from_array(data)

    for name, data in gstat_data.iteritems():
        result[name] = Measurement(str(name)).from_array(data)
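# Worked example of the tardiness scaling in extract_sched_data (illustrative
# numbers): if 4 of 10 matched jobs completed after their deadlines
# (miss_ratio = 0.4) and those tardy jobs were late by 10 ms on average
# (miss.avg), then "tard-avg" reports the per-job average over all jobs:
# 0.4 * 10 ms = 4 ms.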