aboutsummaryrefslogtreecommitdiffstats
path: root/schedcat/sched/edf/gel_pl.py
blob: 2349f7b0d26ce1571cdc8427ae73572424a66cc2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""This module computes tardiness bounds for G-EDF-like schedulers.  Currently
   G-EDF and G-FL (See Erickson and Anderson ECRTS'12) are supported.  This
   module works by analyzing compliant vectors (see Erickson, Devi, Baruah
   ECRTS'10), analyzing the involved piecewise linear functions (see Erickson,
   Guan, Baruah OPODIS'10).  Notation is used as in EA'12.

   The analysis is general enough to support other GEL schedulers trivially.
"""

from __future__ import division

from math import ceil

from fractions import Fraction

from schedcat.util.quantor import forall

import schedcat.sched
if schedcat.sched.using_native:
    import schedcat.sched.native as native

import heapq

class AnalysisDetails:
    """Holds the results of a GEL response-time analysis.

    Attributes (parallel lists, indexed like ``tasks``):
    bounds -- per-task response-time bounds
    S_i    -- per-task S_i terms (see EA'12)
    G_i    -- per-task G contributions (see EA'12)
    """
    def __init__(self, tasks, gel_obj = None):
        # When a native analysis object is supplied, pull the per-task
        # results out of it; otherwise the caller fills in the attributes.
        self.tasks = tasks
        if gel_obj is not None:
            indices = range(len(tasks))
            self.bounds = [gel_obj.get_bound(i) for i in indices]
            self.S_i = [gel_obj.get_Si(i) for i in indices]
            self.G_i = [gel_obj.get_Gi(i) for i in indices]

    def max_lateness(self):
        """Return the largest lateness bound (response bound minus deadline)."""
        return max(bound - task.deadline
                   for bound, task in zip(self.bounds, self.tasks))

def compute_gfl_response_details(no_cpus, tasks, rounds):
    """This function computes response time bounds for the given set of tasks
       using G-FL.  "rounds" determines the number of rounds of binary search;
       using "0" results in using the exact algorithm.
    """
    # G-FL (EA'12): the priority point of each job is its deadline moved
    # earlier by floor((m-1)/m * C_i).
    shrink = (no_cpus - 1) / no_cpus
    for task in tasks:
        task.prio_pt = task.deadline - int(shrink * task.cost)
    return compute_response_details(no_cpus, tasks, rounds)

def compute_gedf_response_details(no_cpus, tasks, rounds):
    """This function computes response time bounds for a given set of tasks
       using G-EDF.  "rounds" determines the number of rounds of binary search;
       using "0" results in using the exact algorithm.
    """
    # Under G-EDF the priority point of every job is simply its deadline.
    for t in tasks:
        t.prio_pt = t.deadline
    return compute_response_details(no_cpus, tasks, rounds)

def compute_response_details(no_cpus, tasks, rounds):
    """Dispatch to the appropriate analysis back end.

    Uniprocessor EDF with priority points equal to periods is handled as a
    special case with known bounds; otherwise the native (compiled)
    analysis is used when available, falling back to the pure-Python
    implementation.
    """
    if (no_cpus == 1) and forall(tasks)(lambda t: t.prio_pt == t.period):
        # Special case: each task's response time is bounded by its period.
        details = AnalysisDetails(tasks)
        details.bounds = [task.period for task in tasks]
        details.S_i = [0.0] * len(tasks)
        details.G_i = [0.0] * len(tasks)
        return details
    if not schedcat.sched.using_native:
        return compute_response_bounds(no_cpus, tasks, rounds)
    native_ts = schedcat.sched.get_native_taskset(tasks)
    return AnalysisDetails(tasks, native.GELPl(no_cpus, native_ts, rounds))

def compute_response_bounds(no_cpus, tasks, rounds):
    """This function computes response time bounds for the given set of tasks
       and priority points.  "rounds" determines the number of rounds of binary
       search; using "0" results in using the exact algorithm.
    """

    if not has_bounded_tardiness(no_cpus, tasks):
        return None

    # Ceiling of the total utilization; the analysis sums the largest
    # (util_ceil - 1) G contributions.
    util_ceil = int(ceil(tasks.utilization_q()))

    # Uniformly shift all scheduler priority points so that the smallest
    # becomes zero.  A uniform shift cannot change scheduling decisions,
    # and EA'12 shows the shifted points improve the bounds.
    shift = min(task.prio_pt for task in tasks)
    analysis_pps = [task.prio_pt - shift for task in tasks]

    utilizations = [Fraction(task.cost, task.period) for task in tasks]

    # Per-task S_i terms and y-intercepts of the G(\vec{x}) contributors.
    S_i = []
    Y_ints = []
    for task, pp, util in zip(tasks, analysis_pps, utilizations):
        cost = Fraction(task.cost)
        s_term = max(Fraction(0),
                     cost * (Fraction(1) - Fraction(pp, task.period)))
        S_i.append(s_term)
        Y_ints.append(cost - Fraction(task.cost, no_cpus) * util - s_term)

    if rounds == 0:
        s = compute_exact_s(no_cpus, tasks, util_ceil, sum(S_i), Y_ints,
                            utilizations)
    else:
        s = compute_binsearch_s(no_cpus, tasks, util_ceil, sum(S_i), Y_ints,
                                utilizations, rounds)

    details = AnalysisDetails(tasks)
    details.bounds = [int(ceil(s - Fraction(task.cost, no_cpus)
                               + Fraction(task.cost) + Fraction(pp)))
                      for task, pp in zip(tasks, analysis_pps)]
    details.S_i = S_i
    details.G_i = [y_int + s * util
                   for y_int, util in zip(Y_ints, utilizations)]
    return details

def compute_exact_s(no_cpus, tasks, util_ceil, S, Y_ints, utilizations):
    r"""Compute the exact s at which G(s) + S(\tau) - m*s reaches zero.

    Each task i contributes the line Y_ints[i] + utilizations[i] * s, and
    G(s) sums the (util_ceil - 1) largest such lines.  The traced total is
    piecewise linear in s, so the zero is found by walking from s = 0
    through each point where the set of contributing lines may change.

    :param no_cpus: number of processors (m)
    :param tasks: task set (used only for its length / indices)
    :param util_ceil: ceiling of the total utilization
    :param S: the constant term S(\tau), i.e. sum of all S_i
    :param Y_ints: per-task y-intercepts (Fractions)
    :param utilizations: per-task utilizations, the line slopes (Fractions)
    """
    # A replacement (s, i, j) records that at point s the line of task j
    # (the larger slope) overtakes the line of task i, so past the
    # intersection j's line dominates i's in the top set.
    replacements = []
    for i, task1 in enumerate(tasks):
        for j in range(i+1, len(tasks)):
            task2 = tasks[j]
            # Parallel lines do not intersect, and choice of identical lines
            # doesn't matter.  Ignore all such cases.
            if utilizations[i] != utilizations[j]:
                intersect = Fraction(Y_ints[j] - Y_ints[i],
                                     utilizations[i] - utilizations[j])
                # Only intersections at non-negative s matter, since the
                # trace starts at s = 0.
                if intersect >= 0:
                    if utilizations[i] < utilizations[j]:
                        replacements.append( (intersect, i, j) )
                    else:
                        replacements.append( (intersect, j, i) )
    # Break ties by order of increasing utilization of replaced tasks.  Avoids
    # an edge case.  Consider tasks A, B, C, in order of decreasing
    # utilization.  If the top m-1 items include tasks B and C, it is possible
    # (without the tie break) to have the following replacements:
    # C->B (no change)
    # B->A (now A and C in set considered)
    # C->A (no change)
    #
    # However, proper behavior should include A and B in considered set.  The
    # tie break avoids this case.
    replacements.sort(key=lambda r: (r[0], utilizations[r[1]]))

    # The piecewise linear function we are tracing is G(s) + S(\tau) - ms, as
    # discussed (with L(s) in place of G(s)) in EGB'10.  We start at s = 0 and
    # trace its value each time a slope changes, hoping for a value of zero.

    # While tracing piecewise linear function, keep track of whether each task
    # contributes to G(\vec{x}).  Array implementation allows O(1) lookups and
    # changes.
    task_pres = [False]*len(tasks)

    # Initial value and slope.
    current_value = S
    current_slope = Fraction(-1 * no_cpus)

    # The (util_ceil - 1) largest y-intercepts identify the lines that
    # contribute to G at s = 0.
    init_pairs = heapq.nlargest(util_ceil - 1, enumerate(Y_ints),
                                key=lambda p: p[1])

    # For s = 0, just use Y-intercepts to determine what is present.
    for pair in init_pairs:
        task_pres[pair[0]] = True
        current_value += pair[1]
        current_slope += utilizations[pair[0]]

    # Index of the next replacement
    rindex = 0
    next_s = Fraction(0)
    # Sentinel: start with zero > next_s so the loop runs at least once.
    zero = next_s + Fraction(1)
    while zero > next_s:
        current_s = next_s
        # Zero crossing of the current linear segment, valid if no slope
        # change occurs before it is reached.
        zero = current_s - Fraction(current_value, current_slope)
        if rindex < len(replacements):
            replacement = replacements[rindex]
            next_s = replacement[0]
            # Advance the traced value to the next slope-change point.
            current_value += (next_s - current_s) * current_slope
            # Apply replacement, if appropriate.
            if task_pres[replacement[1]] and not task_pres[replacement[2]]:
                task_pres[replacement[1]] = False
                current_slope -= utilizations[replacement[1]]
                task_pres[replacement[2]] = True
                current_slope += utilizations[replacement[2]]
            rindex += 1
        else:
            # No further slope changes: the zero just computed is final.
            # Push next_s past it so the loop terminates.
            next_s = zero + 1
    return zero

def compute_binsearch_s(no_cpus, tasks, util_ceil, S, Y_ints, utilizations,
                        rounds):
    """Approximate the root of M(s) = G(s) + S - m*s by binary search.

    The upper bracket is doubled until M(max) <= 0, then "rounds" halving
    steps narrow the bracket.  The returned upper end always satisfies
    M(s) <= 0 and is therefore a safe (possibly pessimistic) bound.
    """
    n = len(tasks)

    def M(s):
        # Sum the (util_ceil - 1) largest per-task line values at s, add
        # the constant term, and subtract the processor supply m*s.
        lines = [Y_ints[i] + utilizations[i] * s for i in range(n)]
        top = heapq.nlargest(util_ceil - 1, lines)
        return S + sum(top) - Fraction(no_cpus) * s

    lo = Fraction(0)
    hi = Fraction(1)
    while M(hi) > 0:
        lo = hi
        hi = hi * 2

    for _ in range(rounds):
        mid = (lo + hi) / 2
        if M(mid) < 0:
            hi = mid
        else:
            lo = mid

    # The upper end of the bracket is guaranteed to be a legal bound.
    return hi

def has_bounded_tardiness(no_cpus, tasks):
    """Tardiness is bounded iff the platform is not over-utilized and no
       task's worst-case execution time exceeds its period."""
    if tasks.utilization() > no_cpus:
        return False
    return forall(tasks)(lambda t: t.cost <= t.period)

def bound_gedf_response_times(no_cpus, tasks, rounds):
    """Analyze the task set under G-EDF and store the per-task response-time
       bounds on the tasks.  Returns False if tardiness is unbounded."""
    details = compute_gedf_response_details(no_cpus, tasks, rounds)
    return bound_response_times(tasks, details)

def bound_gfl_response_times(no_cpus, tasks, rounds):
    """Analyze the task set under G-FL and store the per-task response-time
       bounds on the tasks.  Returns False if tardiness is unbounded."""
    details = compute_gfl_response_details(no_cpus, tasks, rounds)
    return bound_response_times(tasks, details)

def bound_response_times(tasks, response_details):
    """Copy computed response-time bounds onto the tasks.

    :param tasks: the analyzed task set
    :param response_details: an AnalysisDetails instance with a ``bounds``
        list parallel to ``tasks``, or None if the analysis failed
        (unbounded tardiness).
    :return: False if response_details is None, True otherwise (bounds
        were stored as ``task.response_time``).
    """
    if response_details is None:
        return False

    for task, bound in zip(tasks, response_details.bounds):
        task.response_time = bound
    return True