1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
#!/usr/bin/env python
import xml.etree.ElementTree as ET
from .tasks import TaskSystem, SporadicTask
from .resources import ResourceRequirement, ResourceRequirements
from schedcat.util.storage import storage
EPSILON = 10**-7
def maybe_int(x):
"Try to interpret x as an integer. Convert from string, if necessary."
if type(x) == float and abs(x) % 1 <= EPSILON:
return int(x)
elif type(x) == str:
try:
return maybe_int(float(x))
except ValueError:
return x
else:
return x
def set_attribute(tag, attr_name, obj, field_name=None):
"Set XML attributes based on obj attributes that might not exist."
if field_name is None:
field_name = attr_name
if field_name in obj.__dict__:
tag.set(attr_name, str(obj.__dict__[field_name]))
def subtag_for_attribute(tag, obj, field_name, tag_name=None):
if tag_name is None:
tag_name = field_name
if field_name in obj.__dict__:
return ET.SubElement(tag, tag_name)
else:
return None
def res_requirement(r, rmodel=None):
if rmodel is None:
tag = ET.Element('requirement')
else:
tag = ET.SubElement(rmodel, 'requirement')
set_attribute(tag, 'res_id', r)
set_attribute(tag, 'max_reads', r)
set_attribute(tag, 'max_writes', r)
set_attribute(tag, 'max_read_length', r)
set_attribute(tag, 'max_write_length', r)
return tag
def task(t):
tag = ET.Element('task')
if not t.id is None:
set_attribute(tag, 'id', t)
set_attribute(tag, 'period', t)
set_attribute(tag, 'wcet', t, 'cost')
if not t.implicit_deadline():
set_attribute(tag, 'deadline', t)
set_attribute(tag, 'partition', t)
set_attribute(tag, 'response_time', t)
set_attribute(tag, 'wss', t)
rmodel = subtag_for_attribute(tag, t, 'resmodel', 'resources')
if not rmodel is None:
for res_id in t.resmodel:
res_requirement(t.resmodel[res_id], rmodel)
tag.task = t
task.xml = tag
return tag
def parse_request(req_node):
return ResourceRequirement(
maybe_int(req_node.get('res_id', 0)),
int(req_node.get('max_writes', 1)),
int(req_node.get('max_write_length', 1)),
int(req_node.get('max_reads', 0)),
int(req_node.get('max_read_length', 0)),
)
def parse_resmodel(node):
resmodel = node.find('resources')
if resmodel != None:
reqs = ResourceRequirements()
for req in [parse_request(n) for n in resmodel.findall('requirement')]:
reqs[req.res_id] = req
return reqs
else:
return None
def get_attribute(node, attr_name, obj, field_name=None, convert=lambda _: _):
if field_name is None:
field_name = attr_name
x = node.get(attr_name, None)
if not x is None:
obj.__dict__[field_name] = convert(x)
return True
else:
return False
def parse_task(node):
cost = maybe_int(node.get('wcet'))
period = maybe_int(node.get('period'))
t = SporadicTask(cost, period)
get_attribute(node, 'deadline', t, convert=maybe_int)
get_attribute(node, 'id', t, convert=maybe_int)
get_attribute(node, 'partition', t, convert=maybe_int)
get_attribute(node, 'wss', t, convert=maybe_int)
resmodel = parse_resmodel(node)
if not resmodel is None:
t.resmodel = resmodel
t.xml = node
node.task = t
return t
def taskset(ts):
tag = ET.Element('taskset')
prop = ET.SubElement(tag, 'properties')
prop.set('utilization', str(ts.utilization()))
prop.set('utilization_q', str(ts.utilization_q()))
prop.set('density_q', str(ts.density_q()))
prop.set('density', str(ts.density()))
prop.set('count', str(len(ts)))
hp = ts.hyperperiod()
if hp:
prop.set('hyperperiod', str(hp))
for t in ts:
tag.append(task(t))
return tag
def testpoint(tasksets, params):
tag = ET.Element('testpoint')
config = ET.SubElement(tag, 'config')
for k in params:
config.set(k, str(params[k]))
for ts in tasksets:
tag.append(taskset(ts))
return tag
def parse_taskset(node):
tasks = [parse_task(n) for n in node.findall('task')]
return TaskSystem(tasks)
def parse_testpoint(node):
params = {}
config = node.find('config')
if not config is None:
for k in config.keys():
params[k] = maybe_int(config.get(k))
tss = [parse_taskset(n) for n in node.findall('taskset')]
return (params, tss)
def write_xml(xml, fname):
tree = ET.ElementTree(xml)
tree.write(fname)
def write_testpoint(tasksets, params, fname):
xml = testpoint(tasksets, params)
write_xml(xml, fname)
def write(ts, fname):
xml = taskset(ts)
write_xml(xml, fname)
def load(file):
tree = ET.ElementTree()
tree.parse(file)
root = tree.getroot()
if root.tag == 'taskset':
ts = parse_taskset(root)
ts.xml = tree.getroot()
return ts
elif root.tag == 'testpoint':
params, tss = parse_testpoint(root)
return storage(xml=root, params=params, tasksets=tss)
elif root.tag == 'task':
t = parse_task(root)
return t
else:
return None
|