aboutsummaryrefslogtreecommitdiffstats
path: root/schedcat/sched/edf/__init__.py
blob: 1f0e3a1eccc24cd8fee1481cc6cff02ce9719e36 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""EDF hard and soft schedulability tests,
for uni- and multiprocessors.
"""
from __future__ import division

from .gfb import is_schedulable as is_schedulable_gfb

from .gfb import is_schedulable as gfb_test
from .bak import is_schedulable as bak_test
from .bar import is_schedulable as bar_test
from .bcl import is_schedulable as bcl_test
from .bcl_iterative import is_schedulable as bcli_test
from .rta import is_schedulable as rta_test
from .ffdbf import is_schedulable as ffdbf_test

from .da import bound_response_times as da_tardiness_bounds
from .rta import bound_response_times as rta_response_times

from schedcat.util.quantor import forall

# hard real-time tests
HRT_TESTS = {
    'GFB'    : gfb_test,
    'BAK'    : bak_test,
    'BAR'    : bar_test,
    'BCL'    : bcl_test,
    'BCLI'   : bcli_test,
    'RTA'    : rta_test,
    'FF-DBF' : ffdbf_test,
    }

# A somewhat arbitrary heuristic to curb pseudo-polynomial runtimes...
def should_use_baruah_test(threshold, taskset, no_cpus):
    if threshold is True:
        return True
    elif threshold is False:
        return False
    else:
        slack = no_cpus - taskset.utilization()
        if not slack:
            # can't apply test for zero slack; avoid division by zero
            return False
        n     = len(taskset)
        score = n * (no_cpus * no_cpus) / (slack * slack)
        return score <= threshold

# all (pure Python implementation)
def is_schedulable_py(no_cpus, tasks,
                      rta_min_step=1,
                      want_baruah=3000,
                      want_rta=True,
                      want_ffdbf=False,
                      want_load=False):
    if tasks.utilization() > no_cpus or \
        not forall(tasks)(lambda t: t.period >= t.cost):
        # trivially infeasible
        return False
    else:
        not_arbitrary = tasks.only_constrained_deadlines()
        if no_cpus == 1 and tasks.density() <= 1:
            # simply uniprocessor density condition
            return True
        elif no_cpus > 1:
            # Baker's test can handle arbitrary deadlines.
            if bak_test(no_cpus, tasks):
                return True
            # The other tests cannot.
            if not_arbitrary:
                # The density test is cheap, try it first.
                if gfb_test(no_cpus, tasks):
                    return True
                # Ok, try the slower ones.
                if should_use_baruah_test(want_baruah, tasks, no_cpus) and \
                    bar_test(no_cpus, tasks):
                    return True
                if want_rta and \
                    rta_test(no_cpus, tasks,
                             min_fixpoint_step=rta_min_step):
                    return True
                # FF-DBF is almost always too slow.
                if want_ffdbf and ffdbf_test(no_cpus, tasks):
                    return True
    # If we get here, none of the tests passed.
    return False

import schedcat.sched

if schedcat.sched.using_native:
    import schedcat.sched.native as native

    def is_schedulable_cpp(no_cpus, tasks,
                           rta_min_step=1,
                           want_baruah=True,
                           want_rta=True,
                           want_ffdbf=False,
                           want_load=False):
        if no_cpus == 1:
            native_test = native.QPATest(no_cpus);
        else:
            native_test = native.GlobalEDF(no_cpus, rta_min_step,
                                           want_baruah != False,
                                           want_rta,
                                           want_ffdbf,
                                           want_load)
        ts = schedcat.sched.get_native_taskset(tasks)
        return native_test.is_schedulable(ts)

    is_schedulable = is_schedulable_cpp

else:
    is_schedulable = is_schedulable_py


def bound_response_times(no_cpus, tasks, *args, **kargs):
    if is_schedulable(no_cpus, tasks, *args, **kargs):
        # HRT schedualble => no tardiness
        # See if we can get RTA to provide a good response time bound.
        rta_step = kargs['rta_min_step'] if 'rta_min_step' in kargs else 0
        if rta_response_times(no_cpus, tasks, min_fixpoint_step=rta_step):
            # Great, we got RTA estimates.
            return True
        else:
            # RTA failed, use conservative bounds.
            for t in tasks:
                t.response_time = t.deadline
            return True
    # Not HRT schedulable, use SRT analysis.
    elif da_tardiness_bounds(no_cpus, tasks):
        return True
    else:
        # Could not find a safe response time bound for each task.
        return False