############################################################################### # Description ############################################################################### # trace_reader(files) returns an iterator which produces records # in order from the files given. (the param is a list of files.) # # Each record is just a Python object. It is guaranteed to have the following # attributes: # - 'pid': pid of the task # - 'job': job number for that task # - 'cpu', given by LITMUS # - 'when', given by LITMUS as a timestamp. LITMUS does not provide a # timestamp for all records. In this case, when is set to 0. # - 'type', a numerical value given by LITMUS # - 'type_name', a human-readable name defined in this module # - 'record_type', set to 'event' by this module (to distinguish from, e.g., # error records produced elsewhere). # - Possible additional attributes, depending on the type of record. # # To find out exactly what attributes are set for each record type, look at # the trace-parsing information at the bottom of this file. ############################################################################### # Imports ############################################################################### import struct import sys import mmap ############################################################################### # Public functions ############################################################################### # Generator function returning an iterable over records in a trace file. def trace_reader(files): # Yield a record containing the input files # This is used by progress.py to calculate progress record = dict(record_type='meta', type_name='trace_files', files=files) yield record # Yield a record indicating the number of CPUs, used by the G-EDF test record = dict(record_type='meta', type_name='num_cpus', num_cpus=len(files)) yield record # Create iterators for each file and a buffer to store records in file_iters = [] # file iterators file_iter_buff = [] # file iterator buffers for file in files: file_iter = _get_file_iter(file) file_iters.append(file_iter) try: file_iter_buff.append([file_iter.next()]) # What if there isn't a single valid record in a trace file? # file_iter.next() will raise a StopIteration that we need to catch except: # Forget that file iter file_iters.pop() # We keep 200 records in each buffer and then keep the buffer sorted # This is because records may have been recorded slightly out of order # This cannot guarantee records are produced in order, but it makes it # overwhelmingly probably. # The 'try' and 'except' catches the case where there are less than 200 # records in a file (throwing a StopIteration) which otherwise would # propogate up and cause the trace_reader generator itself to throw a # StopIteration. for x in range(0,len(file_iter_buff)): try: for y in range(0,200): file_iter_buff[x].append(file_iters[x].next()) except StopIteration: pass # for x in range(0,len(file_iter_buff)): # file_iter_buff[x] = sorted(file_iter_buff[x],key=lambda rec: rec['when']) # Remember the time of the last record. This way, we can make sure records # truly are produced in monotonically increasing order by time and terminate # fatally if they are not. last_time = None # We want to give records ID numbers so users can filter by ID id = 0 # Keep pulling records as long as we have a buffer while len(file_iter_buff) > 0: # Select the earliest record from those at the heads of the buffers earliest = -1 buff_to_refill = -1 for x in range(0,len(file_iter_buff)): if earliest==-1 or file_iter_buff[x][0]['when'] < earliest['when']: earliest = file_iter_buff[x][0] buff_to_refill = x # Take it out of the buffer del file_iter_buff[buff_to_refill][0] # Try to append a new record to the buffer (if there is another) and # then keep the buffer sorted try: file_iter_buff[buff_to_refill].append(file_iters[buff_to_refill].next()) # file_iter_buff[buff_to_refill] = sorted(file_iter_buff[buff_to_refill], # key=lambda rec: rec['when']) # If there aren't any more records, fine. Unless the buffer is also empty. # If that is the case, delete the buffer. except StopIteration: if len(file_iter_buff[buff_to_refill]) < 1: del file_iter_buff[buff_to_refill] del file_iters[buff_to_refill] # Check for monotonically increasing time if last_time is not None and earliest['when'] < last_time: # exit("FATAL ERROR: trace_reader.py: out-of-order record produced") # else: last_time = earliest['when'] # Give the record an id number id += 1 earliest['id'] = id # Yield the record yield earliest ############################################################################### # Private functions ############################################################################### import traceback # Returns an iterator to pull records from a file def _get_file_iter(file): f = open(file,'r+b') f = mmap.mmap(f.fileno(), 0) while True: data = f.read(RECORD_HEAD_SIZE) try: type_num = struct.unpack_from('b',data)[0] except struct.error: break #We read to the end of the file try: type = _get_type(type_num) except: sys.stderr.write("Skipping record with invalid type num: %d\n" % (type_num)) continue try: values = struct.unpack_from(StHeader.format + type.format,data) record = dict(zip(type.keys,values)) except: f.close() sys.stderr.write("Skipping record that does not match proper" + " struct formatting\n") continue # Give it a type name (easier to work with than type number) record['type_name'] = _get_type_name(type_num) # All records should have a 'record type' field. # e.g. these are 'event's as opposed to 'error's record['record_type'] = "event" # If there is no timestamp, set the time to 0 if 'when' not in record.keys(): record['when'] = 0 yield record # Convert a dict into an object def _dict2obj(d): class Obj(object): pass o = Obj() for key in d.keys(): o.__dict__[key] = d[key] return o ############################################################################### # Trace record data types and accessor functions ############################################################################### # Each class below represents a type of event record. The format attribute # specifies how to decode the binary record and the keys attribute # specifies how to name the pieces of information decoded. Note that all # event records have a common initial 24 bytes, represented by the StHeader # class. RECORD_HEAD_SIZE = 24 class StHeader: format = ' len(types)-1 or type_num < 1: raise Exception return types[type_num] # Return the type name, given the type_num (this is simply a convenience to # programmers of other modules) def _get_type_name(type_num): type_names = [None,"name","params","release","assign","switch_to", "switch_away","completion","block","resume","sys_release"] return type_names[type_num]