about summary refs log tree commit diff stats
path: root/parse/sched.py
diff options
context:
space:
mode:
authorJonathan Herman <hermanjl@cs.unc.edu>2012-11-26 16:02:48 -0500
committerJonathan Herman <hermanjl@cs.unc.edu>2012-11-26 16:02:48 -0500
commitcb8db5d30ee769304c2c2b00f2a7d9bcb3c4098f (patch)
treec5352d84285af565d5246c3eb861ffba709761f1 /parse/sched.py
parent41c867480f1e20bd3b168258ed71450499ea6ccf (diff)
Removed 2-step parse for scheduling statistics.
Diffstat (limited to 'parse/sched.py')
-rw-r--r--parse/sched.py445
1 file changed, 161 insertions, 284 deletions
diff --git a/parse/sched.py b/parse/sched.py
index 3e30880..ffc6224 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -1,306 +1,183 @@
1"""
2TODO: No longer very pythonic, lot of duplicate code
3print out task execution times or something
4get miss ratio and tardiness directly from schedule OR
5email list about turning on optional summary statistics OR
6set up run exps to only get release and completions to get these stats
7"""
8
9import config.config as conf 1import config.config as conf
10import os 2import os
11import re 3import re
12import numpy as np 4import struct
13import subprocess 5import subprocess
14 6
15from collections import namedtuple,defaultdict 7from collections import defaultdict,namedtuple
16from operator import methodcaller 8from common import recordtype
17from point import Measurement,Type 9from point import Measurement
18
19PARAM_RECORD = r"(?P<RECORD>" +\
20 r"PARAM *?(?P<PID>\d+)\/.*?" +\
21 r"cost.*?(?P<WCET>[\d\.]+)ms.*?" +\
22 r"period.*?(?P<PERIOD>[\d.]+)ms.*?" +\
23 r"part.*?(?P<CPU>\d+)[, ]*" +\
24 r"(?:class=(?P<CLASS>\w+))?[, ]*" +\
25 r"(?:level=(?P<LEVEL>\w+))?).*$"
26EXIT_RECORD = r"(?P<RECORD>" +\
27 r"TASK_EXIT *?(?P<PID>\d+)/.*?" +\
28 r"Avg.*?(?P<AVG>\d+).*?" +\
29 r"Max.*?(?P<MAX>\d+))"
30TARDY_RECORD = r"(?P<RECORD>" +\
31 r"TASK_TARDY.*?(?P<PID>\d+)/(?P<JOB>\d+).*?" +\
32 r"Tot.*?(?P<TOTAL>[\d\.]+).*?ms.*?" +\
33 r"(?P<MAX>[\d\.]+).*?ms.*?" +\
34 r"(?P<MISSES>[\d\.]+))"
35COMPLETION_RECORD = r"(?P<RECORD>" +\
36 r"COMPLETION.*?(?P<PID>\d+)/.*?" +\
37 r"exec:.*?(?P<EXEC>[\d\.]+)ms.*?" +\
38 r"flush:.*?(?P<FLUSH>[\d\.]+)ms.*?" +\
39 r"flush_work:.*?(?P<FLUSH_WORK>[\d]+).*?" +\
40 r"load:.*?(?P<LOAD>[\d\.]+)ms.*?" +\
41 r"load_work:.*?(?P<LOAD_WORK>[\d]+))"
42
43TaskConfig = namedtuple('TaskConfig', ['cpu','wcet','period','type','level'])
44Task = namedtuple('Task', ['pid', 'config', 'run'])
45
46class LeveledArray(object):
47 """
48 Groups statistics by the level of the task to which they apply
49 """
50 def __init__(self, name):
51 self.name = name
52 self.vals = defaultdict(lambda:[])
53
54 def add(self, task, value):
55 self.vals[task.config.level] += [value]
56
57
58 def write_measurements(self, result):
59 for level, arr in self.vals.iteritems():
60 name = "%s%s" % ("%s-" % level if level else "", self.name)
61 result[name] = Measurement(name).from_array(arr)
62
63def get_st_output(data_dir, out_dir, force=False):
64 """
65 Create and return files containing unpacked sched data
66 """
67 bin_files = conf.FILES['sched_data'].format(".*")
68 bins = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
69 10
class TimeTracker:
    '''Store stats for durations of time demarcated by sched_trace records.

    A duration is opened with start_time() (e.g. at a job release) and
    closed with store_time() (e.g. at the matching completion). Running
    max / average / count are maintained so no per-duration list is kept.
    '''
    def __init__(self):
        # begin/job describe the currently open duration; avg/max/num are
        # running statistics over all closed, counted durations.
        self.begin = self.avg = self.max = self.num = self.job = 0

    def store_time(self, record):
        '''End duration of time.'''
        dur = record.when - self.begin

        # Only count the duration if it closes the same job that opened it
        # and the clock actually advanced.
        if self.job == record.job and dur > 0:
            self.max = max(self.max, dur)
            # Running mean: scale the old mean by num/(num+1), then add the
            # new sample's share. BUGFIX: the original computed
            # float(self.num / (self.num + 1)), which under Python 2 integer
            # division is always float(0) == 0.0, so the accumulated average
            # was discarded on every update.
            self.avg *= float(self.num) / (self.num + 1)
            self.num += 1
            self.avg += dur / float(self.num)

        # Close the duration whether or not it was counted.
        self.begin = 0
        self.job = 0

    def start_time(self, record):
        '''Start duration of time.'''
        self.begin = record.when
        self.job = record.job
33
# Immutable per-task scheduling parameters, filled in from a ParamRecord.
TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])

# Mutable per-task statistics accumulated while reading a trace.
# params stays 0 until a ParamRecord is seen for the pid.
TaskData = recordtype('TaskData', ['params', 'jobs', 'blocks', 'misses'])

# Map of event ids to corresponding class, binary format, and processing methods
RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method'])
record_map = [0] * 10

# Header fields common to all sched_trace records
HEADER_FORMAT = '<bbhi'
HEADER_FIELDS = ['type', 'cpu', 'pid', 'job']
RECORD_SIZE = 24

NSEC_PER_MSEC = 1000000
48
def register_record(name, id, method, fmt, fields):
    '''Create record description from @fmt and @fields and map to @id, using
    @method to process parsed record.'''
    # Every record starts with the shared header; the payload format
    # (python struct syntax) is appended to it.
    full_fmt = HEADER_FORMAT + fmt

    # Field names in packing order; guarantee a "when" timestamp field so
    # records can always be ordered in time downstream.
    full_fields = list(HEADER_FIELDS) + list(fields)
    if "when" not in full_fields:
        full_fields.append("when")

    # Build a mutable record class exposing the named fields.
    base = recordtype(name, full_fields)
    record_cls = type(name, (base, object), {})

    record_map[id] = RecordInfo(record_cls, full_fmt, method)
65
def make_iterator(fname):
    '''Iterate over (parsed record, processing method) in a
    sched-trace file.

    Yields one (record, method) pair per registered record in @fname;
    unknown record types and truncated payloads are skipped silently.
    '''
    max_type = len(record_map)

    # BUGFIX: the original opened the file without ever closing it,
    # leaking one descriptor per trace file.
    with open(fname, 'rb') as f:
        while True:
            data = f.read(RECORD_SIZE)

            try:
                type_num = struct.unpack_from('b', data)[0]
            except struct.error:
                # Short read: end of file
                break

            # BUGFIX: the original used `type_num <= max_type`, which
            # allows record_map[len(record_map)] (IndexError) and lets a
            # negative signed byte index from the end of the list,
            # silently mis-parsing garbage as a valid record type.
            if 0 <= type_num < max_type:
                rdata = record_map[type_num]
            else:
                rdata = 0
            if not rdata:
                # Unregistered record type: skip
                continue

            try:
                values = struct.unpack_from(rdata.fmt, data)
            except struct.error:
                # Malformed or truncated payload: skip this record
                continue

            obj = rdata.clazz(*values)
            yield (obj, rdata.method)
def read_data(task_dict, fnames):
    '''Read records from @fnames and store per-pid stats in @task_dict.

    Performs an n-way merge of the per-file record streams so records are
    processed in global timestamp ('when') order.
    '''
    buff = []

    def add_record(itera):
        '''Pull the next record off @itera and insert it into @buff,
        keeping @buff sorted by record timestamp.'''
        try:
            # builtin next() instead of itera.next(): works on both
            # Python 2 and Python 3 iterators.
            arecord, method = next(itera)
        except StopIteration:
            # Stream exhausted; nothing left to queue from it.
            return

        # BUGFIX: the original unconditionally called buff.insert(i, ...)
        # after the scan, so a record with the largest timestamp was
        # inserted *before* the current last element instead of appended,
        # corrupting the merge order.
        for i, (brecord, _method, _itera) in enumerate(buff):
            if brecord.when > arecord.when:
                buff.insert(i, (arecord, method, itera))
                return
        buff.append((arecord, method, itera))

    # Prime the merge with one record per file.
    for fname in fnames:
        add_record(make_iterator(fname))

    # Repeatedly process the earliest buffered record, refilling the
    # buffer from the stream that record came from.
    while buff:
        record, method, itera = buff.pop(0)

        add_record(itera)
        method(task_dict, record)
def process_completion(task_dict, record):
    '''A job completed: close the tardiness interval opened at its release.'''
    tdata = task_dict[record.pid]
    tdata.misses.store_time(record)
def process_release(task_dict, record):
    '''A job was released: count it and start tracking its tardiness.'''
    tdata = task_dict[record.pid]
    tdata.jobs += 1
    tdata.misses.start_time(record)
def process_param(task_dict, record):
    '''Record a task's scheduling parameters (wcet, period, partition).'''
    task_dict[record.pid].params = TaskParams(record.wcet, record.period,
                                              record.partition)
131
def process_block(task_dict, record):
    '''The task blocked: open a blocking interval.'''
    tdata = task_dict[record.pid]
    tdata.blocks.start_time(record)
134
def process_resume(task_dict, record):
    '''The task resumed: close the currently open blocking interval.'''
    tdata = task_dict[record.pid]
    tdata.blocks.store_time(record)
137
# Wire the sched_trace event ids to their parsers. The payload format is a
# struct(3) string appended to HEADER_FORMAT; 'x' bytes are padding.
# Registration order is irrelevant (each call fills its own record_map slot).
register_record('ParamRecord', 2, process_param, 'IIIcc2x',
                ['wcet', 'period', 'phase', 'partition', 'task_class'])
register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when'])
register_record('CompletionRecord', 7, process_completion, 'Q8x', ['when'])
register_record('BlockRecord', 8, process_block, 'Q8x', ['when'])
register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when'])
144
def extract_sched_data(result, data_dir, work_dir):
    '''Parse the binary sched_trace files found in @data_dir and write
    summary statistics (miss ratio, tardiness, blocking) into @result.

    A human-readable dump of the trace is also written to @work_dir for
    debugging. No-op if @data_dir holds no trace files.
    '''
    bin_names = conf.FILES['sched_data'].format(".*")
    output_file = "%s/out-st" % work_dir

    # BUGFIX: keep full paths, not bare basenames -- read_data() opens
    # these files directly and is not run with cwd == data_dir (only the
    # st_show subprocess below is).
    bins = [os.path.join(data_dir, f)
            for f in os.listdir(data_dir) if re.match(bin_names, f)]
    if not len(bins):
        return

    # Save an in-english version of the data for debugging
    cmd_arr = [conf.BINS['st_show']]
    cmd_arr.extend(bins)
    with open(output_file, "w") as f:
        subprocess.call(cmd_arr, cwd=data_dir, stdout=f)

    # params stays 0 until a ParamRecord is seen for the pid
    task_dict = defaultdict(lambda :
                            TaskData(0, 0, TimeTracker(), TimeTracker()))

    # Gather per-task values
    read_data(task_dict, bins)

    stat_data = {"avg-tard" : [], "max-tard" : [],
                 "avg-block" : [], "max-block" : [],
                 "miss-ratio" : []}

    # Group per-task values
    for tdata in task_dict.itervalues():
        # BUGFIX: a truncated trace can yield tasks with no ParamRecord
        # (params == 0) or no releases (jobs == 0); the original then
        # crashed with AttributeError / ZeroDivisionError below.
        if not tdata.params or not tdata.jobs:
            continue

        miss_ratio = float(tdata.misses.num) / tdata.jobs
        # Scale average down to account for jobs with 0 tardiness
        avg_tard = tdata.misses.avg * miss_ratio

        stat_data["miss-ratio"].append(miss_ratio)
        stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet)
        stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet)
        stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC)
        stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC)

    # Summarize value groups
    for name, data in stat_data.iteritems():
        result[name] = Measurement(str(name)).from_array(data)