diff options
Diffstat (limited to 'parse/sched.py')
-rw-r--r-- | parse/sched.py | 445 |
1 files changed, 161 insertions, 284 deletions
diff --git a/parse/sched.py b/parse/sched.py index 3e30880..ffc6224 100644 --- a/parse/sched.py +++ b/parse/sched.py | |||
@@ -1,306 +1,183 @@ | |||
1 | """ | ||
2 | TODO: No longer very pythonic, lot of duplicate code | ||
3 | print out task execution times or something | ||
4 | get miss ratio and tardiness directly from schedule OR | ||
5 | email list about turning on optional summary statistics OR | ||
6 | set up run exps to only get release and completions to get these stats | ||
7 | """ | ||
8 | |||
9 | import config.config as conf | 1 | import config.config as conf |
10 | import os | 2 | import os |
11 | import re | 3 | import re |
12 | import numpy as np | 4 | import struct |
13 | import subprocess | 5 | import subprocess |
14 | 6 | ||
15 | from collections import namedtuple,defaultdict | 7 | from collections import defaultdict,namedtuple |
16 | from operator import methodcaller | 8 | from common import recordtype |
17 | from point import Measurement,Type | 9 | from point import Measurement |
18 | |||
19 | PARAM_RECORD = r"(?P<RECORD>" +\ | ||
20 | r"PARAM *?(?P<PID>\d+)\/.*?" +\ | ||
21 | r"cost.*?(?P<WCET>[\d\.]+)ms.*?" +\ | ||
22 | r"period.*?(?P<PERIOD>[\d.]+)ms.*?" +\ | ||
23 | r"part.*?(?P<CPU>\d+)[, ]*" +\ | ||
24 | r"(?:class=(?P<CLASS>\w+))?[, ]*" +\ | ||
25 | r"(?:level=(?P<LEVEL>\w+))?).*$" | ||
26 | EXIT_RECORD = r"(?P<RECORD>" +\ | ||
27 | r"TASK_EXIT *?(?P<PID>\d+)/.*?" +\ | ||
28 | r"Avg.*?(?P<AVG>\d+).*?" +\ | ||
29 | r"Max.*?(?P<MAX>\d+))" | ||
30 | TARDY_RECORD = r"(?P<RECORD>" +\ | ||
31 | r"TASK_TARDY.*?(?P<PID>\d+)/(?P<JOB>\d+).*?" +\ | ||
32 | r"Tot.*?(?P<TOTAL>[\d\.]+).*?ms.*?" +\ | ||
33 | r"(?P<MAX>[\d\.]+).*?ms.*?" +\ | ||
34 | r"(?P<MISSES>[\d\.]+))" | ||
35 | COMPLETION_RECORD = r"(?P<RECORD>" +\ | ||
36 | r"COMPLETION.*?(?P<PID>\d+)/.*?" +\ | ||
37 | r"exec:.*?(?P<EXEC>[\d\.]+)ms.*?" +\ | ||
38 | r"flush:.*?(?P<FLUSH>[\d\.]+)ms.*?" +\ | ||
39 | r"flush_work:.*?(?P<FLUSH_WORK>[\d]+).*?" +\ | ||
40 | r"load:.*?(?P<LOAD>[\d\.]+)ms.*?" +\ | ||
41 | r"load_work:.*?(?P<LOAD_WORK>[\d]+))" | ||
42 | |||
43 | TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period','type','level']) | ||
44 | Task = namedtuple('Task', ['pid', 'config', 'run']) | ||
45 | |||
46 | class LeveledArray(object): | ||
47 | """ | ||
48 | Groups statistics by the level of the task to which they apply | ||
49 | """ | ||
50 | def __init__(self, name): | ||
51 | self.name = name | ||
52 | self.vals = defaultdict(lambda:[]) | ||
53 | |||
54 | def add(self, task, value): | ||
55 | self.vals[task.config.level] += [value] | ||
56 | |||
57 | |||
58 | def write_measurements(self, result): | ||
59 | for level, arr in self.vals.iteritems(): | ||
60 | name = "%s%s" % ("%s-" % level if level else "", self.name) | ||
61 | result[name] = Measurement(name).from_array(arr) | ||
62 | |||
63 | def get_st_output(data_dir, out_dir, force=False): | ||
64 | """ | ||
65 | Create and return files containing unpacked sched data | ||
66 | """ | ||
67 | bin_files = conf.FILES['sched_data'].format(".*") | ||
68 | bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] | ||
69 | 10 | ||
70 | output_file = "%s/out-st" % out_dir | 11 | class TimeTracker: |
71 | 12 | '''Store stats for durations of time demarcated by sched_trace records.''' | |
72 | if os.path.isfile(output_file): | 13 | def __init__(self): |
73 | if force: | 14 | self.begin = self.avg = self.max = self.num = self.job = 0 |
74 | os.remove(output_file) | ||
75 | else: | ||
76 | return output_file | ||
77 | |||
78 | if len(bins) != 0: | ||
79 | cmd_arr = [conf.BINS['st_show']] | ||
80 | cmd_arr.extend(bins) | ||
81 | with open(output_file, "w") as f: | ||
82 | subprocess.call(cmd_arr, cwd=data_dir, stdout=f) | ||
83 | else: | ||
84 | return None | ||
85 | return output_file | ||
86 | |||
87 | def get_tasks(data): | ||
88 | ret = [] | ||
89 | for match in re.finditer(PARAM_RECORD, data, re.M): | ||
90 | try: | ||
91 | t = Task( int(match.group('PID')), | ||
92 | TaskConfig( int(match.group('CPU')), | ||
93 | float(match.group('WCET')), | ||
94 | float(match.group('PERIOD')), | ||
95 | match.group("CLASS"), | ||
96 | match.group("LEVEL")), []) | ||
97 | if not (t.config.period and t.pid): | ||
98 | raise Exception() | ||
99 | ret += [t] | ||
100 | except Exception as e: | ||
101 | raise Exception("Invalid task record: %s\nparsed:\n\t%s\n\t%s" % | ||
102 | (e, match.groupdict(), match.group('RECORD'))) | ||
103 | return ret | ||
104 | |||
105 | def get_task_dict(data): | ||
106 | tasks_list = get_tasks(data) | ||
107 | tasks_dict = {} | ||
108 | for t in tasks_list: | ||
109 | tasks_dict[t.pid] = t | ||
110 | return tasks_dict | ||
111 | |||
112 | def get_task_exits(data): | ||
113 | ret = [] | ||
114 | for match in re.finditer(EXIT_RECORD, data): | ||
115 | try: | ||
116 | m = Measurement( int(match.group('PID')), | ||
117 | {Type.Max : float(match.group('MAX')), | ||
118 | Type.Avg : float(match.group('AVG'))}) | ||
119 | except: | ||
120 | raise Exception("Invalid exit record, parsed:\n\t%s\n\t%s" % | ||
121 | (match.groupdict(), match.group('RECORD'))) | ||
122 | 15 | ||
123 | ret += [m] | 16 | def store_time(self, record): |
124 | return ret | 17 | '''End duration of time.''' |
18 | dur = record.when - self.begin | ||
125 | 19 | ||
20 | if self.job == record.job and dur > 0: | ||
21 | self.max = max(self.max, dur) | ||
22 | self.avg *= float(self.num / (self.num + 1)) | ||
23 | self.num += 1 | ||
24 | self.avg += dur / float(self.num) | ||
126 | 25 | ||
127 | def extract_tardy_vals(task_dict, data, exp_point): | 26 | self.begin = 0 |
128 | ratios = LeveledArray("miss-ratio") | 27 | self.job = 0 |
129 | avg_tards = LeveledArray("avg-rel-tardiness") | 28 | |
130 | max_tards = LeveledArray("max-rel-tardiness") | 29 | def start_time(self, record): |
30 | '''Start duration of time.''' | ||
31 | self.begin = record.when | ||
32 | self.job = record.job | ||
33 | |||
34 | # Data stored for each task | ||
35 | TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) | ||
36 | TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses']) | ||
37 | |||
38 | # Map of event ids to corresponding class, binary format, and processing methods | ||
39 | RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method']) | ||
40 | record_map = [0]*10 | ||
41 | |||
42 | # Common to all records | ||
43 | HEADER_FORMAT = '<bbhi' | ||
44 | HEADER_FIELDS = ['type', 'cpu', 'pid', 'job'] | ||
45 | RECORD_SIZE = 24 | ||
46 | |||
47 | NSEC_PER_MSEC = 1000000 | ||
48 | |||
49 | def register_record(name, id, method, fmt, fields): | ||
50 | '''Create record description from @fmt and @fields and map to @id, using | ||
51 | @method to process parsed record.''' | ||
52 | # Format of binary data (see python struct documentation) | ||
53 | rec_fmt = HEADER_FORMAT + fmt | ||
54 | |||
55 | # Corresponding field data | ||
56 | rec_fields = HEADER_FIELDS + fields | ||
57 | if "when" not in rec_fields: # Force a "when" field for everything | ||
58 | rec_fields += ["when"] | ||
59 | |||
60 | # Create mutable class with the given fields | ||
61 | field_class = recordtype(name, list(rec_fields)) | ||
62 | clazz = type(name, (field_class, object), {}) | ||
63 | |||
64 | record_map[id] = RecordInfo(clazz, rec_fmt, method) | ||
65 | |||
66 | def make_iterator(fname): | ||
67 | '''Iterate over (parsed record, processing method) in a | ||
68 | sched-trace file.''' | ||
69 | f = open(fname, 'rb') | ||
70 | max_type = len(record_map) | ||
71 | |||
72 | while True: | ||
73 | data = f.read(RECORD_SIZE) | ||
131 | 74 | ||
132 | for match in re.finditer(TARDY_RECORD, data): | ||
133 | try: | ||
134 | pid = int(match.group("PID")) | ||
135 | jobs = int(match.group("JOB")) | ||
136 | misses = int(match.group("MISSES")) | ||
137 | total_tard = float(match.group("TOTAL")) | ||
138 | max_tard = float(match.group("MAX")) | ||
139 | |||
140 | if not (jobs and pid): raise Exception() | ||
141 | except: | ||
142 | raise Exception("Invalid tardy record:\n\t%s\n\t%s" % | ||
143 | (match.groupdict(), match.group("RECORD"))) | ||
144 | |||
145 | if pid not in task_dict: | ||
146 | raise Exception("Invalid pid '%d' in tardy record:\n\t%s" % | ||
147 | (pid, match.group("RECORD"))) | ||
148 | |||
149 | t = task_dict[pid] | ||
150 | avg_tards.add(t, total_tard / (jobs * t.config.period)) | ||
151 | max_tards.add(t, max_tard / t.config.period) | ||
152 | ratios.add(t, misses / jobs) | ||
153 | |||
154 | map(methodcaller('write_measurements', exp_point), | ||
155 | [ratios, avg_tards, max_tards]) | ||
156 | |||
157 | # TODO: rename | ||
158 | def extract_variance(task_dict, data, exp_point): | ||
159 | varz = LeveledArray("exec-variance") | ||
160 | flushes = LeveledArray("cache-flush") | ||
161 | loads = LeveledArray("cache-load") | ||
162 | fworks = LeveledArray("flush-work") | ||
163 | lworks = LeveledArray("load-work") | ||
164 | |||
165 | completions = defaultdict(lambda: []) | ||
166 | missed = defaultdict(lambda: int()) | ||
167 | |||
168 | for match in re.finditer(COMPLETION_RECORD, data): | ||
169 | try: | 75 | try: |
170 | pid = int(match.group("PID")) | 76 | type_num = struct.unpack_from('b',data)[0] |
171 | duration = float(match.group("EXEC")) | 77 | except struct.error: |
172 | load = float(match.group("LOAD")) | 78 | break |
173 | flush = float(match.group("FLUSH")) | 79 | |
174 | lwork = int(match.group("LOAD_WORK")) | 80 | rdata = record_map[type_num] if type_num <= max_type else 0 |
175 | fwork = int(match.group("FLUSH_WORK")) | 81 | if not rdata: |
176 | |||
177 | if load: | ||
178 | loads.add(task_dict[pid], load) | ||
179 | lworks.add(task_dict[pid], lwork) | ||
180 | if not lwork: raise Exception() | ||
181 | if flush: | ||
182 | flushes.add(task_dict[pid], flush) | ||
183 | fworks.add(task_dict[pid], fwork) | ||
184 | if not fwork: raise Exception() | ||
185 | |||
186 | # Last (exit) record often has exec time of 0 | ||
187 | missed[pid] += not bool(duration) | ||
188 | |||
189 | if missed[pid] > 1 or not pid: #TODO: fix, raise Exception() | ||
190 | continue | ||
191 | except: | ||
192 | raise Exception("Invalid completion record, missed: %d:" | ||
193 | "\n\t%s\n\t%s" % (missed[pid], match.groupdict(), | ||
194 | match.group("RECORD"))) | ||
195 | completions[pid] += [duration] | ||
196 | |||
197 | for pid, durations in completions.iteritems(): | ||
198 | m = Measurement(pid).from_array(durations) | ||
199 | |||
200 | # TODO: not this, please | ||
201 | if not task_dict[pid].run: | ||
202 | task_dict[pid].run.append(m) | ||
203 | |||
204 | job_times = np.array(durations) | ||
205 | mean = job_times.mean() | ||
206 | |||
207 | if not mean or not durations: | ||
208 | continue | 82 | continue |
209 | 83 | ||
210 | # Coefficient of variation | 84 | try: |
211 | cv = job_times.std() / job_times.mean() | 85 | values = struct.unpack_from(rdata.fmt, data) |
212 | # Correction, assuming normal distributions | 86 | except struct.error: |
213 | corrected = (1 + 1/(4 * len(job_times))) * cv | ||
214 | |||
215 | varz.add(task_dict[pid], corrected) | ||
216 | # varz.add(task_dict[pid], m[Type.Var]) | ||
217 | |||
218 | if exp_point: | ||
219 | map(methodcaller('write_measurements', exp_point), | ||
220 | [varz, flushes, loads, fworks, lworks]) | ||
221 | |||
222 | def config_exit_stats(task_dict, data): | ||
223 | # # Dictionary of task exit measurements by pid | ||
224 | # exits = get_task_exits(data) | ||
225 | # exit_dict = dict((e.id, e) for e in exits) | ||
226 | extract_variance(task_dict, data, None) | ||
227 | |||
228 | # Dictionary where keys are configurations, values are list | ||
229 | # of tasks with those configuratino | ||
230 | config_dict = defaultdict(lambda: []) | ||
231 | for t in task_dict.itervalues(): | ||
232 | config_dict[t.config] += [t] | ||
233 | |||
234 | for config in config_dict: | ||
235 | task_list = sorted(config_dict[config]) | ||
236 | |||
237 | # # Replace tasks with corresponding exit stats | ||
238 | # if not t.pid in exit_dict: | ||
239 | # raise Exception("Missing exit record for task '%s' in '%s'" % | ||
240 | # (t, file.name)) | ||
241 | # exit_list = [exit_dict[t.pid] for t in task_list] | ||
242 | exit_list = [t.run[0] for t in task_list] | ||
243 | config_dict[config] = exit_list | ||
244 | |||
245 | return config_dict | ||
246 | |||
247 | saved_stats = {} | ||
248 | def get_base_stats(base_file): | ||
249 | if base_file in saved_stats: | ||
250 | return saved_stats[base_file] | ||
251 | with open(base_file, 'r') as f: | ||
252 | data = f.read() | ||
253 | task_dict = get_task_dict(data) | ||
254 | |||
255 | result = config_exit_stats(task_dict, data) | ||
256 | saved_stats[base_file] = result | ||
257 | return result | ||
258 | |||
259 | def extract_scaling_data(task_dict, data, result, base_file): | ||
260 | # Generate trees of tasks with matching configurations | ||
261 | data_stats = config_exit_stats(task_dict, data) | ||
262 | base_stats = get_base_stats(base_file) | ||
263 | |||
264 | # Scaling factors are calculated by matching groups of tasks with the same | ||
265 | # config, then comparing task-to-task exec times in order of PID within | ||
266 | # each group | ||
267 | max_scales = LeveledArray("max-scaling") | ||
268 | avg_scales = LeveledArray("avg-scaling") | ||
269 | |||
270 | for config in data_stats: | ||
271 | if len(data_stats[config]) != len(base_stats[config]): | ||
272 | # Quit, we are missing a record and can't guarantee | ||
273 | # a task-to-task comparison | ||
274 | continue | 87 | continue |
275 | 88 | ||
276 | for data_stat, base_stat in zip(data_stats[config],base_stats[config]): | 89 | obj = rdata.clazz(*values) |
277 | if not base_stat[Type.Avg] or not base_stat[Type.Max] or \ | 90 | yield (obj, rdata.method) |
278 | not data_stat[Type.Avg] or not data_stat[Type.Max]: | 91 | |
279 | continue | 92 | def read_data(task_dict, fnames): |
280 | # How much larger is their exec stat than ours? | 93 | '''Read records from @fnames and store per-pid stats in @task_dict.''' |
281 | avg_scale = float(base_stat[Type.Avg]) / float(data_stat[Type.Avg]) | 94 | buff = [] |
282 | max_scale = float(base_stat[Type.Max]) / float(data_stat[Type.Max]) | 95 | |
96 | def add_record(itera): | ||
97 | # Ordered insertion into buff | ||
98 | try: | ||
99 | next_ret = itera.next() | ||
100 | except StopIteration: | ||
101 | return | ||
283 | 102 | ||
284 | task = task_dict[data_stat.id] | 103 | arecord, method = next_ret |
104 | i = 0 | ||
105 | for (i, (brecord, m, t)) in enumerate(buff): | ||
106 | if brecord.when > arecord.when: | ||
107 | break | ||
108 | buff.insert(i, (arecord, method, itera)) | ||
285 | 109 | ||
286 | avg_scales.add(task, avg_scale) | 110 | for fname in fnames: |
287 | max_scales.add(task, max_scale) | 111 | itera = make_iterator(fname) |
112 | add_record(itera) | ||
288 | 113 | ||
289 | avg_scales.write_measurements(result) | 114 | while buff: |
290 | max_scales.write_measurements(result) | 115 | (record, method, itera) = buff.pop(0) |
291 | 116 | ||
292 | def extract_sched_data(data_file, result, base_file): | 117 | add_record(itera) |
293 | with open(data_file, 'r') as f: | 118 | method(task_dict, record) |
294 | data = f.read() | ||
295 | 119 | ||
296 | task_dict = get_task_dict(data) | 120 | def process_completion(task_dict, record): |
121 | task_dict[record.pid].misses.store_time(record) | ||
297 | 122 | ||
298 | try: | 123 | def process_release(task_dict, record): |
299 | extract_tardy_vals(task_dict, data, result) | 124 | data = task_dict[record.pid] |
300 | extract_variance(task_dict, data, result) | 125 | data.jobs += 1 |
301 | except Exception as e: | 126 | data.misses.start_time(record) |
302 | print("Error in %s" % data_file) | ||
303 | raise e | ||
304 | 127 | ||
305 | if (base_file): | 128 | def process_param(task_dict, record): |
306 | extract_scaling_data(task_dict, data, result, base_file) | 129 | params = TaskParams(record.wcet, record.period, record.partition) |
130 | task_dict[record.pid].params = params | ||
131 | |||
132 | def process_block(task_dict, record): | ||
133 | task_dict[record.pid].blocks.start_time(record) | ||
134 | |||
135 | def process_resume(task_dict, record): | ||
136 | task_dict[record.pid].blocks.store_time(record) | ||
137 | |||
138 | register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when']) | ||
139 | register_record('BlockRecord', 8, process_block, 'Q8x', ['when']) | ||
140 | register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when']) | ||
141 | register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when']) | ||
142 | register_record('ParamRecord', 2, process_param, 'IIIcc2x', | ||
143 | ['wcet','period','phase','partition', 'task_class']) | ||
144 | |||
145 | def extract_sched_data(result, data_dir, work_dir): | ||
146 | bin_files = conf.FILES['sched_data'].format(".*") | ||
147 | output_file = "%s/out-st" % work_dir | ||
148 | |||
149 | bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] | ||
150 | if not len(bins): | ||
151 | return | ||
152 | |||
153 | # Save an in-english version of the data for debugging | ||
154 | cmd_arr = [conf.BINS['st_show']] | ||
155 | cmd_arr.extend(bins) | ||
156 | with open(output_file, "w") as f: | ||
157 | subprocess.call(cmd_arr, cwd=data_dir, stdout=f) | ||
158 | |||
159 | task_dict = defaultdict(lambda : | ||
160 | TaskData(0, 0, TimeTracker(), TimeTracker())) | ||
161 | |||
162 | # Gather per-task values | ||
163 | read_data(task_dict, bins) | ||
164 | |||
165 | stat_data = {"avg-tard" : [], "max-tard" : [], | ||
166 | "avg-block" : [], "max-block" : [], | ||
167 | "miss-ratio" : []} | ||
168 | |||
169 | # Group per-task values | ||
170 | for tdata in task_dict.itervalues(): | ||
171 | miss_ratio = float(tdata.misses.num) / tdata.jobs | ||
172 | # Scale average down to account for jobs with 0 tardiness | ||
173 | avg_tard = tdata.misses.avg * miss_ratio | ||
174 | |||
175 | stat_data["miss-ratio"].append(miss_ratio) | ||
176 | stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet) | ||
177 | stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet) | ||
178 | stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC) | ||
179 | stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC) | ||
180 | |||
181 | # Summarize value groups | ||
182 | for name, data in stat_data.iteritems(): | ||
183 | result[name] = Measurement(str(name)).from_array(data) | ||