1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
"""EDF hard and soft schedulability tests,
for uni- and multiprocessors.
"""
from __future__ import division
from .gfb import is_schedulable as is_schedulable_gfb
from .gfb import is_schedulable as gfb_test
from .bak import is_schedulable as bak_test
from .bar import is_schedulable as bar_test
from .bcl import is_schedulable as bcl_test
from .bcl_iterative import is_schedulable as bcli_test
from .rta import is_schedulable as rta_test
from .ffdbf import is_schedulable as ffdbf_test
from .da import bound_response_times as da_tardiness_bounds
from .rta import bound_response_times as rta_response_times
from schedcat.util.quantor import forall
# hard real-time tests
HRT_TESTS = {
'GFB' : gfb_test,
'BAK' : bak_test,
'BAR' : bar_test,
'BCL' : bcl_test,
'BCLI' : bcli_test,
'RTA' : rta_test,
'FF-DBF' : ffdbf_test,
}
# A somewhat arbitrary heuristic to curb pseudo-polynomial runtimes...
def should_use_baruah_test(threshold, taskset, no_cpus):
if threshold is True:
return True
elif threshold is False:
return False
else:
slack = no_cpus - taskset.utilization()
if not slack:
# can't apply test for zero slack; avoid division by zero
return False
n = len(taskset)
score = n * (no_cpus * no_cpus) / (slack * slack)
return score <= threshold
# all (pure Python implementation)
def is_schedulable_py(no_cpus, tasks,
rta_min_step=1,
want_baruah=3000,
want_rta=True,
want_ffdbf=False,
want_load=False):
if tasks.utilization() > no_cpus or \
not forall(tasks)(lambda t: t.period >= t.cost):
# trivially infeasible
return False
else:
not_arbitrary = tasks.only_constrained_deadlines()
if no_cpus == 1 and tasks.density() <= 1:
# simply uniprocessor density condition
return True
elif no_cpus > 1:
# Baker's test can handle arbitrary deadlines.
if bak_test(no_cpus, tasks):
return True
# The other tests cannot.
if not_arbitrary:
# The density test is cheap, try it first.
if gfb_test(no_cpus, tasks):
return True
# Ok, try the slower ones.
if should_use_baruah_test(want_baruah, tasks, no_cpus) and \
bar_test(no_cpus, tasks):
return True
if want_rta and \
rta_test(no_cpus, tasks,
min_fixpoint_step=rta_min_step):
return True
# FF-DBF is almost always too slow.
if want_ffdbf and ffdbf_test(no_cpus, tasks):
return True
# If we get here, none of the tests passed.
return False
import schedcat.sched
if schedcat.sched.using_native:
import schedcat.sched.native as native
def is_schedulable_cpp(no_cpus, tasks,
rta_min_step=1,
want_baruah=True,
want_rta=True,
want_ffdbf=False,
want_load=False):
if no_cpus == 1:
native_test = native.QPATest(no_cpus);
else:
native_test = native.GlobalEDF(no_cpus, rta_min_step,
want_baruah != False,
want_rta,
want_ffdbf,
want_load)
ts = schedcat.sched.get_native_taskset(tasks)
return native_test.is_schedulable(ts)
is_schedulable = is_schedulable_cpp
else:
is_schedulable = is_schedulable_py
def bound_response_times(no_cpus, tasks, *args, **kargs):
if is_schedulable(no_cpus, tasks, *args, **kargs):
# HRT schedualble => no tardiness
# See if we can get RTA to provide a good response time bound.
rta_step = kargs['rta_min_step'] if 'rta_min_step' in kargs else 0
if rta_response_times(no_cpus, tasks, min_fixpoint_step=rta_step):
# Great, we got RTA estimates.
return True
else:
# RTA failed, use conservative bounds.
for t in tasks:
t.response_time = t.deadline
return True
# Not HRT schedulable, use SRT analysis.
elif da_tardiness_bounds(no_cpus, tasks):
return True
else:
# Could not find a safe response time bound for each task.
return False
|