###############################################################################
# Description
###############################################################################

# G-EDF Test

###############################################################################
# Imports
###############################################################################

import copy
import sys

###############################################################################
# Public Functions
###############################################################################

def gedf_test(stream):
    """Replay a trace and interleave priority-inversion records with it.

    A first loop over ``stream`` stops at the ``meta`` record whose
    ``type_name`` is ``"num_cpus"`` and takes the CPU count ``m`` from it
    (``m`` stays ``None`` if no such record is seen).  A second loop over the
    same ``stream`` object then processes every record whose ``record_type``
    is ``"event"``, maintaining a model of which jobs, tasklets and work
    items are on or off a CPU.

    Whenever the timestamp (``record.when``) moves forward, the model as it
    stood at the previous timestamp is checked and any resulting ``Error``
    records (inversion starts and ends) are yielded before the event that
    advanced the clock.  Every event record is yielded after it is applied.

    An event that refers to a job, tasklet or work item that is not in the
    expected queue writes a message to stderr and terminates via
    ``sys.exit()``.
    """

    m = None # CPUs

    # System model
    on_cpu = []           # Tasks on CPU
    off_cpu = []          # Tasks not on CPU
    tasklet_off_cpu = []  # Tasklets not on CPU
    tasklet_on_cpu = []   # Tasklets on CPU
    work_off_cpu = []     # Work items not on CPU
    work_on_cpu = []      # Work items on CPU

    for record in stream:
        if record.record_type == "meta" and record.type_name == "num_cpus":
            m = record.num_cpus
            break

    # Time of the last record we saw. Only run the G-EDF test when the time
    # is updated.
    last_time = None

    # First event for the latest timestamp. This is used to match up
    # inversion starts and ends with the first event from the previous
    # timestamp, which is the first event that could have triggered
    # the inversion start or end.
    first_event_this_timestamp = 0

    for record in stream:
        if record.record_type != "event":
            continue

        # Bookkeeping iff the timestamp has moved forward.
        # Check for inversion starts and ends and yield them.
        # (It is common to have records with simultaneous timestamps,
        # so we only check when the time has moved forward)
        # Also, need to update the first_event_this_timestamp variable
        if last_time is not None and last_time != record.when:
            for error in _cedf_check_tasklet(off_cpu, on_cpu,
                    tasklet_off_cpu, tasklet_on_cpu, last_time, m,
                    first_event_this_timestamp):
                yield error
            for error in _cedf_check_work(off_cpu, on_cpu,
                    work_off_cpu, work_on_cpu, last_time, m,
                    first_event_this_timestamp):
                yield error
            for error in _cedf_check(off_cpu, on_cpu, last_time, m,
                    first_event_this_timestamp):
                yield error
            first_event_this_timestamp = record.id

        kind = record.type_name

        # Add a newly-released Job to the off_cpu queue
        if kind == 'release':
            job = Job(record)
            job.deadline = record.deadline
            job.inh_deadline = record.deadline
            off_cpu.append(job)

        # Move a Job from the off_cpu queue to on_cpu
        elif kind == 'switch_to':
            job = _take_job(record, off_cpu)
            if job is None:
                _fatal("Event %d tried to switch to a job that was not on the"
                       + " off_cpu queue\n", record)
            on_cpu.append(job)

        # Mark a Job as completed.
        # The only time a Job completes when it is not on a
        # CPU is when it is the last job of the task.
        elif kind == 'completion':
            pos = _find_job(record, on_cpu)
            if pos is not None:
                on_cpu[pos].is_complete = True
            else:
                # NOTE(review): a completion for a job in neither queue
                # raises TypeError here (pos is None).
                pos = _find_job(record, off_cpu)
                del off_cpu[pos]

        # A job is switched away from a CPU. If it has
        # been marked as complete, remove it from the model.
        elif kind == 'switch_away':
            job = _take_job(record, on_cpu)
            if job is None:
                _fatal("Event %d tried to switch to switch away a job"
                       + " that was not running\n", record)
            if not job.is_complete:
                off_cpu.append(job)

        # A job has been blocked.
        elif kind == 'block':
            pos = _find_job(record, on_cpu)
            # What if the job is blocked AFTER being switched away?
            # This is a bug in some versions of LITMUS.
            if pos is None:
                pos = _find_job(record, off_cpu)
                job = off_cpu[pos]
            else:
                job = on_cpu[pos]
            job.is_blocked = True

        # A job is resumed
        elif kind == 'resume':
            pos = _find_job(record, off_cpu)
            off_cpu[pos].is_blocked = False

        # Add a newly-released Tasklet to the tasklet_off_cpu queue
        elif kind == 'tasklet_release':
            tasklet_off_cpu.append(Job(record))

        # Move a Tasklet from the tasklet_off_cpu queue to tasklet_on_cpu
        elif kind == 'tasklet_begin':
            tasklet = _take_job(record, tasklet_off_cpu)
            if tasklet is None:
                _fatal("Event %d tried to begin to a tasklet that was not on the"
                       + " tasklet_off_cpu queue\n", record)
            tasklet_on_cpu.append(tasklet)

        # A Tasklet finished executing; drop it from the model.
        elif kind == 'tasklet_end':
            if _take_job(record, tasklet_on_cpu) is None:
                _fatal("Event %d tried to end a tasklet"
                       + " that was not running\n", record)

        # Add a newly-released Work item to the work_off_cpu queue
        elif kind == 'work_release':
            work_off_cpu.append(Job(record))

        # Register the executing thread (record.exe_pid) for a Work item and
        # move the item from work_off_cpu to work_on_cpu
        elif kind == 'work_begin':
            work = _take_job(record, work_off_cpu)
            if work is None:
                _fatal("Event %d tried to begin to a work item that was not on the"
                       + " work_off_cpu queue\n", record)
            work.exe_pid = record.exe_pid
            work_on_cpu.append(work)

        # A Work item finished executing; drop it from the model.
        elif kind == 'work_end':
            if _take_job(record, work_on_cpu) is None:
                _fatal("Event %d tried to end a work item"
                       + " that hasn't not executed\n", record)

        # A job's effective priority changed (priority inheritance).
        elif kind == 'eff_prio_change':
            # Job whose deadline is inherited (matched on record.inh_pid);
            # None means the job falls back to its own deadline.
            inh_job = None
            for queue in (off_cpu, on_cpu):
                inh_pos = _find_inh_job(record, queue)
                if inh_pos is not None:
                    inh_job = queue[inh_pos]
                    break

            # Job whose effective priority is being changed.
            target = None
            for queue in (on_cpu, off_cpu):
                pos = _find_job(record, queue)
                if pos is not None:
                    target = queue[pos]
                    break
            if target is None:
                _fatal("Event %d tried to change a jobs priority "
                       + " that cannot found\n", record)

            if inh_job is not None:
                target.inh_deadline = inh_job.deadline
            else:
                target.inh_deadline = target.deadline

        last_time = record.when
        yield record

###############################################################################
# Private Functions
###############################################################################

# Internal representation of a Job (also used for tasklets and work items)
class Job(object):
    def __init__(self, record):
        self.pid = record.pid
        self.cpu = record.cpu
        self.job = record.job
        self.deadline = None      # set by the 'release' handler
        self.inh_deadline = None  # deadline after priority inheritance
        self.exe_pid = None       # set by the 'work_begin' handler
        self.is_complete = False
        self.is_blocked = False
        self.inversion_start = None
        self.inversion_end = None
        self.inversion_start_id = None
        self.inversion_start_triggering_event_id = None

    def __str__(self):
        # NOTE: %d raises TypeError while self.deadline is still None.
        return "(%d.%d:%d)" % (self.pid,self.job,self.deadline)

# G-EDF errors: the start or end of an inversion
class Error(object):
    id = 0  # class-wide counter; each Error gets the next value

    def __init__(self, job, off_cpu, on_cpu, first_event_this_timestamp,
                 pi_type):
        """Snapshot ``job`` and both queues at an inversion start or end.

        The record is an end when ``job.inversion_end`` is set, otherwise a
        start.  ``pi_type`` is the label prefixed to ``type_name``.  On a
        start, this record's id and triggering event id are stored on the
        live ``job`` so the matching end record can refer back to them.
        """
        Error.id += 1
        self.id = Error.id
        # Shallow copies: later changes to the model must not alter the
        # snapshot (the copy of ``job`` is taken before ``job`` is updated).
        self.job = copy.copy(job)
        self.off_cpu = copy.copy(off_cpu)
        self.on_cpu = copy.copy(on_cpu)
        self.record_type = 'error'
        self.triggering_event_id = first_event_this_timestamp
        if job.inversion_end is None:
            # NOTE(review): no underscore separator here, unlike the end
            # case below.  Kept as-is because consumers of type_name are not
            # visible from this file -- confirm before normalising.
            self.type_name = pi_type+'inversion_start'
            job.inversion_start_id = self.id
            job.inversion_start_triggering_event_id = self.triggering_event_id
        else:
            self.type_name = pi_type+'_inversion_end'
            self.inversion_start_id = job.inversion_start_id
            self.inversion_start_triggering_event_id = job.inversion_start_triggering_event_id

# Write the message for ``record`` to stderr and terminate.
def _fatal(template, record):
    sys.stderr.write(template % (record.id))
    # sys.exit rather than the bare ``exit`` helper, which only exists when
    # the ``site`` module has been loaded.  Same SystemExit, same status.
    sys.exit()

# Returns the position of a Job in a list, or None
def _find_job(record, jobs):
    for pos, job in enumerate(jobs):
        if job.pid == record.pid and job.job == record.job:
            return pos
    return None

# Removes and returns the Job matching record from a list, or returns None
def _take_job(record, jobs):
    pos = _find_job(record, jobs)
    if pos is None:
        return None
    return jobs.pop(pos)

# Returns the position of the task executing a work item (the entry whose
# pid equals record.exe_pid) in a list, or None
def _find_work_carrier(record, jobs):
    for pos, job in enumerate(jobs):
        if job.pid == record.exe_pid:
            return pos
    return None

# Returns the position of the Job whose priority is inherited (the entry
# whose pid equals record.inh_pid) in a list, or None
def _find_inh_job(record, jobs):
    for pos, job in enumerate(jobs):
        if job.pid == record.inh_pid:
            return pos
    return None

# Returns all jobs contending for a CPU (neither complete nor blocked),
# ordered by deadline with preference to those actually running.
def _contending_jobs(off_cpu, on_cpu):
    running = set(id(job) for job in on_cpu)
    contending = [job for job in on_cpu
                  if not job.is_complete and not job.is_blocked]
    contending.extend(job for job in off_cpu if not job.is_blocked)
    # False sorts before True, so running jobs win deadline ties; the sort
    # is stable, so full ties keep their queue order.
    contending.sort(key=lambda job: (job.deadline, id(job) not in running))
    return contending

# Records the end of an inversion for job and clears its inversion state.
# ``report`` must be called while inversion_end is set: Error uses it to
# tell an end from a start.
def _end_inversion(job, when, report):
    job.inversion_end = when
    report(job)
    job.inversion_start = None
    job.inversion_end = None

# Starts or ends an inversion for a tasklet / work item according to the
# state of its owner task (the task Job with the same pid and job number).
def _update_item_inversion(item, contending, on_cpu, m, when, report):
    # The item and its owner are distinct Job objects, so the owner has to
    # be matched by pid/job rather than by object membership.
    owner_rank = _find_job(item, contending)
    owner_running = _find_job(item, on_cpu) is not None
    if owner_rank is not None and owner_rank < m:
        # Owner is among the m highest-priority contending jobs.
        if not owner_running:
            if item.inversion_start is None:
                item.inversion_start = when
                report(item)
        elif item.inversion_start is not None:
            _end_inversion(item, when, report)
    elif item.inversion_start is not None:
        # Owner is blocked, completed, unknown, or not among the top m.
        _end_inversion(item, when, report)

# Return records for any inversion_starts and inversion_ends
def _cedf_check(off_cpu, on_cpu, when, m, first_event_this_timestamp):
    """Check task jobs at time ``when`` and return the new Error records.

    Among the ``m`` highest-priority contending jobs, one that is not running
    starts an inversion and one that is running ends any inversion it had.
    Lower-priority jobs that are not running, and blocked jobs, end any
    inversion still recorded for them.
    """
    # List of error records to be returned
    errors = []

    def report(job):
        errors.append(Error(job, off_cpu, on_cpu,
                            first_event_this_timestamp, "Task"))

    running = set(id(job) for job in on_cpu)
    for rank, job in enumerate(_contending_jobs(off_cpu, on_cpu)):
        is_running = id(job) in running
        inverted = job.inversion_start is not None
        if rank < m:
            # Should be running: look for priority inversions
            if not is_running and not inverted:
                job.inversion_start = when
                report(job)
            elif is_running and inverted:
                _end_inversion(job, when, report)
        elif not is_running and inverted:
            # Should not be running: record the end of any inversion
            _end_inversion(job, when, report)

    # Look for priority inversions among blocked tasks and end them
    for job in on_cpu + off_cpu:
        if job.is_blocked and job.inversion_start is not None:
            _end_inversion(job, when, report)

    return errors

# Return records for any inversion_starts and inversion_ends
def _cedf_check_work(off_cpu, on_cpu,
                     work_off_cpu, work_on_cpu,
                     when, m, first_event_this_timestamp):
    """Check work items at time ``when`` and return the new Error records.

    A pending work item is judged by its owner task.  A work item that has
    begun is not inverted while the thread executing it (``exe_pid``) is on
    a CPU and unblocked; otherwise it is judged by its owner as well.
    """
    # List of error records to be returned
    errors = []

    def report(work):
        errors.append(Error(work, work_off_cpu, work_on_cpu,
                            first_event_this_timestamp, "Work_Item"))

    # The task model does not change during a check, so build this once.
    contending = _contending_jobs(off_cpu, on_cpu)

    # Work items that have not begun executing
    for work in work_off_cpu:
        _update_item_inversion(work, contending, on_cpu, m, when, report)

    # Work items handed to an executing thread
    for work in work_on_cpu:
        carrier_pos = _find_work_carrier(work, on_cpu)
        if carrier_pos is not None and not on_cpu[carrier_pos].is_blocked:
            # The executing thread is running: the item is being served, so
            # end any inversion and never start one (falling through to the
            # owner check here would make start/end records alternate).
            if work.inversion_start is not None:
                _end_inversion(work, when, report)
            continue
        _update_item_inversion(work, contending, on_cpu, m, when, report)

    return errors

# Return records for any inversion_starts and inversion_ends
def _cedf_check_tasklet(off_cpu, on_cpu,
                        tasklet_off_cpu, tasklet_on_cpu,
                        when, m, first_event_this_timestamp):
    """Check tasklets at time ``when`` and return the new Error records.

    A pending tasklet is judged by its owner task; a tasklet that is
    executing ends any inversion recorded for it.
    """
    # List of error records to be returned
    errors = []

    def report(tasklet):
        errors.append(Error(tasklet, tasklet_off_cpu, tasklet_on_cpu,
                            first_event_this_timestamp, "Tasklet"))

    # The task model does not change during a check, so build this once.
    contending = _contending_jobs(off_cpu, on_cpu)

    # Tasklets that are not running
    for tasklet in tasklet_off_cpu:
        _update_item_inversion(tasklet, contending, on_cpu, m, when, report)

    # Tasklets that are running
    for tasklet in tasklet_on_cpu:
        if tasklet.inversion_start is not None:
            _end_inversion(tasklet, when, report)

    return errors