From 44a8ade3ed5dc4810fd95c41dbe8ec3aa2fb0cf7 Mon Sep 17 00:00:00 2001
From: Gary Bressler
Date: Mon, 1 Mar 2010 23:46:44 -0500
Subject: Reorganized tree, along with the visualizer

---
 reader/__init__.py           |   4 +
 reader/gedf_test.py          | 163 ++++++++++++++++++++++++++++
 reader/naive_trace_reader.py | 165 +++++++++++++++++++++++++++++
 reader/runtests.py           |  47 +++++++++
 reader/sample_script.py      |  41 ++++++++
 reader/sample_script.py~     |  41 ++++++++
 reader/sanitizer.py          |  53 ++++++++++
 reader/stats.py              |  39 +++++++
 reader/stdout_printer.py     |  69 ++++++++++++
 reader/test.py               |  15 +++
 reader/trace_reader.py       | 245 +++++++++++++++++++++++++++++++++++++++++++
 11 files changed, 882 insertions(+)
 create mode 100644 reader/__init__.py
 create mode 100644 reader/gedf_test.py
 create mode 100644 reader/naive_trace_reader.py
 create mode 100755 reader/runtests.py
 create mode 100755 reader/sample_script.py
 create mode 100644 reader/sample_script.py~
 create mode 100644 reader/sanitizer.py
 create mode 100644 reader/stats.py
 create mode 100644 reader/stdout_printer.py
 create mode 100755 reader/test.py
 create mode 100644 reader/trace_reader.py

(limited to 'reader')

diff --git a/reader/__init__.py b/reader/__init__.py
new file mode 100644
index 0000000..afbfe44
--- /dev/null
+++ b/reader/__init__.py
@@ -0,0 +1,4 @@
+import trace_reader
+import gedf_test
+import sanitizer
+import stats
diff --git a/reader/gedf_test.py b/reader/gedf_test.py
new file mode 100644
index 0000000..e31fb19
--- /dev/null
+++ b/reader/gedf_test.py
@@ -0,0 +1,163 @@
+###############################################################################
+# Description
+###############################################################################
+
+# G-EDF Test
+
+###############################################################################
+# Imports
+###############################################################################
+
+import copy
+
+
+##############################################################################
+# Public Functions
+##############################################################################
+
+def gedf_test(stream):
+
+    # Two lists to model the system: tasks occupying a CPU and tasks eligible
+    # to do so. Also, m = the number of CPUs.
+    eligible = []
+    on_cpu = []
+    m = None
+
+    # Time of the last record we saw. Only run the G-EDF test when the time
+    # is updated.
+    last_time = None
+
+    for record in stream:
+        if record.record_type != "event":
+            if record.record_type == "meta" and record.type_name == "num_cpus":
+                m = record.num_cpus
+            continue
+
+        # Check for inversion starts and ends and yield them.
+        # Only do the check when time has moved forward.
+        # (It is common to have records with simultaneous timestamps.)
+        if last_time is not None and last_time != record.when:
+            errors = _gedf_check(eligible,on_cpu,record.when,m)
+            for error in errors:
+                yield error
+
+        # Add a newly-released Job to the eligible queue
+        if record.type_name == 'release':
+            eligible.append(Job(record))
+
+        # Move a Job from the eligible queue to on_cpu
+        elif record.type_name == 'switch_to':
+            pos = _find_job(record,eligible)
+            job = eligible[pos]
+            del eligible[pos]
+            on_cpu.append(job)
+
+        # Mark a Job as completed.
+        # The only time a Job completes when it is not on a
+        # CPU is when it is the last job of the task.
+        elif record.type_name == 'completion':
+            pos = _find_job(record,on_cpu)
+            if pos is not None:
+                on_cpu[pos].is_complete = True
+            else:
+                pos = _find_job(record,eligible)
+                del eligible[pos]
+
+        # A job is switched away from a CPU. If it has
+        # been marked as complete, remove it from the model.
+        elif record.type_name == 'switch_away':
+            pos = _find_job(record,on_cpu)
+            job = on_cpu[pos]
+            del on_cpu[pos]
+            if job.is_complete == False:
+                eligible.append(job)
+
+        last_time = record.when
+        yield record
+
+###############################################################################
+# Private Functions
+###############################################################################
+
+# Internal representation of a Job
+class Job(object):
+    def __init__(self, record):
+        self.pid = record.pid
+        self.job = record.job
+        self.deadline = record.deadline
+        self.is_complete = False
+        self.inversion_start = None
+        self.inversion_end = None
+    def __str__(self):
+        return "(%d.%d:%d)" % (self.pid,self.job,self.deadline)
+
+# G-EDF errors: the start or end of an inversion
+class Error(object):
+    def __init__(self, job, eligible, on_cpu):
+        self.job = copy.copy(job)
+        self.eligible = copy.copy(eligible)
+        self.on_cpu = copy.copy(on_cpu)
+        self.record_type = 'error'
+        if job.inversion_end is None:
+            self.type_name = 'inversion_start'
+        else:
+            self.type_name = 'inversion_end'
+
+# Returns the position of a Job in a list, or None
+def _find_job(record,list):
+    for i in range(0,len(list)):
+        if list[i].pid == record.pid and list[i].job == record.job:
+            return i
+    return None
+
+# Return records for any inversion_starts and inversion_ends
+def _gedf_check(eligible,on_cpu,when,m):
+
+    # List of error records to be returned
+    errors = []
+
+    # List of all jobs that are not complete
+    all = []
+    for x in on_cpu:
+        if x.is_complete is not True:
+            all.append(x)
+    all += eligible
+
+    # Sort by on_cpu and then by deadline. sort() is guaranteed to be stable.
+    # Thus, this gives us jobs ordered by deadline with preference to those
+    # actually running.
+    all.sort(key=lambda x: 0 if (x in on_cpu) else 1)
+    all.sort(key=lambda x: x.deadline)
+
+    # Check those that actually should be running
+    for x in range(0,min(m,len(all))):
+        job = all[x]
+
+        # It's not running and an inversion_start has not been recorded
+        if job not in on_cpu and job.inversion_start is None:
+            job.inversion_start = when
+            errors.append(Error(job, eligible, on_cpu))
+
+        # It is running and an inversion_start exists (i.e. it is still
+        # marked as being inverted)
+        elif job in on_cpu and job.inversion_start is not None:
+            job.inversion_end = when
+            errors.append(Error(job, eligible, on_cpu))
+            job.inversion_start = None
+            job.inversion_end = None
+
+    # Check those that actually should not be running
+    for x in range(m,len(all)):
+        job = all[x]
+
+        # It actually is running. We don't care.
+
+        # It isn't running, but an inversion_start exists (i.e. it is still
+        # marked as being inverted)
+        if job not in on_cpu and job.inversion_start is not None:
+            job.inversion_end = when
+            errors.append(Error(job, eligible, on_cpu))
+            job.inversion_start = None
+            job.inversion_end = None
+
+    return errors
diff --git a/reader/naive_trace_reader.py b/reader/naive_trace_reader.py
new file mode 100644
index 0000000..0f117b8
--- /dev/null
+++ b/reader/naive_trace_reader.py
@@ -0,0 +1,165 @@
+###############################################################################
+# Description
+###############################################################################
+
+# trace_reader(files) returns an iterator which produces records
+# OUT OF ORDER from the files given. (The param is a list of files.)
+#
+# The non-naive trace_reader has a lot of complex logic which attempts to
+# produce records in order (even though they are being pulled from multiple
+# files which themselves are only approximately ordered). This trace_reader
+# attempts to be as simple as possible and is used in the unit tests to
+# make sure the total number of records read by the normal trace_reader is
+# the same as the number of records read by this one.
+
+###############################################################################
+# Imports
+###############################################################################
+
+import struct
+
+
+###############################################################################
+# Public functions
+###############################################################################
+
+# Generator function returning an iterable over records in a trace file.
+def trace_reader(files):
+    for file in files:
+        f = open(file,'rb')
+        while True:
+            data = f.read(RECORD_HEAD_SIZE)
+            try:
+                type_num = struct.unpack_from('b',data)[0]
+            except struct.error:
+                break # We read to the end of the file
+            type = _get_type(type_num)
+            try:
+                values = struct.unpack_from(StHeader.format +
+                    type.format,data)
+                record_dict = dict(zip(type.keys,values))
+            except struct.error:
+                f.close()
+                print "Invalid record detected, stopping."
+                exit()
+
+            # Convert the record_dict into an object
+            record = _dict2obj(record_dict)
+
+            # Give it a type name (easier to work with than type number)
+            record.type_name = _get_type_name(type_num)
+
+            # All records should have a 'record type' field.
+            # e.g. these are 'event's as opposed to 'error's
+            record.record_type = "event"
+
+            # If there is no timestamp, set the time to 0
+            if 'when' not in record.__dict__.keys():
+                record.when = 0
+
+            yield record
+
+###############################################################################
+# Private functions
+###############################################################################
+
+# Convert a dict into an object
+def _dict2obj(d):
+    class Obj: pass
+    o = Obj()
+    for key in d.keys():
+        o.__dict__[key] = d[key]
+    return o
+
+###############################################################################
+# Trace record data types and accessor functions
+###############################################################################
+
+# Each class below represents a type of event record. The format attribute
+# specifies how to decode the binary record and the keys attribute
+# specifies how to name the pieces of information decoded. Note that all
+# event records have a common initial 24 bytes, represented by the StHeader
+# class.
+
+RECORD_HEAD_SIZE = 24

+class StHeader(object):
+    format = '
diff --git a/reader/sample_script.py b/reader/sample_script.py
+
+# Import the modules we need. You should not need to know about
+# their internals.
+import trace_reader
+import sanitizer
+import gedf_test
+import stats
+import stdout_printer
+
+# Specify your trace files
+g6 = [
+'../sample_traces/st-g6-0.bin',
+'../sample_traces/st-g6-1.bin',
+'../sample_traces/st-g6-2.bin',
+'../sample_traces/st-g6-3.bin',
+]
+
+# Here is an example of a custom filter function.
+# It will remove from the error stream all inversion_end records indicating
+# an inversion of less than 4000000 time units. Thus, you can grep through
+# the output looking for 'Inversion end' and find only errors for particularly
+# long inversions. This is commented out in the pipeline (below) since you
+# probably don't want it in general.
+def my_filter(record):
+    if record.record_type == 'error' and record.type_name == 'inversion_end':
+        if record.job.inversion_end - record.job.inversion_start < 4000000:
+            return False
+    return True
+
+# Pipeline
+stream = trace_reader.trace_reader(g6) # Read events from traces
+stream = sanitizer.sanitizer(stream)   # Remove garbage events
+stream = gedf_test.gedf_test(stream)   # Produce G-EDF error records
+stream = stats.stats(stream)           # Produce a statistics record
+#stream = filter(my_filter, stream)    # Filter some records before printing
+stdout_printer.stdout_printer(stream)  # Print records to stdout
diff --git a/reader/sample_script.py~ b/reader/sample_script.py~
new file mode 100644
index 0000000..c3b7843
--- /dev/null
+++ b/reader/sample_script.py~
@@ -0,0 +1,41 @@
+#!/usr/bin/python
+
+# This is a sample script for using the tool. I would recommend copying
+# this and modifying it to suit your needs for a particular test. Make
+# sure you redirect the output to a file (e.g. ./sample_script.py > output).
+
+# Import the modules we need. You should not need to know about
+# their internals.
+import trace_reader
+import sanitizer
+import gedf_test
+import stats
+import stdout_printer
+
+# Specify your trace files
+g6 = [
+'./sample_traces/st-g6-0.bin',
+'./sample_traces/st-g6-1.bin',
+'./sample_traces/st-g6-2.bin',
+'./sample_traces/st-g6-3.bin',
+]
+
+# Here is an example of a custom filter function.
+# It will remove from the error stream all inversion_end records indicating
+# an inversion of less than 4000000 time units. Thus, you can grep through
+# the output looking 'Inversion end' and find only errors for particularly
+# long inversions. This is commented out in the pipeline (below) since you
+# probably don't want it in general.
+def my_filter(record):
+    if record.record_type == 'error' and record.type_name == 'inversion_end':
+        if record.job.inversion_end - record.job.inversion_start < 4000000:
+            return False
+    return True
+
+# Pipeline
+stream = trace_reader.trace_reader(g6) # Read events from traces
+stream = sanitizer.sanitizer(stream)   # Remove garbage events
+stream = gedf_test.gedf_test(stream)   # Produce G-EDF error records
+stream = stats.stats(stream)           # Produce a statistics record
+#stream = filter(my_filter, stream)    # Filter some records before printing
+stdout_printer.stdout_printer(stream)  # Print records to stdout
diff --git a/reader/sanitizer.py b/reader/sanitizer.py
new file mode 100644
index 0000000..79315cc
--- /dev/null
+++ b/reader/sanitizer.py
@@ -0,0 +1,53 @@
+###############################################################################
+# Description
+###############################################################################
+
+# Sanitize input. (There are a number of goofy issues with the sched_trace
+# output.)
+
+###############################################################################
+# Public functions
+###############################################################################
+
+def sanitizer(stream):
+
+    job_2s_released = [] # list of tasks which have released their job 2s
+    jobs_switched_to = []
+
+    for record in stream:
+
+        # Ignore records which are not events (e.g. the num_cpus record)
+        if record.record_type != 'event':
+            yield record
+            continue
+
+        # All records with job < 2 are garbage
+        if record.job < 2:
+            continue
+
+        # Some records with job == 2 are garbage
+        if record.job==2:
+
+            # There is a duplicate release of every job 2
+            # This will throw away the second one
+            if record.type_name == 'release':
+                if record.pid in job_2s_released:
+                    continue
+                else:
+                    job_2s_released.append(record.pid)
+
+            # Job 2 has a resume that is garbage
+            if record.type_name == 'resume':
+                continue
+
+        # By default, the switch_away for a job (after it has completed)
+        # is marked as being for job+1, which has never been switched to.
+        # We can correct this if we note which jobs really
+        # have been switched to.
+        if record.type_name == 'switch_to':
+            jobs_switched_to.append((record.pid,record.job))
+        if record.type_name == 'switch_away':
+            if (record.pid,record.job) not in jobs_switched_to:
+                record.job -= 1
+
+        yield record
diff --git a/reader/stats.py b/reader/stats.py
new file mode 100644
index 0000000..34a842f
--- /dev/null
+++ b/reader/stats.py
@@ -0,0 +1,39 @@
+###############################################################################
+# Description
+###############################################################################
+# Compute and produce statistics
+
+
+###############################################################################
+# Public Functions
+###############################################################################
+
+def stats(stream):
+    min_inversion = -1
+    max_inversion = -1
+    sum_inversions = 0
+    num_inversions = 0
+    for record in stream:
+        if record.type_name == 'inversion_end':
+            length = record.job.inversion_end - record.job.inversion_start
+            if length > 0:
+                num_inversions += 1
+                if length > max_inversion:
+                    max_inversion = length
+                if length < min_inversion or min_inversion == -1:
+                    min_inversion = length
+                sum_inversions += length
+        yield record
+    if num_inversions > 0:
+        avg_inversion = int(sum_inversions / num_inversions)
+    else:
+        avg_inversion = 0
+    class Obj(object): pass
+    rec = Obj()
+    rec.record_type = "meta"
+    rec.type_name = "stats"
+    rec.num_inversions = num_inversions
+    rec.min_inversion = min_inversion
+    rec.max_inversion = max_inversion
+    rec.avg_inversion = avg_inversion
+    yield rec
diff --git a/reader/stdout_printer.py b/reader/stdout_printer.py
new file mode 100644
index 0000000..f8d9a84
--- /dev/null
+++ b/reader/stdout_printer.py
@@ -0,0 +1,69 @@
+###############################################################################
+# Description
+###############################################################################
+
+# Prints records to standard out
+
+###############################################################################
+# Public functions
+###############################################################################
+
+def stdout_printer(stream):
+    for record in stream:
+        if record.record_type == "event":
+            _print_event(record)
+        elif record.record_type == "meta" and record.type_name == "stats":
+            _print_stats(record)
+        elif record.record_type == "error" and record.type_name == 'inversion_start':
+            _print_inversion_start(record)
+        elif record.record_type == "error" and record.type_name == 'inversion_end':
+            _print_inversion_end(record)
+        else:
+            continue
+        print ""
+
+###############################################################################
+# Private functions
+###############################################################################
+
+def _print_event(record):
+    print "Job: %d.%d" % (record.pid,record.job)
+    print "Type: %s" % (record.type_name)
+    print "Time: %d" % (record.when)
+
+def _print_inversion_start(record):
+    print "Type: %s" % ("Inversion start")
+    print "Time: %d" % (record.job.inversion_start)
+    print "Job: %d.%d" % (record.job.pid,record.job.job)
+    print "Deadline: %d" % (record.job.deadline)
+    print "Eligible: ",
+    for job in record.eligible:
+        print str(job) + " ",
+    print
+    print "On CPU: ",
+    for job in record.on_cpu:
+        print str(job) + " ",
+    print #newline
+
+def _print_inversion_end(record):
+    print "Type: %s" % ("Inversion end")
+    print "Time: %d" % (record.job.inversion_end)
+    print "Duration: %d" % (
+        record.job.inversion_end - record.job.inversion_start)
+    print "Job: %d.%d" % (record.job.pid,record.job.job)
+    print "Deadline: %d" % (record.job.deadline)
+    print "Eligible: ",
+    for job in record.eligible:
+        print str(job) + " ",
+    print
+    print "On CPU: ",
+    for job in record.on_cpu:
+        print str(job) + " ",
+    print #newline
+
+def _print_stats(record):
+    print "Inversion statistics"
+    print "Num inversions: %d" % (record.num_inversions)
+    print "Min inversion: %d" % (record.min_inversion)
+    print "Max inversion: %d" % (record.max_inversion)
+    print "Avg inversion: %d" % (record.avg_inversion)
diff --git a/reader/test.py b/reader/test.py
new file mode 100755
index 0000000..b260314
--- /dev/null
+++ b/reader/test.py
@@ -0,0 +1,15 @@
+#!/usr/bin/python
+
+import cairo
+
+if __name__ == '__main__':
+    surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, 500, 500)
+    ctx = cairo.Context(surface)
+    ctx.move_to(10, 10)
+    ctx.line_to(-100, 10)
+    ctx.set_line_width(2)
+
+    ctx.move_to(10, 10)
+    ctx.line_to(20, 10)
+    ctx.stroke()
+    surface.write_to_png('test.png')
diff --git a/reader/trace_reader.py b/reader/trace_reader.py
new file mode 100644
index 0000000..a4ff964
--- /dev/null
+++ b/reader/trace_reader.py
@@ -0,0 +1,245 @@
+###############################################################################
+# Description
+###############################################################################
+
+# trace_reader(files) returns an iterator which produces records
+# in order from the files given. (The param is a list of files.)
+#
+# Each record is just a Python object. It is guaranteed to have the following
+# attributes:
+# - 'pid': pid of the task
+# - 'job': job number for that task
+# - 'cpu', given by LITMUS
+# - 'when', given by LITMUS as a timestamp. LITMUS does not provide a
+#   timestamp for all records. In this case, when is set to 0.
+# - 'type', a numerical value given by LITMUS
+# - 'type_name', a human-readable name defined in this module
+# - 'record_type', set to 'event' by this module (to distinguish from, e.g.,
+#   error records produced elsewhere).
+# - Possible additional attributes, depending on the type of record.
+#
+# To find out exactly what attributes are set for each record type, look at
+# the trace-parsing information at the bottom of this file.
+
+###############################################################################
+# Imports
+###############################################################################
+
+import struct
+
+
+###############################################################################
+# Public functions
+###############################################################################
+
+# Generator function returning an iterable over records in a trace file.
+def trace_reader(files):
+
+    # Yield a record indicating the number of CPUs, used by the G-EDF test
+    class Obj: pass
+    record = Obj()
+    record.record_type = "meta"
+    record.type_name = "num_cpus"
+    record.num_cpus = len(files)
+    yield record
+
+    # Create iterators for each file and a buffer to store records in
+    file_iters = []     # file iterators
+    file_iter_buff = [] # file iterator buffers
+    for file in files:
+        file_iter = _get_file_iter(file)
+        file_iters.append(file_iter)
+        file_iter_buff.append([file_iter.next()])
+
+    # We keep 100 records in each buffer and then keep the buffer sorted.
+    # This is because records may have been recorded slightly out of order.
+    # This cannot guarantee records are produced in order, but it makes it
+    # overwhelmingly probable.
+    for x in range(0,len(file_iter_buff)):
+        for y in range(0,100):
+            file_iter_buff[x].append(file_iters[x].next())
+    for x in range(0,len(file_iter_buff)):
+        file_iter_buff[x] = sorted(file_iter_buff[x],key=lambda rec: rec.when)
+
+    # Remember the time of the last record. This way, we can make sure records
+    # truly are produced in monotonically increasing order by time and terminate
+    # fatally if they are not.
+    last_time = None
+
+    # Keep pulling records as long as we have a buffer
+    while len(file_iter_buff) > 0:
+
+        # Select the earliest record from those at the heads of the buffers
+        earliest = -1
+        buff_to_refill = -1
+        for x in range(0,len(file_iter_buff)):
+            if earliest==-1 or file_iter_buff[x][0].when < earliest.when:
+                earliest = file_iter_buff[x][0]
+                buff_to_refill = x
+
+        # Take it out of the buffer
+        del file_iter_buff[buff_to_refill][0]
+
+        # Try to append a new record to the buffer (if there is another) and
+        # then keep the buffer sorted
+        try:
+            file_iter_buff[buff_to_refill].append(file_iters[buff_to_refill].next())
+            file_iter_buff[buff_to_refill] = sorted(file_iter_buff[buff_to_refill],
+                key=lambda rec: rec.when)
+
+        # If there aren't any more records, fine. Unless the buffer is also empty.
+        # If that is the case, delete the buffer.
+        except StopIteration:
+            if len(file_iter_buff[buff_to_refill]) < 1:
+                del file_iter_buff[buff_to_refill]
+                del file_iters[buff_to_refill]
+
+        # Check for monotonically increasing time
+        if last_time is not None and earliest.when < last_time:
+            exit("FATAL ERROR: trace_reader.py: out-of-order record produced")
+        else:
+            last_time = earliest.when
+
+        # Yield the record
+        yield earliest
+
+###############################################################################
+# Private functions
+###############################################################################
+
+# Returns an iterator to pull records from a file
+def _get_file_iter(file):
+    f = open(file,'rb')
+    while True:
+        data = f.read(RECORD_HEAD_SIZE)
+        try:
+            type_num = struct.unpack_from('b',data)[0]
+        except struct.error:
+            break # We read to the end of the file
+        type = _get_type(type_num)
+        try:
+            values = struct.unpack_from(StHeader.format +
+                type.format,data)
+            record_dict = dict(zip(type.keys,values))
+        except struct.error:
+            f.close()
+            print "Invalid record detected, stopping."
+            exit()
+
+        # Convert the record_dict into an object
+        record = _dict2obj(record_dict)
+
+        # Give it a type name (easier to work with than type number)
+        record.type_name = _get_type_name(type_num)
+
+        # All records should have a 'record type' field.
+        # e.g. these are 'event's as opposed to 'error's
+        record.record_type = "event"
+
+        # If there is no timestamp, set the time to 0
+        if 'when' not in record.__dict__.keys():
+            record.when = 0
+
+        yield record
+
+# Convert a dict into an object
+def _dict2obj(d):
+    class Obj(object): pass
+    o = Obj()
+    for key in d.keys():
+        o.__dict__[key] = d[key]
+    return o
+
+###############################################################################
+# Trace record data types and accessor functions
+###############################################################################
+
+# Each class below represents a type of event record. The format attribute
+# specifies how to decode the binary record and the keys attribute
+# specifies how to name the pieces of information decoded. Note that all
+# event records have a common initial 24 bytes, represented by the StHeader
+# class.
+
+RECORD_HEAD_SIZE = 24
+
+class StHeader:
+    format = '
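Side note on the merge logic in trace_reader() above: the per-CPU traces are only approximately ordered, which is why the patch keeps a 100-record sort buffer per file before picking the earliest head. If each per-file stream were already strictly sorted by 'when', the same merge could be expressed with Python's heapq.merge. The sketch below is a hypothetical illustration of that simpler case only; merged_records and its argument are made-up names and are not part of this patch.

    # Hypothetical sketch, not part of the patch: k-way merge of per-file
    # record iterators by their 'when' timestamp, assuming each iterator is
    # already sorted (the real trace_reader() cannot assume this, so it
    # buffers and re-sorts instead).
    import heapq
    import itertools

    def merged_records(file_iters):
        counter = itertools.count()  # tie-breaker for records with equal timestamps
        keyed = [((rec.when, next(counter), rec) for rec in it) for it in file_iters]
        for _when, _idx, rec in heapq.merge(*keyed):
            yield rec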