#!/usr/bin/env python """The data structures to store a schedule (task system), along with all the job releases and other events that have occurred for each task. This gives a high-level representation of a schedule that can be converted to, say, a graphic.""" from graph import * from collections import * import util import copy EVENT_LIST = None SPAN_EVENTS = None class TimeSlotArray(object): """Represents another way of organizing the events. This structure organizes events by the (approximate) time at which they occur. Events that occur at approximately the same time are assigned the same ``slot'', and each slot organizes its events by task number as well as by CPU.""" TASK_LIST = 0 CPU_LIST = 1 LIST_TYPES = (TASK_LIST, CPU_LIST) PRE_ITEM_NO = -1 POST_ITEM_NO = -2 PRE_SLOT = 'pre' POST_SLOT = 'post' ALL_SLOT = 'all' def __init__(self, time_per_maj=None, num_tasks=0, num_cpus=0): if time_per_maj is None: self.array = None return self.time_per_maj = time_per_maj self.list_sizes = { TimeSlotArray.TASK_LIST : num_tasks, TimeSlotArray.CPU_LIST : num_cpus} self.array = {} self.min_slot = None self.max_slot = None for type in self.list_sizes: num = self.list_sizes[type] self.array[type] = [] for i in range(0, num + 2): # two more for pre and post # for each slot in the array, we need a list of all events under this type # (for example, a list of all events that occur in this time slot, indexed # by task). self.array[type].append(dict(zip(EVENT_LIST, [{} for j in range(0, len(EVENT_LIST))]))) def get_time_slot(self, time): return int(time // self.time_per_maj) def put_event_in_slot(self, list_type, no, klass, slot, event): if slot not in self.array[list_type][no][klass]: self.array[list_type][no][klass][slot] = [] self.array[list_type][no][klass][slot].append(event) if self.min_slot is None or slot < self.min_slot: self.min_slot = slot if self.max_slot is None or slot > self.max_slot: self.max_slot = slot def add_event_to_time_slot(self, event, item_nos): task_no = event.get_job().get_task().get_task_no() cpu = event.get_cpu() slot = self.get_time_slot(event.get_time()) for type in item_nos: self.put_event_in_slot(type, item_nos[type], event.__class__, slot, event) def fill_span_event_from_start(self, event, item_nos): end_slot = None if event.corresp_end_event is None: end_slot = self.get_time_slot(event.get_schedule().end) + 1 else: end_slot = self.get_time_slot(event.corresp_end_event.get_time()) start_slot = self.get_time_slot(event.get_time()) for slot in range(start_slot + 1, end_slot): dummy = event.dummy_class() dummy.corresp_start_event = event dummy.corresp_end_event = event.corresp_end_event for type in item_nos: self.put_event_in_slot(type, item_nos[type], event.dummy_class, slot, dummy) def fill_span_event_from_end(self, event, item_nos): start_slot = None if event.corresp_start_event is None: start_slot = self.get_time_slot(event.get_schedule().start) - 1 else: start_slot = self.get_time_slot(event.corresp_start_event.get_time()) end_slot = self.get_time_slot(event.get_time()) for slot in range(start_slot + 1, end_slot): dummy = event.dummy_class() dummy.corresp_start_event = event.corresp_start_event dummy.corresp_end_event = event for type in item_nos: self.put_event_in_slot(type, item_nos[type], event.dummy_class, slot, dummy) def add_item_dummies(self, sched): start = sched.get_time_bounds()[0] for task in sched.get_tasks().values(): dummy = TaskDummy(start, task) self.put_event_in_slot(TimeSlotArray.TASK_LIST, task.get_task_no(), dummy.__class__, TimeSlotArray.PRE_SLOT, dummy) self.put_event_in_slot(TimeSlotArray.CPU_LIST, TimeSlotArray.PRE_ITEM_NO, dummy.__class__, TimeSlotArray.ALL_SLOT, dummy) for i in range(0, self.list_sizes[TimeSlotArray.CPU_LIST]): dummy = CPUDummy(start, sched, i) self.put_event_in_slot(TimeSlotArray.CPU_LIST, i, dummy.__class__, TimeSlotArray.PRE_SLOT, dummy) self.put_event_in_slot(TimeSlotArray.TASK_LIST, TimeSlotArray.PRE_ITEM_NO, dummy.__class__, TimeSlotArray.ALL_SLOT, dummy) def get_events(self, slots, list_type, event_types): for type in event_types: for slot in slots: for no in slots[slot]: if slot in self.array[list_type][no][type]: for event in self.array[list_type][no][type][slot]: yield event def get_slots(self, slots, start, end, start_no, end_no, list_type): if start > end: raise ValueError('Litmus is not a time machine') if start_no > end_no: raise ValueError('start no should be less than end no') if self.min_slot is None: # no events ever got added return {} start_slot = self.get_time_slot(start) end_slot = self.get_time_slot(end) + 1 item_no_list = [] if start_no < 0: item_no_list.append(TimeSlotArray.PRE_ITEM_NO) if end_no >= self.list_sizes[list_type]: item_no_list.append(TimeSlotArray.POST_ITEM_NO) start_no = max(0, start_no) end_no = min(self.list_sizes[list_type] - 1, end_no) item_no_list += range(start_no, end_no + 1) edge_slots = [TimeSlotArray.ALL_SLOT] if start_slot <= self.min_slot: edge_slots.append(TimeSlotArray.PRE_SLOT) if end_slot >= self.max_slot: edge_slots.append(TimeSlotArray.POST_SLOT) for slot in edge_slots: slots[slot] = {} for no in item_no_list: slots[slot][no] = None for slot in xrange(start_slot, end_slot + 1): if slot not in slots: slots[slot] = {} for no in item_no_list: slots[slot][no] = None class Schedule(object): """The total schedule (task system), consisting of a certain number of tasks.""" def __init__(self, name, num_cpus, task_list=[]): self.name = name self.tasks = {} self.task_list = [] self.selected = {} self.time_slot_array = None self.cur_task_no = 0 self.num_cpus = num_cpus for task in task_list: self.add_task(task) def get_selected(self): return self.selected def set_selected(self, selected): self.selected = selected def add_selected(self, selected): for layer in selected: if layer not in self.selected: self.selected[layer] = {} for event in selected[layer]: if event not in self.selected: self.selected[layer][event] = {} for graph in selected[layer][event]: self.selected[layer][event][graph] = selected[layer][event][graph] def remove_selected(self, selected): for layer in selected: if layer in self.selected: for event in selected[layer]: if event in self.selected[layer]: del self.selected[layer][event] def set_time_params(self, time_per_maj=None): self.time_per_maj = time_per_maj if self.time_per_maj is None: self.time_slot_array = TimeSlotArray() return self.time_slot_array = TimeSlotArray(self.time_per_maj, len(self.task_list), self.num_cpus) def get_time_slot_array(self): return self.time_slot_array def get_time_bounds(self): return (self.start, self.end) def scan(self, time_per_maj): print "Scanning" self.start = None self.end = None self.sort_task_nos_numeric() self.set_time_params(time_per_maj) # we scan the graph task by task, and job by job for task_no, task in enumerate(self.get_task_list()): switches = {} for event in EVENT_LIST: switches[event] = None cur_cpu = [Event.NO_CPU] for job_no in sorted(task.get_jobs().keys()): job = task.get_jobs()[job_no] for event_time in sorted(job.get_events().keys()): # could have multiple events at the same time (unlikely but possible) for event in job.get_events()[event_time]: event.scan(cur_cpu, switches) # What if one of the initial "span events" (switch to or inversion starting) never got a # corresponding end event? Well, then we assume that the end event was simply outside of # the range of whatever we read in. So we need to fill dummies starting from the initial # event all the way to the end of the graph, so that the renderer can see the event no matter # how far the user scrolls to the right. for event in EVENT_LIST: switch_event = switches[event] if switch_event is not None: switch_event.fill_span_event_from_start() # add events that correspond to the tasks and CPUS, at the very beginning self.time_slot_array.add_item_dummies(self) print "Done scanning" def add_task(self, task): if task.name in self.tasks: raise ValueError("task already in list!") self.tasks[task.name] = task self.task_list.append(task) task.schedule = self task.task_no = self.cur_task_no self.cur_task_no += 1 def sort_task_nos_numeric(self): # sort task numbers by the numeric value of the task names. nums = [] for task_name in self.tasks: nums.append((int(task_name), task_name)) nums.sort(key=lambda t: t[0]) for no, task in enumerate(nums): self.tasks[task[1]].task_no = no def get_tasks(self): return self.tasks def get_task_list(self): return self.task_list def get_name(self): return self.name def get_num_cpus(self): return self.num_cpus def deepcopy_selected(selected): selected_copy = {} for layer in selected: selected_copy[layer] = copy.copy(selected[layer]) return selected_copy def _format_time(time, unit): if time is None: return '(None)' return util.format_float(unit.nsec_to_native(time), Event.NUM_DEC_PLACES) \ + ' ' + str(unit) class Task(object): """Represents a task, including the set of jobs that were run under this task.""" def __init__(self, name, job_list=[]): self.name = name self.jobs = {} self.task_no = None self.schedule = None for job in job_list: self.add_job(job) def add_job(self, job): if job.job_no in self.jobs: raise ScheduleError("a job is already being released at this time for this task") self.jobs[job.job_no] = job job.task = self def get_schedule(self): return self.schedule def get_jobs(self): return self.jobs def get_task_no(self): return self.task_no def get_name(self): return self.name class Job(object): """Represents a job, including everything that happens related to the job""" def __init__(self, job_no, event_list=[]): self.job_no = job_no self.events = {} self.task = None self.start_run_time = None self.end_run_time = None self.release_time = None self.deadline_time = None for event in event_list: self.add_event(event) def get_name(self): return 'Job ' + str(self.job_no) + ' for task ' + str(self.get_task().get_task_no()) def str_long(self, unit): duration = None if self.start_run_time is not None and self.end_run_time is not None: duration = self.end_run_time - self.start_run_time return_str = 'Job Information\n---------------' + \ '\nTask Name: ' + str(self.get_task().get_name()) + \ '\n(Task no., Job no.): ' + str((self.get_task().get_task_no(), \ self.get_job_no())) + \ '\nStart Run Time: ' + _format_time(self.start_run_time, unit) if duration is not None: return_str += '\nDuration: ' + _format_time(duration, unit) return_str += '\nEnd Run Time: ' + _format_time(self.end_run_time, unit) \ + '\nRelease Time: ' + _format_time(self.release_time, unit) \ + '\nDeadline Time: ' + _format_time(self.deadline_time, unit) return return_str def str_run_short(self, unit): start = self.start_run_time end = self.end_run_time duration = None if start is not None and end is not None: duration = end - start return_str = self.get_name() + ': START=' + _format_time(start, unit) if duration is not None: return_str += ', DUR=' + _format_time(duration, unit) return_str += ', END=' + _format_time(end, unit) return return_str def str_rd_short(self, unit): return self.get_name() + ': RELEASE=' + _format_time(self.release_time, unit) \ + ', DEADLINE=' + _format_time(self.deadline_time, unit) def add_event(self, event): if event.time not in self.events: self.events[event.get_time()] = [] self.events[event.get_time()].append(event) event.job = self # update some statistics if isinstance(event, ReleaseEvent): if self.release_time is None or event.get_time() < self.release_time: self.release_time = event.get_time() elif isinstance(event, DeadlineEvent): if self.deadline_time is None or event.get_time() > self.deadline_time: self.deadline_time = event.get_time() else: if self.start_run_time is None or event.get_time() < self.start_run_time: self.start_run_time = event.get_time() if self.end_run_time is None or event.get_time() > self.end_run_time: self.end_run_time = event.get_time() def get_events(self): return self.events def get_task(self): return self.task def get_job_no(self): return self.job_no class DummyEvent(object): """Represents some event that occurs, but might not actually be a full-fledged ``event'' in the schedule. It might instead be a dummy event added by the application to speed things up or keep track of something. Such an event won't be added to the schedule tree, but might appear in the time slot array.""" def __init__(self, time, cpu): self.time = time self.cpu = cpu self.job = None self.layer = None def get_time(self): return self.time def get_cpu(self): return self.cpu def get_schedule(self): return self.get_task().get_schedule() def get_task(self): return self.get_job().get_task() def get_job(self): return self.job def get_layer(self): return self.layer def is_selected(self): """Returns whether the event has been selected by the user. (needed for rendering)""" selected = self.get_schedule().get_selected() return self.get_layer() in selected and self in selected[self.get_layer()] def render(self, graph, layer, prev_events, selectable=False): """Method that the visualizer calls to tell the event to render itself Obviously only implemented by subclasses (actual event types) ``Rendering'' can mean either actually drawing the event or just adding it as a selectable region. This is controlled by the ``selectable'' parameter""" raise NotImplementdError class Event(DummyEvent): """Represents an event that occurs while a job is running (e.g. get scheduled on a CPU, block, ...)""" NO_CPU = -1 NUM_DEC_PLACES = 2 def __init__(self, time, cpu): super(Event, self).__init__(time, cpu) self.erroneous = False def get_name(self): raise NotImplementedError def str_short(self, unit): return self.get_name() + self._common_str() + ', TIME=' + \ util.format_float(unit.nsec_to_native(self.get_time()), Event.NUM_DEC_PLACES) + \ ' ' + str(unit) def str_long(self, unit): """Prints the event as a string, in ``long'' form.""" return 'Event Information\n-----------------\n' + \ 'Event Type: ' + self.get_name() + \ '\nTask Name: ' + str(self.get_job().get_task().get_name()) + \ '\n(Task no., Job no.): ' + str((self.get_job().get_task().get_task_no(), \ self.get_job().get_job_no())) + \ '\nCPU: ' + str(self.get_cpu()) + \ '\nTime: ' + _format_time(self.get_time(), unit) + \ '\n\n' + self.get_job().str_long(unit) def _common_str(self): job = self.get_job() task = job.get_task() return ' for task ' + str(task.get_name()) + ': (TASK, JOB)=' + str((task.get_task_no(), \ job.get_job_no())) + ', CPU=' + str(self.get_cpu()) def is_erroneous(self): """An erroneous event is where something with the event is not quite right, something significantly wrong that we don't have logical information telling us how we should render the event.""" return self.erroneous def scan(self, cur_cpu, switches, item_nos=None): """Part of the procedure that walks through all the events and sets some parameters that are unknown at first. For instance, a SwitchAwayEvent should know when the previous corresponding SwitchToEvent occurred, but the data does not tell us this, so we have to figure that out on our own by scanning through the events. ``cur_cpu'' gives the current CPU at this time in the scan, and ``switches'' gives the last time a certain switch (e.g. SwitchToEvent, InversionStartEvent) occurred""" time = self.get_time() sched = self.get_schedule() if sched.start is None or time < sched.start: sched.start = time if sched.end is None or time > sched.end: sched.end = time if item_nos is None: item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : self.get_cpu() } sched.get_time_slot_array().add_event_to_time_slot(self, item_nos) self.fill_span_event_from_end() def fill_span_event_from_start(self): """This method exists for events that can ``range'' over a period of time (e.g. SwitchAway and SwitchTo). In case a start event is not paired with an end event, or vice versa, we want to fill in dummy events to range all the way to the beginning or end. Since most events occur only at a specific time, this is usually a no-op.""" pass def fill_span_event_from_end(self): """The mirror image of the last method.""" pass class SpanEvent(Event): def __init__(self, time, cpu, dummy_class): super(SpanEvent, self).__init__(time, cpu) self.dummy_class = dummy_class class SpanDummy(DummyEvent): def __init__(self): super(SpanDummy, self).__init__(None, None) def get_task(self): if self.corresp_start_event is not None: return self.corresp_start_event.get_task() if self.corresp_end_event is not None: return self.corresp_end_event.get_task() return None def get_schedule(self): if self.corresp_start_event is not None: return self.corresp_start_event.get_schedule() if self.corresp_end_event is not None: return self.corresp_end_event.get_schedule() return None def get_job(self): if self.corresp_start_event is not None: return self.corresp_start_event.get_job() if self.corresp_end_event is not None: return self.corresp_end_event.get_job() return None def get_cpu(self): if self.corresp_start_event is not None: return self.corresp_start_event.get_cpu() if self.corresp_end_event is not None: return self.corresp_end_event.get_cpu() return None def get_time(self): if self.corresp_start_event is not None: return self.corresp_start_event.get_time() if self.corresp_end_event is not None: return self.corresp_end_event.get_time() return None class StartSpanEvent(SpanEvent): def str_short(self, unit): if self.corresp_end_event is None: return super(StartSpanEvent, self).str_short(unit) start = self.get_time() end = self.corresp_end_event.get_time() duration = end - start return self.get_name() + self._common_str() + ', START=' \ + _format_time(start, unit) \ + ', DUR=' + _format_time(duration, unit) \ + ', END=' + _format_time(end, unit) def str_long(self, unit): if self.corresp_end_event is None: return super(StartSpanEvent, self).str_long(unit) else: start = self.get_time() end = self.corresp_end_event.get_time() duration = end - start return 'Event Type: ' + self.get_name() + \ '\nTask Name: ' + str(self.get_job().get_task().get_name()) + \ '\n(Task no., Job no.): ' + str((self.get_job().get_task().get_task_no(), \ self.get_job().get_job_no())) + \ '\nCPU: ' + str(self.get_cpu()) + \ '\nStart: ' + _format_time(start, unit) + \ '\nDuration: ' + _format_time(duration, unit) + \ '\nEnd: ' + _format_time(end, unit) + \ '\n\n' + self.get_job().str_long(unit) def fill_span_event_from_start(self): item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : self.get_cpu() } self.get_schedule().get_time_slot_array().fill_span_event_from_start(self, item_nos) class EndSpanEvent(SpanEvent): def str_short(self, unit): if self.corresp_start_event is None: return super(EndSpanEvent, self).str_short(unit) return self.corresp_start_event.str_short(unit) def str_long(self, unit): if self.corresp_start_event is None: return super(EndSpanEvent, self).str_long(unit) else: return self.corresp_start_event.str_long(unit) def fill_span_event_from_end(self): item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : self.get_cpu() } self.get_schedule().get_time_slot_array().fill_span_event_from_end(self, item_nos) class SuspendEvent(Event): def __init__(self, time, cpu): super(SuspendEvent, self).__init__(time, cpu) self.layer = Canvas.MIDDLE_LAYER def get_name(self): return 'Suspend' def scan(self, cur_cpu, switches): if self.get_cpu() != cur_cpu[0]: self.erroneous = True #fprint "suspending on a CPU different from the CPU we are on!" super(SuspendEvent, self).scan(cur_cpu, switches) def render(self, graph, layer, prev_events, selectable=False): if layer == self.layer: prev_events[self] = None if selectable: graph.add_sel_suspend_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_cpu(), self) else: graph.draw_suspend_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_cpu(), self.is_selected()) class ResumeEvent(Event): def __init__(self, time, cpu): super(ResumeEvent, self).__init__(time, cpu) self.layer = Canvas.MIDDLE_LAYER def get_name(self): return 'Resume' def scan(self, cur_cpu, switches): if cur_cpu[0] != Event.NO_CPU and cur_cpu[0] != self.get_cpu(): self.erroneous = True #print "Resuming when currently scheduled on a CPU, but on a different CPU from the current CPU!" super(ResumeEvent, self).scan(cur_cpu, switches) def render(self, graph, layer, prev_events, selectable=False): if layer == self.layer: prev_events[self] = None if selectable: graph.add_sel_resume_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_cpu(), self) else: graph.draw_resume_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_cpu(), self.is_selected()) class CompleteEvent(Event): def __init__(self, time, cpu): super(CompleteEvent, self).__init__(time, cpu) self.layer = Canvas.TOP_LAYER def get_name(self): return 'Complete' def scan(self, cur_cpu, switches): super(CompleteEvent, self).scan(cur_cpu, switches) def render(self, graph, layer, prev_events, selectable=False): if layer == Canvas.TOP_LAYER: prev_events[self] = None if selectable: graph.add_sel_completion_marker_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_cpu(), self) else: graph.draw_completion_marker_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_cpu(), self.is_selected()) class SwitchToEvent(StartSpanEvent): def __init__(self, time, cpu): super(SwitchToEvent, self).__init__(time, cpu, IsRunningDummy) self.layer = Canvas.BOTTOM_LAYER self.corresp_end_event = None def get_name(self): if self.corresp_end_event is None: return 'Switch To (w/o Switch Away)' else: return 'Scheduled' def scan(self, cur_cpu, switches): old_cur_cpu = cur_cpu[0] cur_cpu[0] = self.get_cpu() switches[SwitchToEvent] = self self.corresp_end_event = None if old_cur_cpu != Event.NO_CPU: self.erroneous = True #print "currently scheduled somewhere, can't switch to a CPU" super(SwitchToEvent, self).scan(cur_cpu, switches) def render(self, graph, layer, prev_events, selectable=False): if layer == self.layer: end_time = None clip = None if self.corresp_end_event is None: end_time = self.get_job().get_task().get_schedule().end clip = AlignMode.RIGHT else: end_time = self.corresp_end_event.get_time() prev_events[self] = None cpu = self.get_cpu() task_no = self.get_job().get_task().get_task_no() if selectable: graph.add_sel_bar_at_time(self.get_time(), end_time, task_no, cpu, self) else: graph.draw_bar_at_time(self.get_time(), end_time, task_no, cpu, self.get_job().get_job_no(), clip, self.is_selected()) class SwitchAwayEvent(EndSpanEvent): def __init__(self, time, cpu): super(SwitchAwayEvent, self).__init__(time, cpu, IsRunningDummy) self.layer = Canvas.BOTTOM_LAYER self.corresp_start_event = None def get_name(self): if self.corresp_start_event is None: return 'Switch Away (w/o Switch To)' else: return 'Scheduled' def scan(self, cur_cpu, switches): old_cur_cpu = cur_cpu[0] self.corresp_start_event = switches[SwitchToEvent] cur_cpu[0] = Event.NO_CPU switches[SwitchToEvent] = None if self.corresp_start_event is not None: self.corresp_start_event.corresp_end_event = self if self.get_cpu() != old_cur_cpu: self.erroneous = True #print "switching away from a CPU different from the CPU we are currently on" if self.corresp_start_event is None: self.erroneous = True #print "switch away was not matched by a corresponding switch to" elif self.get_time() < self.corresp_start_event.get_time(): self.erroneous = True #print "switching away from a processor before we switched to it?!" super(SwitchAwayEvent, self).scan(cur_cpu, switches) def render(self, graph, layer, prev_events, selectable=False): if self.corresp_start_event is None: # We never found a corresponding start event. In that case, we can assume it lies # in some part of the trace that was never read in. So draw a bar starting from # the very beginning. if layer == self.layer: prev_events[self] = None cpu = self.get_cpu() task_no = self.get_job().get_task().get_task_no() start = self.get_job().get_task().get_schedule().start if selectable: graph.add_sel_bar_at_time(start, self.get_time(), task_no, cpu, self) else: graph.draw_bar_at_time(start, self.get_time(), task_no, cpu, self.get_job().get_job_no(), AlignMode.LEFT, self.is_selected()) else: if self.corresp_start_event in prev_events: return # already rendered the bar self.corresp_start_event.render(graph, layer, prev_events, selectable) class ReleaseEvent(Event): def __init__(self, time, cpu): super(ReleaseEvent, self).__init__(time, cpu) self.layer = Canvas.TOP_LAYER def get_name(self): return 'Release' def scan(self, cur_cpu, switches): item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : TimeSlotArray.POST_ITEM_NO } super(ReleaseEvent, self).scan(cur_cpu, switches, item_nos) def render(self, graph, layer, prev_events, selectable=False): prev_events[self] = None if layer == Canvas.TOP_LAYER: if selectable: graph.add_sel_release_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self) else: graph.draw_release_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_job().get_job_no(), self.is_selected()) class DeadlineEvent(Event): def __init__(self, time, cpu): super(DeadlineEvent, self).__init__(time, cpu) self.layer = Canvas.TOP_LAYER def get_name(self): return 'Deadline' def scan(self, cur_cpu, switches): item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : TimeSlotArray.POST_ITEM_NO } super(DeadlineEvent, self).scan(cur_cpu, switches, item_nos) def render(self, graph, layer, prev_events, selectable=False): prev_events[self] = None if layer == Canvas.TOP_LAYER: if selectable: graph.add_sel_deadline_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self) else: graph.draw_deadline_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), self.get_job().get_job_no(), self.is_selected()) class InversionStartEvent(StartSpanEvent): def __init__(self, time): super(InversionStartEvent, self).__init__(time, Event.NO_CPU, InversionDummy) self.layer = Canvas.BOTTOM_LAYER self.corresp_end_event = None def get_name(self): if self.corresp_end_event is None: return 'Inversion Start (w/o Inversion End)' else: return 'Priority Inversion' def fill_span_event_from_start(self): item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : TimeSlotArray.POST_ITEM_NO } self.get_schedule().get_time_slot_array().fill_span_event_from_start(self, item_nos) def scan(self, cur_cpu, switches): switches[InversionStartEvent] = self self.corresp_end_event = None # the corresp_end_event should already be set item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : TimeSlotArray.POST_ITEM_NO } super(InversionStartEvent, self).scan(cur_cpu, switches, item_nos) def render(self, graph, layer, prev_events, selectable=False): if layer == self.layer: end_time = None clip = None if self.corresp_end_event is None: end_time = self.get_job().get_task().get_schedule().end clip = AlignMode.RIGHT else: end_time = self.corresp_end_event.get_time() if layer == self.layer: prev_events[self] = None cpu = self.get_cpu() task_no = self.get_job().get_task().get_task_no() if selectable: graph.add_sel_mini_bar_at_time(self.get_time(), end_time, task_no, cpu, self) else: graph.draw_mini_bar_at_time(self.get_time(), end_time, task_no, cpu, self.get_job().get_job_no(), clip, self.is_selected()) class InversionEndEvent(EndSpanEvent): def __init__(self, time): super(InversionEndEvent, self).__init__(time, Event.NO_CPU, InversionDummy) self.layer = Canvas.BOTTOM_LAYER self.corresp_start_event = None def get_name(self): if self.corresp_start_event is None: return 'Inversion End (w/o Inversion Start)' else: return 'Priority Inversion' def fill_span_event_from_end(self): item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : TimeSlotArray.POST_ITEM_NO } self.get_schedule().get_time_slot_array().fill_span_event_from_end(self, item_nos) def scan(self, cur_cpu, switches): self.corresp_start_event = switches[InversionStartEvent] cur_cpu[0] = Event.NO_CPU switches[InversionStartEvent] = None if self.corresp_start_event is not None: self.corresp_start_event.corresp_end_event = self if self.corresp_start_event is None: self.erroneous = True #print "inversion end was not matched by a corresponding inversion start" item_nos = { TimeSlotArray.TASK_LIST : self.get_task().get_task_no(), TimeSlotArray.CPU_LIST : TimeSlotArray.POST_ITEM_NO } super(InversionEndEvent, self).scan(cur_cpu, switches, item_nos) def render(self, graph, layer, prev_events, selectable=False): if self.corresp_start_event is None: # We never found a corresponding start event. In that case, we can assume it lies # in some part of the trace that was never read in. So draw a bar starting from # the very beginning. if layer == self.layer: prev_events[self] = None cpu = self.get_cpu() task_no = self.get_job().get_task().get_task_no() start = self.get_job().get_task().get_schedule().start if selectable: graph.add_sel_mini_bar_at_time(start, self.get_time(), task_no, cpu, self) else: graph.draw_mini_bar_at_time(start, self.get_time(), task_no, cpu, self.get_job().get_job_no(), AlignMode.LEFT, self.is_selected()) else: if self.corresp_start_event in prev_events: return # already rendered the bar self.corresp_start_event.render(graph, layer, prev_events, selectable) class InversionDummy(SpanDummy): def __init__(self): super(InversionDummy, self).__init__() self.layer = Canvas.BOTTOM_LAYER def render(self, graph, layer, prev_events, selectable=False): if self.corresp_start_event is None: if self.corresp_end_event in prev_events: return # we have already been rendered self.corresp_end_event.render(graph, layer, prev_events, selectable) else: if self.corresp_start_event in prev_events: return # we have already been rendered self.corresp_start_event.render(graph, layer, prev_events, selectable) class IsRunningDummy(SpanDummy): def __init__(self): super(IsRunningDummy, self).__init__() self.layer = Canvas.BOTTOM_LAYER def render(self, graph, layer, prev_events, selectable=False): if self.corresp_start_event is None: if self.corresp_end_event in prev_events: return # we have already been rendered self.corresp_end_event.render(graph, layer, prev_events, selectable) else: if self.corresp_start_event in prev_events: return # we have already been rendered self.corresp_start_event.render(graph, layer, prev_events, selectable) class TaskDummy(DummyEvent): """A dummy event that represents an entire task. Used for rendering the task's name.""" def __init__(self, time, task): super(TaskDummy, self).__init__(time, None) self.task = task self.layer = Canvas.BOTTOM_LAYER def get_name(self): return 'Task' def str_short(self, unit): return 'Task ' + self.task.get_name() def str_long(self, unit): return 'Task ' + self.task.get_name() def get_task(self): return self.task def render(self, graph, layer, prev_events, selectable=False): if selectable: graph.add_sel_task_item(self.task.get_task_no(), self) else: graph.draw_task_item(self.task.get_task_no(), self.is_selected()) class CPUDummy(DummyEvent): """A dummy event that represents a certain CPU. Used for rendering the CPU's ``name'' (i.e. ``CPU #n'')""" def __init__(self, time, sched, cpu): super(CPUDummy, self).__init__(time, cpu) self.sched = sched self.layer = Canvas.BOTTOM_LAYER def get_name(self): return 'CPU' def str_short(self, unit): return 'CPU #' + str(self.get_cpu()) def str_long(self, unit): return 'CPU #' + str(self.get_cpu()) def get_task(self): return None def get_schedule(self): # we don't have a job, so get the schedule from the schedule passed in return self.sched def render(self, graph, layer, prev_events, selectable=False): if selectable: graph.add_sel_cpu_item(self.cpu, self) else: graph.draw_cpu_item(self.cpu, self.is_selected()) EVENT_LIST = {SuspendEvent : None, ResumeEvent : None, CompleteEvent : None, SwitchAwayEvent : None, SwitchToEvent : None, ReleaseEvent : None, DeadlineEvent : None, IsRunningDummy : None, InversionStartEvent : None, InversionEndEvent : None, InversionDummy : None, TaskDummy : None, CPUDummy : None} SPAN_START_EVENTS = { SwitchToEvent : IsRunningDummy, InversionStartEvent : InversionDummy } SPAN_END_EVENTS = { SwitchAwayEvent : IsRunningDummy, InversionEndEvent : InversionDummy}