From 44a8ade3ed5dc4810fd95c41dbe8ec3aa2fb0cf7 Mon Sep 17 00:00:00 2001 From: Gary Bressler Date: Mon, 1 Mar 2010 23:46:44 -0500 Subject: Reorganized tree, along with the visualizer --- viz/schedule.py | 571 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 571 insertions(+) create mode 100644 viz/schedule.py (limited to 'viz/schedule.py') diff --git a/viz/schedule.py b/viz/schedule.py new file mode 100644 index 0000000..f842c8d --- /dev/null +++ b/viz/schedule.py @@ -0,0 +1,571 @@ +#!/usr/bin/env python + +"""The data structures to store a schedule (task system), along with all +the job releases and other events that have occurred for each task. This gives +a high-level representation of a schedule that can be converted to, say, a +graphic.""" + +from draw import * +import util + +class TimeSlotArray(object): + """Represents another way of organizing the events. This structure organizes events by + the (approximate) time at which they occur. Events that occur at approximately the same + time are assigned the same ``slot'', and each slot organizes its events by task number + as well as by CPU.""" + + TASK_LIST = 0 + CPU_LIST = 1 + + def __init__(self, start, end, time_per_maj, num_tasks, num_cpus): + self.start = start + self.end = end + self.time_per_maj = time_per_maj + self.list_sizes = { TimeSlotArray.TASK_LIST : num_tasks, TimeSlotArray.CPU_LIST : num_cpus } + self.array = [{TimeSlotArray.TASK_LIST : [{} for i in range(0, num_tasks)], \ + TimeSlotArray.CPU_LIST : [{} for i in range (0, num_cpus)]} \ + for i in range(0, (end - start) // self.time_per_maj + 1)] + + def get_time_slot(self, time): + return int((time - self.start) // self.time_per_maj) + + def add_event_to_time_slot(self, event): + task_no = event.get_job().get_task().get_task_no() + cpu = event.get_cpu() + time_slot = self.get_time_slot(event.get_time()) + + self.array[time_slot][TimeSlotArray.TASK_LIST][task_no][event.__class__] = event + self.array[time_slot][TimeSlotArray.CPU_LIST][cpu][event.__class__] = event + + span_events = { SwitchAwayEvent : IsRunningDummy, InversionEndEvent : InversionDummy} + + for span_event in span_events: + if isinstance(event, span_event) and not event.is_erroneous(): + start_slot = self.get_time_slot(event.corresp_start_event.get_time()) + end_slot = self.get_time_slot(event.get_time()) + for slot in range(start_slot + 1, end_slot): + dummy = span_events[span_event](task_no, cpu) + dummy.corresp_start_event = event.corresp_start_event + self.array[slot][TimeSlotArray.TASK_LIST][task_no][dummy.__class__] = dummy + self.array[slot][TimeSlotArray.CPU_LIST][cpu][dummy.__class__] = dummy + + def iter_over_period(self, start, end, start_no, end_no, list_type, event_types): + if start > end: + raise ValueError('Litmus is not a time machine') + if start_no > end_no: + raise ValueError('start no should be less than end no') + + start_slot = max(0, self.get_time_slot(start)) + end_slot = min(len(self.array), self.get_time_slot(end) + 2) + + start_no = max(0, start_no) + end_no = min(self.list_sizes[list_type] - 1, end_no) + + for slot in range(start_slot, end_slot): + for no in range(start_no, end_no + 1): + for type in event_types: + if type in self.array[slot][list_type][no]: + yield self.array[slot][list_type][no][type] + +class Schedule(object): + """The total schedule (task system), consisting of a certain number of + tasks.""" + + def __init__(self, name, num_cpus, task_list=[]): + self.name = name + self.tasks = {} + self.task_list = [] + self.time_slot_array = None + self.cur_task_no = 0 + self.num_cpus = num_cpus + for task in task_list: + self.add_task(task) + + def set_time_params(self, time_per_maj=None): + if self.get_task_list() is None: + return (0, 0) + + def find_extreme_time_sched(sched, cmp): + def find_extreme_time_task(task, cmp): + def find_extreme_time_job(job, cmp): + extreme_time = None + for time in job.get_events(): + if extreme_time is None or cmp(time, extreme_time) < 0: + extreme_time = time + return extreme_time + + extreme_time = None + for job_no in task.get_jobs(): + time = find_extreme_time_job(task.get_jobs()[job_no], cmp) + if time is not None and (extreme_time is None or cmp(time, extreme_time) < 0): + extreme_time = time + return extreme_time + + extreme_time = None + for task in sched.get_task_list(): + time = find_extreme_time_task(task, cmp) + if time is not None and (extreme_time is None or cmp(time, extreme_time) < 0): + extreme_time = time + + return extreme_time + + def earliest_cmp(x, y): + diff = x - y + if diff > 0.0: + return 1 + elif diff == 0.0: + return 0 + elif diff < 0.0: + return -1 + + def latest_cmp(x, y): + diff = x - y + if diff < 0.0: + return 1 + elif diff == 0.0: + return 0 + elif diff > 0.0: + return -1 + + self.start = find_extreme_time_sched(self, earliest_cmp) + self.end = find_extreme_time_sched(self, latest_cmp) + self.time_per_maj = time_per_maj + self.time_slot_array = None + if self.time_per_maj is not None: + self.time_slot_array = TimeSlotArray(self.start, self.end, time_per_maj, \ + len(self.task_list), self.num_cpus) + + def get_time_slot_array(self): + return self.time_slot_array + + def get_time_bounds(self): + return (self.start, self.end) + + def scan(self, time_per_maj): + self.set_time_params(time_per_maj) + + # we scan the graph task by task, and job by job + switches = {} + for event in EVENT_LIST: + switches[event] = None + for task_no, task in enumerate(self.get_task_list()): + cur_cpu = [Event.NO_CPU] + for job_no in sorted(task.get_jobs().keys()): + job = task.get_jobs()[job_no] + for event_time in sorted(job.get_events().keys()): + # could have multiple events at the same time (unlikely but possible) + for event in job.get_events()[event_time]: + print "task, job, event:", task.name, job.job_no, event.__class__.__name__ + event.scan(cur_cpu, switches) + + def add_task(self, task): + if task.name in self.tasks: + raise ValueError("task already in list!") + self.tasks[task.name] = task + self.task_list.append(task) + task.schedule = self + task.task_no = self.cur_task_no + self.cur_task_no += 1 + + def get_tasks(self): + return self.tasks + + def get_task_list(self): + return self.task_list + + def get_name(self): + return self.name + + def get_num_cpus(self): + return self.num_cpus + +class Task(object): + """Represents a task, including the set of jobs that were run under + this task.""" + + def __init__(self, name, job_list=[]): + self.name = name + self.jobs = {} + self.task_no = None + self.schedule = None + for job in job_list: + self.add_job(job) + + def add_job(self, job): + if job.job_no in self.jobs: + raise ScheduleError("a job is already being released at this time for this task") + self.jobs[job.job_no] = job + job.task = self + + def get_schedule(self): + return self.schedule + + def get_jobs(self): + return self.jobs + + def get_task_no(self): + return self.task_no + + def get_name(self): + return self.name + +class Job(object): + """Represents a job, including everything that happens related to the job""" + def __init__(self, job_no, event_list=[]): + self.job_no = job_no + self.events = {} + self.task = None + for event in event_list: + self.add_event(event) + + def add_event(self, event): + if event.time not in self.events: + self.events[event.time] = [] + self.events[event.time].append(event) + event.job = self + + def get_events(self): + return self.events + + def get_task(self): + return self.task + + def get_job_no(self): + return self.job_no + +class DummyEvent(object): + """Represents some event that occurs, but might not actually be + a full-fledged ``event'' in the schedule. It might instead be a dummy + event added by the application to speed things up or keep track of + something. Such an event won't be added to the schedule tree, but + might appear in the time slot array.""" + + def __init__(self, time, cpu): + self.time = time + self.cpu = cpu + self.job = None + self.layer = None + + def __str__(self): + return '[Dummy Event]' + + def get_time(self): + return self.time + + def get_cpu(self): + return self.cpu + + def get_job(self): + return self.job + + def get_layer(self): + return self.layer + + def render(self, graph, layer, prev_events): + """Method that the visualizer calls to tell the event to render itself + Obviously only implemented by subclasses (actual event types)""" + raise NotImplementdError + +class Event(DummyEvent): + """Represents an event that occurs while a job is running (e.g. get scheduled + on a CPU, block, ...)""" + NO_CPU = -1 + NUM_DEC_PLACES = 2 + + def __init__(self, time, cpu): + super(Event, self).__init__(time, cpu) + self.erroneous = False + self.selected = False + + def __str__(self): + return '[Event]' + + def _common_str(self): + job = self.get_job() + task = job.get_task() + return ' for task ' + str(task.get_name()) + ': (TASK, JOB)=' + str((task.get_task_no(), \ + job.get_job_no())) + ', CPU=' + str(self.get_cpu()) + + def is_erroneous(self): + """An erroneous event is where something with the event is not quite right, + something significantly wrong that we don't have logical information telling + us how we should render the event.""" + return self.erroneous + + def is_selected(self): + """Returns whether the event has been selected by the user. (needed for rendering)""" + return self.selected + + def set_selected(self, sel): + """Sets the event's state to selected.""" + self.selected = sel + + def scan(self, cur_cpu, switches): + """Part of the procedure that walks through all the events and sets + some parameters that are unknown at first. For instance, a SwitchAwayEvent + should know when the previous corresponding SwitchToEvent occurred, but + the data does not tell us this, so we have to figure that out on our own + by scanning through the events. ``cur_cpu'' gives the current CPU at this + time in the scan, and ``switches'' gives the last time a certain switch + (e.g. SwitchToEvent, InversionStartEvent) occurred""" + + self.get_job().get_task().get_schedule().get_time_slot_array().add_event_to_time_slot(self) + +class ErrorEvent(Event): + pass + +class SuspendEvent(Event): + def __init__(self, time, cpu): + super(SuspendEvent, self).__init__(time, cpu) + self.layer = Canvas.MIDDLE_LAYER + + def __str__(self): + return 'Suspend' + self._common_str() + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + if self.get_cpu() != cur_cpu[0]: + self.erroneous = True + print "suspending on a CPU different from the CPU we are on!" + super(SuspendEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if layer == self.layer: + prev_events[self] = None + graph.draw_suspend_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_cpu(), self.is_selected()) + graph.add_sel_suspend_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_cpu(), self) + +class ResumeEvent(Event): + def __init__(self, time, cpu): + super(ResumeEvent, self).__init__(time, cpu) + self.layer = Canvas.MIDDLE_LAYER + + def __str__(self): + return 'Resume' + self._common_str() + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + if cur_cpu[0] != Event.NO_CPU and cur_cpu[0] != self.get_cpu(): + self.erroneous = True + print "Resuming when currently scheduled on a CPU, but on a different CPU from the current CPU!" + super(ResumeEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if layer == self.layer: + prev_events[self] = None + graph.draw_resume_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_cpu(), self.is_selected()) + graph.add_sel_resume_triangle_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_cpu(), self) + +class CompleteEvent(Event): + def __init__(self, time, cpu): + super(CompleteEvent, self).__init__(time, cpu) + self.layer = Canvas.TOP_LAYER + + def __str__(self): + return 'Complete' + self._common_str() + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + super(CompleteEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if layer == Canvas.TOP_LAYER: + prev_events[self] = None + graph.draw_completion_marker_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_cpu(), self.is_selected()) + graph.add_sel_completion_marker_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_cpu(), self) + + +class SwitchAwayEvent(Event): + def __init__(self, time, cpu): + super(SwitchAwayEvent, self).__init__(time, cpu) + self.layer = Canvas.BOTTOM_LAYER + + def __str__(self): + if self.corresp_start_event is None: + return 'Switch Away (w/o Switch To)' + self._common_str() + 'TIME=' \ + + self.get_time() + return str(self.corresp_start_event) + + def scan(self, cur_cpu, switches): + old_cur_cpu = cur_cpu[0] + + self.corresp_start_event = switches[SwitchToEvent] + + cur_cpu[0] = Event.NO_CPU + switches[SwitchToEvent] = None + + if self.corresp_start_event is not None: + self.corresp_start_event.corresp_end_event = self + + if self.get_cpu() != old_cur_cpu: + self.erroneous = True + print "switching away from a CPU different from the CPU we are currently on" + if self.corresp_start_event is None: + self.erroneous = True + print "switch away was not matched by a corresponding switch to" + elif self.get_time() < self.corresp_start_event.get_time(): + self.erroneous = True + print "switching away from a processor before we switched to it?!" + + super(SwitchAwayEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if self.corresp_start_event is None or self.corresp_start_event in prev_events: + return # erroneous switch away or already rendered + self.corresp_start_event.render(graph, layer, prev_events) + +class SwitchToEvent(Event): + def __init__(self, time, cpu): + super(SwitchToEvent, self).__init__(time, cpu) + self.layer = Canvas.BOTTOM_LAYER + + def __str__(self): + if self.corresp_end_event is None: + return 'Switch To (w/o Switch Away)' + self._common_str() + ', TIME=' \ + + self.get_time() + return 'Scheduled' + self._common_str() + ', START=' \ + + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) \ + + ', END=' + util.format_float(self.corresp_end_event.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + old_cur_cpu = cur_cpu[0] + cur_cpu[0] = self.get_cpu() + switches[SwitchToEvent] = self + self.corresp_end_event = None + + if old_cur_cpu != Event.NO_CPU: + self.erroneous = True + print "currently scheduled somewhere, can't switch to a CPU" + + super(SwitchToEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if self.is_erroneous(): + return # erroneous switch to + if layer == Canvas.BOTTOM_LAYER: + prev_events[self] = None + cpu = self.get_cpu() + task_no = self.get_job().get_task().get_task_no() + graph.draw_bar_at_time(self.get_time(), self.corresp_end_event.get_time(), + task_no, cpu, self.get_job().get_job_no(), self.is_selected()) + graph.add_sel_bar_at_time(self.get_time(), self.corresp_end_event.get_time(), + task_no, cpu, self) + +class ReleaseEvent(Event): + def __init__(self, time, cpu): + super(ReleaseEvent, self).__init__(time, cpu) + self.layer = Canvas.TOP_LAYER + + def __str__(self): + return 'Release' + self._common_str() + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + super(ReleaseEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + prev_events[self] = None + if layer == Canvas.TOP_LAYER: + graph.draw_release_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_job().get_job_no(), self.is_selected()) + graph.add_sel_release_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self) + +class DeadlineEvent(Event): + def __init__(self, time, cpu): + super(DeadlineEvent, self).__init__(time, cpu) + self.layer = Canvas.TOP_LAYER + + def __str__(self): + return 'Deadline' + self._common_str() + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + super(DeadlineEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + prev_events[self] = None + if layer == Canvas.TOP_LAYER: + graph.draw_deadline_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self.get_job().get_job_no(), self.is_selected()) + graph.add_sel_deadline_arrow_at_time(self.get_time(), self.get_job().get_task().get_task_no(), + self) + +class InversionStartEvent(ErrorEvent): + def __init__(self, time): + super(InversionStartEvent, self).__init__(time, Event.NO_CPU) + self.layer = Canvas.BOTTOM_LAYER + + def __str__(self): + if self.corresp_end_event is None: + print 'Inversion Start (w/o Inversion End)' + self._common_str() \ + + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + return 'Priority Inversion' + self._common_str() + ', START=' \ + + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) \ + + ', END=' + util.format_float(self.corresp_end_event.get_time(), Event.NUM_DEC_PLACES) + + def scan(self, cur_cpu, switches): + switches[InversionStartEvent] = self + self.corresp_end_event = None + + # the corresp_end_event should already be set + super(InversionStartEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if layer == Canvas.BOTTOM_LAYER: + prev_events[self] = None + cpu = self.get_cpu() + task_no = self.get_job().get_task().get_task_no() + graph.draw_mini_bar_at_time(self.get_time(), self.corresp_end_event.get_time(), + task_no, cpu, self.get_job().get_job_no(), self.is_selected()) + graph.add_sel_mini_bar_at_time(self.get_time(), self.corresp_end_event.get_time(), + task_no, cpu, self) + +class InversionEndEvent(ErrorEvent): + def __init__(self, time): + super(InversionEndEvent, self).__init__(time, Event.NO_CPU) + self.layer = Canvas.BOTTOM_LAYER + + def __str__(self): + if self.corresp_start_event is None: + print 'Inversion End (w/o Inversion Start)' + self._common_str() \ + + ', TIME=' + util.format_float(self.get_time(), Event.NUM_DEC_PLACES) + + return str(self.corresp_start_event) + + def scan(self, cur_cpu, switches): + self.corresp_start_event = switches[InversionStartEvent] + + cur_cpu[0] = Event.NO_CPU + switches[InversionStartEvent] = None + + if self.corresp_start_event is not None: + self.corresp_start_event.corresp_end_event = self + + if self.corresp_start_event is None: + self.erroneous = True + print "inversion end was not matched by a corresponding inversion start" + + super(InversionEndEvent, self).scan(cur_cpu, switches) + + def render(self, graph, layer, prev_events): + if self.corresp_start_event is None or self.corresp_start_event in prev_events: + return # erroneous inversion end or already rendered + self.corresp_start_event.render(graph, layer, prev_events) + +class InversionDummy(DummyEvent): + def render(self, graph, layer, prev_events): + if self.corresp_start_event in prev_events: + return # we have already been rendered + self.corresp_start_event.render(graph, layer, prev_events) + +class IsRunningDummy(DummyEvent): + def render(self, graph, layer, prev_events): + if self.corresp_start_event in prev_events: + return # we have already been rendered + self.corresp_start_event.render(graph, layer, prev_events) + +EVENT_LIST = {SuspendEvent : None, ResumeEvent : None, CompleteEvent : None, SwitchAwayEvent : None, + SwitchToEvent : None, ReleaseEvent : None, DeadlineEvent : None, IsRunningDummy : None, + InversionStartEvent : None, InversionEndEvent : None, InversionDummy : None} -- cgit v1.2.2