path: root/parse/sched.py
author	Jonathan Herman <hermanjl@cs.unc.edu>	2013-04-23 17:28:12 -0400
committer	Jonathan Herman <hermanjl@cs.unc.edu>	2013-04-23 17:28:12 -0400
commit	2ceaa6c607ef85bde4f14017634d9d1621efca29 (patch)
tree	c85e755e59907a48ff762fd56473449f33c23894 /parse/sched.py
parent	a0e4b9fe9d7fab9a50a626cfeda3c614a9a6af5d (diff)
parent	7545402506aa76261e18d85af585ff0ac1cf05c1 (diff)
Merge branch 'master' into wip-color-mc
Conflicts:
	gen/generator.py
	parse/sched.py
	parse_exps.py
Diffstat (limited to 'parse/sched.py')
-rw-r--r--	parse/sched.py	354
1 file changed, 211 insertions(+), 143 deletions(-)
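
The central change in the diff below replaces struct.unpack-based parsing with ctypes structures that are copied straight out of the raw 24-byte sched_trace records. A minimal standalone sketch of that pattern follows; the class name, field layout, and the packed test record are illustrative only, not repository code:

    from ctypes import (LittleEndianStructure, c_uint8, c_uint16,
                        c_uint32, c_uint64, memmove, addressof)
    import struct

    RECORD_SIZE = 24

    class CompletionSketch(LittleEndianStructure):
        # Common sched_trace header, event payload, then padding to 24 bytes
        _pack_ = 1
        _fields_ = [('type', c_uint8), ('cpu', c_uint8),
                    ('pid', c_uint16), ('job', c_uint32),
                    ('when', c_uint64), ('pad', c_uint8 * 8)]

        def fill(self, data):
            # Copy one raw record directly into the structure's memory
            memmove(addressof(self), data, RECORD_SIZE)

    # Fake record: type 7 (completion), cpu 0, pid 1234, job 2, when 5000000 ns
    raw = struct.pack('<BBHIQ8x', 7, 0, 1234, 2, 5000000)
    rec = CompletionSketch()
    rec.fill(raw)
    print("%d %d %d" % (rec.pid, rec.job, rec.when))   # 1234 2 5000000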
diff --git a/parse/sched.py b/parse/sched.py
index 1f07751..1033989 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -3,67 +3,119 @@ import os
 import re
 import struct
 import subprocess
-import sys
 
 from collections import defaultdict,namedtuple
-from common import recordtype
+from common import recordtype,log_once
 from point import Measurement
+from ctypes import *
+
+LOSS_MSG = """Found task missing more than %d%% of its scheduling records.
+These won't be included in scheduling statistics!"""%(100*conf.MAX_RECORD_LOSS)
+SKIP_MSG = """Measurement '%s' has no non-zero values.
+Measurements like these are not included in scheduling statistics.
+If a measurement is missing, this is why."""
+SCALE_MSG = """Task in {} with config {} has < 1.0 scale!
+These scales are skipped in measurements."""
+
+# Data stored for each task
+TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu', 'level'])
+TaskData = recordtype('TaskData', ['params', 'jobs', 'loads',
+                                   'blocks', 'misses', 'execs'])
+
+ScaleData = namedtuple('ScaleData', ['reg_tasks', 'base_tasks'])
 
 class TimeTracker:
     '''Store stats for durations of time demarcated by sched_trace records.'''
     def __init__(self):
-        self.begin = self.avg = self.max = self.num = self.job = 0
+        self.begin = self.avg = self.max = self.num = self.next_job = 0
+
+        # Count of times the job in start_time matched that in store_time
+        self.matches = 0
+        # And the times it didn't
+        self.disjoints = 0
 
-    def store_time(self, record):
+        # Measurements are recorded in store_time using the previous matching
+        # record which was passed to store_time. This way, the last record for
+        # any task is always skipped
+        self.last_record = None
+
+    def store_time(self, next_record):
         '''End duration of time.'''
-        dur = record.when - self.begin
+        dur = (self.last_record.when - self.begin) if self.last_record else -1
+
+        if self.next_job == next_record.job:
+            self.last_record = next_record
 
-        if self.job == record.job and dur > 0:
-            self.max = max(self.max, dur)
-            self.avg *= float(self.num / (self.num + 1))
-            self.num += 1
-            self.avg += dur / float(self.num)
+            if self.last_record:
+                self.matches += 1
 
-            self.begin = 0
-            self.job = 0
+            if dur > 0:
+                self.max = max(self.max, dur)
+                self.avg *= float(self.num / (self.num + 1))
+                self.num += 1
+                self.avg += dur / float(self.num)
 
-    def start_time(self, record):
+            self.begin = 0
+            self.next_job = 0
+        else:
+            self.disjoints += 1
+
+    def start_time(self, record, time = None):
         '''Start duration of time.'''
-        self.begin = record.when
-        self.job = record.job
+        if self.last_record:
+            if not time:
+                self.begin = self.last_record.when
+            else:
+                self.begin = time
 
-# Data stored for each task
-TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu', 'level'])
-TaskData = recordtype('TaskData', ['params', 'jobs', 'loads',
-                       'blocks', 'misses', 'execs'])
+        self.next_job = record.job
 
-# Map of event ids to corresponding class, binary format, and processing methods
-RecordInfo = namedtuple('RecordInfo', ['clazz', 'fmt', 'method'])
-record_map = [0]*10
 
-# Common to all records
-HEADER_FORMAT = '<bbhi'
-HEADER_FIELDS = ['type', 'cpu', 'pid', 'job']
-RECORD_SIZE = 24
+class LeveledArray(object):
+    """Groups statistics by the level of the task to which they apply"""
+    def __init__(self, name=None):
+        self.name = name
+        self.vals = defaultdict(lambda: defaultdict(lambda:[]))
+
+    def add(self, name, level, value):
+        if type(value) != type([]):
+            value = [value]
+        self.vals[name][level] += value
+
+    def write_measurements(self, result):
+        for stat_name, stat_data in self.vals.iteritems():
+            for level, values in stat_data.iteritems():
+                if not values or not sum(values):
+                    log_once(SKIP_MSG, SKIP_MSG % stat_name)
+                    continue
+
+                name = "%s%s" % ("%s-" % level if level else "", stat_name)
+                result[name] = Measurement(name).from_array(values)
 
+# Map of event ids to corresponding class and format
+record_map = {}
+
+RECORD_SIZE = 24
 NSEC_PER_MSEC = 1000000
 
-def register_record(name, id, method, fmt, fields):
-    '''Create record description from @fmt and @fields and map to @id, using
-    @method to process parsed record.'''
-    # Format of binary data (see python struct documentation)
-    rec_fmt = HEADER_FORMAT + fmt
+def register_record(id, clazz):
+    fields = clazz.FIELDS
 
-    # Corresponding field data
-    rec_fields = HEADER_FIELDS + fields
-    if "when" not in rec_fields: # Force a "when" field for everything
-        rec_fields += ["when"]
+    fsize = lambda fields : sum([sizeof(list(f)[1]) for f in fields])
+    diff = RECORD_SIZE - fsize(SchedRecord.FIELDS) - fsize(fields)
 
-    # Create mutable class with the given fields
-    field_class = recordtype(name, list(rec_fields))
-    clazz = type(name, (field_class, object), {})
+    # Create extra padding fields to make record the proper size
+    # Creating one big field of c_uint64 and giving it a size of 8*diff
+    # _should_ work, but doesn't. This is an uglier way of accomplishing
+    # the same goal
+    for d in range(diff):
+        fields += [("extra%d" % d, c_char)]
 
-    record_map[id] = RecordInfo(clazz, rec_fmt, method)
+    # Create structure with fields and methods of clazz
+    clazz2 = type("Dummy%d" % id, (LittleEndianStructure,clazz),
+                  {'_fields_': SchedRecord.FIELDS + fields,
+                   '_pack_'  : 1})
+    record_map[id] = clazz2
 
 def make_iterator(fname):
     '''Iterate over (parsed record, processing method) in a
@@ -73,7 +125,6 @@ def make_iterator(fname):
         return
 
     f = open(fname, 'rb')
-    max_type = len(record_map)
 
     while True:
         data = f.read(RECORD_SIZE)
@@ -83,156 +134,169 @@ def make_iterator(fname):
         except struct.error:
             break
 
-        rdata = record_map[type_num] if type_num <= max_type else 0
-        if not rdata:
+        if type_num not in record_map:
             continue
 
-        try:
-            values = struct.unpack_from(rdata.fmt, data)
-        except struct.error:
-            continue
+        clazz = record_map[type_num]
+        obj = clazz()
+        obj.fill(data)
 
-        obj = rdata.clazz(*values)
-        yield (obj, rdata.method)
+        if obj.job != 1:
+            yield obj
+        else:
+            # Results from the first job are nonsense
+            pass
 
 def read_data(task_dict, fnames):
     '''Read records from @fnames and store per-pid stats in @task_dict.'''
     buff = []
 
+    def get_time(record):
+        return record.when if hasattr(record, 'when') else 0
+
     def add_record(itera):
         # Ordered insertion into buff
         try:
-            next_ret = itera.next()
+            arecord = itera.next()
         except StopIteration:
             return
 
-        arecord, method = next_ret
         i = 0
-        for (i, (brecord, m, t)) in enumerate(buff):
-            if brecord.when > arecord.when:
+        for (i, (brecord, _)) in enumerate(buff):
+            if get_time(brecord) > get_time(arecord):
                 break
-        buff.insert(i, (arecord, method, itera))
+        buff.insert(i, (arecord, itera))
 
     for fname in fnames:
         itera = make_iterator(fname)
         add_record(itera)
 
     while buff:
-        (record, method, itera) = buff.pop(0)
+        record, itera = buff.pop(0)
 
         add_record(itera)
-        method(task_dict, record)
+        record.process(task_dict)
 
-def process_completion(task_dict, record):
-    task_dict[record.pid].misses.store_time(record)
-    task_dict[record.pid].loads += [record.load]
-
-def process_release(task_dict, record):
-    data = task_dict[record.pid]
-    data.jobs += 1
-    data.misses.start_time(record)
-
-def process_param(task_dict, record):
-    level = chr(97 + record.level)
-    params = TaskParams(record.wcet, record.period,
-                        record.partition, level)
-    task_dict[record.pid].params = params
-
-def process_block(task_dict, record):
-    task_dict[record.pid].blocks.start_time(record)
-
-def process_resume(task_dict, record):
-    task_dict[record.pid].blocks.store_time(record)
-
-def process_switch_to(task_dict, record):
-    task_dict[record.pid].execs.start_time(record)
-
-def process_switch_away(task_dict, record):
-    task_dict[record.pid].execs.store_time(record)
-
-register_record('ResumeRecord', 9, process_resume, 'Q8x', ['when'])
-register_record('BlockRecord', 8, process_block, 'Q8x', ['when'])
-register_record('CompletionRecord', 7, process_completion, 'QQ', ['when', 'load'])
-register_record('ReleaseRecord', 3, process_release, 'QQ', ['release', 'when'])
-register_record('SwitchToRecord', 5, process_switch_to, 'Q8x', ['when'])
-register_record('SwitchAwayRecord', 6, process_switch_away, 'Q8x', ['when'])
-register_record('ParamRecord', 2, process_param, 'IIIcccx',
-                ['wcet','period','phase','partition', 'task_class', 'level'])
-
-saved_stats = []
-def get_task_data(data_dir, work_dir = None):
+class SchedRecord(object):
+    # Subclasses will have their FIELDs merged into this one
+    FIELDS = [('type', c_uint8), ('cpu', c_uint8),
+              ('pid', c_uint16), ('job', c_uint32)]
+
+    def fill(self, data):
+        memmove(addressof(self), data, RECORD_SIZE)
+
+    def process(self, task_dict):
+        raise NotImplementedError()
+
+class ParamRecord(SchedRecord):
+    FIELDS = [('wcet', c_uint32), ('period', c_uint32),
+              ('phase', c_uint32), ('partition', c_uint8),
+              ('class', c_uint8), ('level', c_uint8)]
+
+    def process(self, task_dict):
+        params = TaskParams(self.wcet, self.period,
+                            self.partition, self.level)
+        task_dict[self.pid].params = params
+
+class ReleaseRecord(SchedRecord):
+    FIELDS = [('when', c_uint64), ('release', c_uint64)]
+
+    def process(self, task_dict):
+        data = task_dict[self.pid]
+        data.jobs += 1
+        if data.params:
+            data.misses.start_time(self, self.when + data.params.period)
+
+class CompletionRecord(SchedRecord):
+    FIELDS = [('when', c_uint64)]
+
+    def process(self, task_dict):
+        task_dict[self.pid].misses.store_time(self)
+
+class BlockRecord(SchedRecord):
+    FIELDS = [('when', c_uint64)]
+
+    def process(self, task_dict):
+        task_dict[self.pid].blocks.start_time(self)
+
+class ResumeRecord(SchedRecord):
+    FIELDS = [('when', c_uint64)]
+
+    def process(self, task_dict):
+        task_dict[self.pid].blocks.store_time(self)
+
+# Map records to sched_trace ids (see include/litmus/sched_trace.h)
+register_record(2, ParamRecord)
+register_record(3, ReleaseRecord)
+register_record(7, CompletionRecord)
+register_record(8, BlockRecord)
+register_record(9, ResumeRecord)
+
+__all_dicts = {}
+
+def create_task_dict(data_dir, work_dir = None):
     '''Parse sched trace files'''
-    if data_dir in saved_stats:
-        return data_dir
+    if data_dir in __all_dicts:
+        return __all_dicts[data_dir]
 
     bin_files = conf.FILES['sched_data'].format(".*")
     output_file = "%s/out-st" % work_dir
 
-    bins = ["%s/%s" % (data_dir,f) for f in os.listdir(data_dir) if re.match(bin_files, f)]
-    if not len(bins):
-        return
+    task_dict = defaultdict(lambda :
+        TaskData(None, 1, [], TimeTracker(), TimeTracker(), TimeTracker()))
+
+    bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
+    if not len(bin_names):
+        return task_dict
 
     # Save an in-english version of the data for debugging
     # This is optional and will only be done if 'st_show' is in PATH
     if work_dir and conf.BINS['st_show']:
         cmd_arr = [conf.BINS['st_show']]
-        cmd_arr.extend(bins)
+        cmd_arr.extend(bin_names)
         with open(output_file, "w") as f:
-            print("calling %s" % cmd_arr)
             subprocess.call(cmd_arr, cwd=data_dir, stdout=f)
 
-    task_dict = defaultdict(lambda :TaskData(0, 0, 0, [], TimeTracker(),
-                                             TimeTracker(), TimeTracker()))
-
     # Gather per-task values
-    read_data(task_dict, bins)
+    bin_paths = ["%s/%s" % (data_dir,f) for f in bin_names]
+    read_data(task_dict, bin_paths)
 
-    saved_stats[data_dir] = task_dict
-    return task_dict
+    __all_dicts[data_dir] = task_dict
 
-class LeveledArray(object):
-    """Groups statistics by the level of the task to which they apply"""
-    def __init__(self):
-        self.name = name
-        self.vals = defaultdict(lambda: defaultdict(lambda:[]))
-
-    def add(self, name, level, value):
-        if type(value) != type([]):
-            value = [value]
-        self.vals[name][task.config.level] += value
-
-    def write_measurements(self, result):
-        for stat_name, stat_data in self.vals.iteritems():
-            for level, values in stat_data.iteritems():
-                if not values:
-                    continue
-
-                name = "%s%s" % ("%s-" % level if level else "", stat_name)
-                result[name] = Measurement(name).from_array(arr)
+    return task_dict
 
 def extract_sched_data(result, data_dir, work_dir):
-    task_dict = get_task_data(data_dir, work_dir)
-
+    task_dict = create_task_dict(data_dir, work_dir)
     stat_data = LeveledArray()
+
     for tdata in task_dict.itervalues():
         if not tdata.params:
             # Currently unknown where these invalid tasks come from...
             continue
 
-        miss_ratio = float(tdata.misses.num) / tdata.jobs
-        # Scale average down to account for jobs with 0 tardiness
-        avg_tard = tdata.misses.avg * miss_ratio
+        level = tdata.params.level
+        miss = tdata.misses
+
+        record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints)
+        stat_data.add("record-loss", level, record_loss)
+
+        if record_loss > conf.MAX_RECORD_LOSS:
+            log_once(LOSS_MSG)
+            continue
+
+        miss_ratio = float(miss.num) / miss.matches
+        avg_tard = miss.avg * miss_ratio
 
-        level = tdata.params.level
-        stat_data.add("miss-ratio", level, miss_ratio)
-        stat_data.add("avg-tard", level, avg_tard / tdata.params.wcet)
-        stat_data.add("max-tard", level, tdata.misses.max / tdata.params.wcet)
-        stat_data.add("avg-block", level, tdata.blocks.avg / NSEC_PER_MSEC)
-        stat_data.add("max-block", level, tdata.blocks.max / NSEC_PER_MSEC)
+        stat_data.add("miss-ratio", level, miss_ratio)
+
+        stat_data.add("max-tard", level, miss.max / tdata.params.period)
+        stat_data.add("avg-tard", level, avg_tard / tdata.params.period)
+
+        stat_data.add("avg-block", level, tdata.blocks.avg / NSEC_PER_MSEC)
+        stat_data.add("max-block", level, tdata.blocks.max / NSEC_PER_MSEC)
 
     stat_data.write_measurements(result)
 
-ScaleData = namedtuple('ScaleData', ['reg_tasks', 'base_tasks'])
 def extract_mc_data(result, data_dir, base_dir):
     task_dict = get_task_data(data_dir)
     base_dict = get_task_data(base_dir)
@@ -245,12 +309,14 @@ def extract_mc_data(result, data_dir, base_dir):
 
     tasks_by_config = defaultdict(lambda: ScaleData([], []))
 
-    # Add tasks in order of pid to tasks_by_config
-    # Tasks must be ordered by pid or we can't make 1 to 1 comparisons
-    # when multiple tasks have the same config in each task set
+    # Add task execution times in order of pid to tasks_by_config
     for tasks, field in ((task_dict, 'reg_tasks'), (base_dict, 'base_tasks')):
+        # Sorted for tie breaking: if 3 regular tasks have the same config
+        # (so 3 base tasks also have the same config), match first pid regular
+        # with first pid base, etc. This matches tie breaking in the kernel
        for pid in sorted(tasks.keys()):
            tdata = tasks[pid]
+
            tlist = getattr(tasks_by_config[tdata.params], field)
            tlist += [tdata.execs]
 
@@ -260,7 +326,10 @@ def extract_mc_data(result, data_dir, base_dir):
             # Can't make comparison if different numbers of tasks!
             continue
 
+        # Tuples of (regular task execution times, base task execution times)
+        # where each has the same configuration
         all_pairs = zip(scale_data.reg_tasks, scale_data.base_tasks)
+
         for reg_execs, base_execs in all_pairs:
             if not reg_execs.max or not reg_execs.avg or\
                not base_execs.max or not base_execs.avg:
@@ -271,8 +340,7 @@ def extract_mc_data(result, data_dir, base_dir):
             avg_scale = float(base_execs.avg) / reg_execs.avg
 
             if (avg_scale < 1 or max_scale < 1) and config.level == "b":
-                sys.stderr.write("Task in {} with config {} has <1.0 scale!"
-                                 .format(data_dir, config)
+                log_once(SCALE_MSG, SCALE_MSG.format(data_dir, config))
                 continue
 
             stat_data.add('max-scale', config.level, max_scale)
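
read_data in the diff above interleaves records from several per-CPU trace files so they are processed in global timestamp order. A standalone sketch of the same idea, using heapq.merge for brevity instead of the sorted-insertion buffer the diff uses; the (when, pid) tuples are invented:

    import heapq

    cpu0 = iter([(10, 101), (40, 101)])   # (when, pid) records from one file
    cpu1 = iter([(25, 202), (30, 202)])   # records from a second file

    for when, pid in heapq.merge(cpu0, cpu1):
        print("%d %d" % (when, pid))      # 10, 25, 30, 40 -- global time order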