1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
from collections import defaultdict
from .common import var_direct, var_indirect, var_preempt, add_var_mutex_constraints
from .common import set_blocking_objective, add_topology_constraints, enumerate_requests, find_per_cluster_resources
from .common import find_per_resource_tasks
from schedcat.util.linprog import LinearProgram
def find_prio_ceilings(taskset, resource_mapping):
    """Compute the priority ceiling of every mapped resource.

    The ceiling of a resource is the smallest task id among the tasks that
    find_per_resource_tasks() reports for it (this file assumes that a lower
    id means a higher priority).  A resource without any such task keeps the
    sentinel value len(taskset) + 1.

    Returns a dict that maps each key of resource_mapping to its ceiling.
    """
    users_of = find_per_resource_tasks(taskset)
    ceilings = {}
    for res_id in resource_mapping:
        # The sentinel goes first: on a tie min() keeps the earliest
        # candidate, which matches a strict "smaller than" update rule.
        candidates = [len(taskset) + 1]
        candidates.extend(t.id for t in users_of[res_id])
        ceilings[res_id] = min(candidates)
    return ceilings
def add_independent_cluster_constraints(resource_mapping, tasks, ti, linprog):
    """Constraint 8: resources located on clusters that ti never accesses
       cannot cause any direct or indirect blocking.

    For every task other than ti and each of its resources that lives on
    such a cluster, one equality is added to linprog that forces the sum of
    the matching direct and indirect blocking variables to zero.
    """
    clusters_of_ti = set()
    for own_res in ti.resmodel:
        clusters_of_ti.add(resource_mapping[own_res])

    for tx in tasks.without(ti):
        for res_id_q in tx.resmodel:
            if resource_mapping[res_id_q] in clusters_of_ti:
                # ti uses this cluster as well, so nothing can be ruled out
                continue
            terms = []
            for req_num in enumerate_requests(tx, ti, res_id_q):
                terms.append((1, var_direct(tx.id, res_id_q, req_num)))
                terms.append((1, var_indirect(tx.id, res_id_q, req_num)))
            # the combined contribution of these variables must be zero
            linprog.add_equality(terms, 0)
def bound_max_waiting_time(resource_mapping, prio_ceilings,
                           taskset, ti, requested_id):
    """Bound how long a single request of ti for requested_id can wait.

    The bound is the fixed point of

        own request length
        + longest conflicting lower-priority request (blocks once)
        + all higher-priority requests issued on the same cluster within
          the current wait-time estimate (block repeatedly)

    where "conflicting" means: located on the same cluster as requested_id
    and with a priority ceiling value <= ti.id.

    Returns the converged wait time, or False as soon as an estimate
    exceeds ti.response_time without having converged.
    """
    req_cluster = resource_mapping[requested_id]
    own_length = ti.resmodel[requested_id].max_length
    assert ti.resmodel[requested_id].max_requests > 0

    def _lower_prio_can_block(res_id):
        # same processor and ceiling high enough to delay ti
        ceiling = prio_ceilings[res_id]
        return req_cluster == resource_mapping[res_id] and ceiling <= ti.id

    def _higher_prio_demand(window):
        # total length of higher-priority requests on the same processor
        # that can be issued within an interval of the given length
        total = 0
        for tx in taskset.with_higher_priority_than(ti):
            jobs = tx.maxjobs(window)
            for res_id in tx.resmodel:
                if req_cluster == resource_mapping[res_id]:
                    req = tx.resmodel[res_id]
                    total += jobs * req.max_requests * req.max_length
        return total

    # lower-priority tasks delay the request at most once, so only the
    # longest conflicting request matters
    lower_lengths = [0]
    for tx in taskset.with_lower_priority_than(ti):
        for res_id in tx.resmodel:
            if _lower_prio_can_block(res_id):
                lower_lengths.append(tx.resmodel[res_id].max_length)
    delay_by_lower = max(lower_lengths)

    # iterate, starting from the length of ti's own request
    wait_time = own_length
    while True:
        estimate = own_length + delay_by_lower + _higher_prio_demand(wait_time)
        if estimate == wait_time:
            return wait_time
        if estimate > ti.response_time:
            return False
        wait_time = estimate
class LazyWaitBounds(object):
    """Mapping-like view of maximum wait times, computed lazily.

    The constructor arguments are stored as the leading arguments for
    bound_max_waiting_time().  Indexing with a resource id appends that id
    as the last argument, computes the bound on first access and serves it
    from the cache afterwards.
    """

    def __init__(self, *fixed_args):
        self.fixed_args = list(fixed_args)
        # resource id -> cached result of bound_max_waiting_time()
        self.wait_bounds = {}

    def __getitem__(self, res_id):
        cache = self.wait_bounds
        if res_id in cache:
            return cache[res_id]
        call_args = self.fixed_args + [res_id]
        cache[res_id] = bound_max_waiting_time(*call_args)
        return cache[res_id]
def add_max_wait_time_constraints(resource_mapping, tasks, ti, linprog,
                                  prio_ceilings=None,
                                  per_cluster_resources=None):
    """Constraint 8: Ti's maximum wait times limit how often higher-priority
       tasks can directly and indirectly delay Ti.

    For each higher-priority task Tx and each resource q that Tx accesses,
    one inequality is added to linprog: the sum of Tx's direct and indirect
    blocking variables for q is at most the number of requests for q that Tx
    can issue while Ti waits for any resource in the same cluster as q.  No
    inequality is added when one of the required wait-time bounds did not
    converge.

    prio_ceilings and per_cluster_resources are computed here when omitted.
    """
    if prio_ceilings is None:
        prio_ceilings = find_prio_ceilings(tasks, resource_mapping)

    # Maximum wait time per resource (each W_{i,q} in the paper).  They are
    # looked up lazily because the response times are not needed if there
    # are no higher-priority tasks with conflicting requests.
    wait_bounds = LazyWaitBounds(resource_mapping, prio_ceilings, tasks, ti)

    # Which resources share a processor?
    # NOTE(review): add_atmostonce_lower_prio_constraints calls this helper
    # with resource_mapping only -- confirm the helper's signature.
    if per_cluster_resources is None:
        per_cluster_resources = find_per_cluster_resources(resource_mapping, tasks)

    def _request_limit(tx, res_id_q):
        """Right-hand side for (tx, res_id_q); None if it is unbounded."""
        cluster = resource_mapping[res_id_q]
        per_job = tx.resmodel[res_id_q].max_requests
        limit = 0
        # everything accessed by Ti in the same cluster as res_id_q
        for res_id_y in per_cluster_resources[cluster]:
            if res_id_y not in ti.resmodel:
                continue
            window = wait_bounds[res_id_y]
            if window is False:
                # did not converge => no upper bound can be derived
                return None
            jobs = tx.maxjobs(window)
            own_reqs = ti.resmodel[res_id_y].max_requests
            limit += per_job * jobs * own_reqs
        return limit

    # Apply Lemma 15.
    for tx in tasks.with_higher_priority_than(ti):
        for res_id_q in tx.resmodel:
            limit = _request_limit(tx, res_id_q)
            if limit is None:
                continue
            terms = []
            for req_num in enumerate_requests(tx, ti, res_id_q):
                terms.append((1, var_direct(tx.id, res_id_q, req_num)))
                terms.append((1, var_indirect(tx.id, res_id_q, req_num)))
            linprog.add_inequality(terms, limit)
def add_conflict_set_constraints(taskset, ti, linprog,
                                 resource_mapping, prio_ceilings=None):
    """Constraint 6: Requests for resources whose priority ceiling is lower
       than ti's priority cannot delay ti, neither directly nor indirectly.

    For every such request of every other task, one equality is added to
    linprog that sets the sum of its direct and indirect blocking variables
    to zero.  prio_ceilings is computed here when omitted.
    """
    if prio_ceilings is None:
        prio_ceilings = find_prio_ceilings(taskset, resource_mapping)

    for tx in taskset.without(ti):
        for res_id in tx.resmodel:
            # assumption: lower id == higher priority
            if prio_ceilings[res_id] <= ti.id:
                # ceiling is at least ti's priority: blocking stays possible
                continue
            for req_num in enumerate_requests(tx, ti, res_id):
                direct = var_direct(tx.id, res_id, req_num)
                indirect = var_indirect(tx.id, res_id, req_num)
                linprog.equality(1, direct, 1, indirect, equal_to=0)
def add_atmostonce_lower_prio_constraints(tasks, ti, linprog, resource_mapping,
                                          prio_ceilings=None, res_per_cluster=None):
    """Constraint 7: Each request can be directly delayed at most once by a
       lower-priority task.

    For every cluster c, one inequality is added to linprog: the sum of the
    direct blocking variables of all lower-priority requests for resources
    that are located on c and whose priority ceiling value is <= ti.id is
    at most the number of requests that ti issues for resources on c.
    Clusters without any such lower-priority request get no inequality.

    prio_ceilings and res_per_cluster are computed here when omitted.
    """
    # NOTE(review): add_max_wait_time_constraints calls this helper with
    # (resource_mapping, tasks) -- confirm the helper's signature.
    if res_per_cluster is None:
        res_per_cluster = find_per_cluster_resources(resource_mapping)
    if prio_ceilings is None:
        prio_ceilings = find_prio_ceilings(tasks, resource_mapping)

    # Does not depend on the cluster; materialized once so that it can be
    # iterated for every cluster.
    lower_prio_tasks = list(tasks.with_lower_priority_than(ti))

    for c in set(resource_mapping.values()):
        LHS = []
        for tx in lower_prio_tasks:
            for res_id in tx.resmodel:
                if resource_mapping[res_id] != c:
                    # Located on another cluster.  Without this filter every
                    # cluster would get the same left-hand side, and a
                    # cluster that ti never accesses (right-hand side 0)
                    # would rule out all lower-priority direct blocking.
                    continue
                # assumption: lower id == higher priority
                if prio_ceilings[res_id] <= ti.id:
                    # for all v
                    for req_num in enumerate_requests(tx, ti, res_id):
                        LHS.append((1, var_direct(tx.id, res_id, req_num)))
        if LHS:
            res_on_c = res_per_cluster[c]
            # number of requests that ti issues for resources on c
            req_to_c = sum(ti.resmodel[r].max_requests
                           for r in ti.resmodel if r in res_on_c)
            linprog.add_inequality(LHS, req_to_c)
def get_lp_for_task(resource_locality,
                    taskset, ti, use_rta=True):
    """Assemble the blocking linear program for task ti.

    Creates a LinearProgram, sets the blocking objective and adds
    Constraints 1, 2, 6 and 7.  With use_rta set, the maximum-wait-time
    constraints (Constraint 8) are added on top; otherwise the
    independent-cluster constraints are added instead.

    Returns the populated LinearProgram.
    """
    lp = LinearProgram()
    set_blocking_objective(resource_locality, taskset, ti, lp)

    # Constraint 1
    add_var_mutex_constraints(taskset, ti, lp)
    # Constraint 2
    add_topology_constraints(resource_locality, taskset, ti, lp)

    # The ceilings depend only on the task set and the resource mapping.
    # Compute them once and hand them to every constraint builder that
    # would otherwise derive the same table again.
    prio_ceilings = find_prio_ceilings(taskset, resource_locality)

    # Constraint 6
    add_conflict_set_constraints(taskset, ti, lp, resource_locality,
                                 prio_ceilings=prio_ceilings)
    # Constraint 7
    add_atmostonce_lower_prio_constraints(taskset, ti, lp, resource_locality,
                                          prio_ceilings=prio_ceilings)
    if use_rta:
        # Constraint 8
        add_max_wait_time_constraints(resource_locality, taskset, ti, lp,
                                      prio_ceilings=prio_ceilings)
    else:
        add_independent_cluster_constraints(resource_locality, taskset, ti, lp)
    return lp
|