aboutsummaryrefslogtreecommitdiffstats
path: root/parse/sched.py
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-01-17 20:18:08 -0500
committerGlenn Elliott <gelliott@cs.unc.edu>2014-01-17 20:18:08 -0500
commit69bdd9a6095fbd85cc90447862cc898c3391b3a7 (patch)
tree69969a35847f1656b16d0281b9a269c486254f83 /parse/sched.py
parentcdfc95b4a4fe1bbccac56ad40c8b18e898c8f684 (diff)
parse_exps.py: Make data analysis more accurate.
This patch makes the data analysis of parse_exps.py more accurate. The old code suffered from fragility---logs that were out of order led to many dropped records. This is because it had a look-ahead buffer of 1. This patch: 1) Maintains buffers of unmatched records. 2) Preloads 200 records from every sched stream and processes them in sorted order. This patch does increase runtime, but prior results could be useless for many trace logs.
Diffstat (limited to 'parse/sched.py')
-rw-r--r--parse/sched.py236
1 files changed, 128 insertions, 108 deletions
diff --git a/parse/sched.py b/parse/sched.py
index 652378b..0ab16ce 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -9,53 +9,76 @@ from common import recordtype,log_once
9from point import Measurement 9from point import Measurement
10from ctypes import * 10from ctypes import *
11 11
12from heapq import *
13
12class TimeTracker: 14class TimeTracker:
13 '''Store stats for durations of time demarcated by sched_trace records.''' 15 '''Store stats for durations of time demarcated by sched_trace records.'''
14 def __init__(self, allow_negative = False): 16 def __init__(self, is_valid_duration = lambda x: True, delay_buffer_size = 1, max_pending = 100):
15 self.begin = self.avg = self.max = self.num = self.next_job = 0 17 self.validator = is_valid_duration
18 self.avg = self.max = self.num = 0
16 19
17 # Count of times the job in start_time matched that in store_time
18 self.matches = 0 20 self.matches = 0
19 # And the times it didn't
20 self.disjoints = 0
21
22 self.allow_negative = allow_negative
23
24 # Measurements are recorded in store_ time using the previous matching
25 # record which was passed to store_time. This way, the last record for
26 # any task is always skipped
27 self.last_record = None
28
29 def store_time(self, next_record):
30 '''End duration of time.'''
31 dur = (self.last_record.when - self.begin) if self.last_record else -1
32
33 if self.next_job == next_record.job:
34 self.last_record = next_record
35 21
36 if self.last_record: 22 self.max_pending = max_pending
37 self.matches += 1 23 self.discarded = 0
38 24
39 if dur > 0 or self.allow_negative: 25 self.delay_buffer_size = delay_buffer_size
40 self.max = max(self.max, dur) 26 self.start_delay_buffer = []
27 self.end_delay_buffer = []
28 self.start_records = {}
29 self.end_records = {}
30
31 def disjoints(self):
32 unmatched = len(self.start_records) + len(self.end_records)
33 return self.discarded + unmatched
34
35 def process_completed(self):
36 completed = self.start_records.viewkeys() & self.end_records.viewkeys()
37 self.matches += len(completed)
38 for c in completed:
39 s, stime = self.start_records[c]
40 e, etime = self.end_records[c]
41 del self.start_records[c]
42 del self.end_records[c]
43
44 dur = etime - stime
45 if self.validator(dur):
46 self.max = max(self.max, dur)
41 self.avg *= float(self.num / (self.num + 1)) 47 self.avg *= float(self.num / (self.num + 1))
42 self.num += 1 48 self.num += 1
43 self.avg += dur / float(self.num) 49 self.avg += dur / float(self.num)
44 50
45 self.begin = 0 51 # Give up on some jobs if they've been hanging around too long.
46 self.next_job = 0 52 # While not strictly needed, it helps improve performance and
47 else: 53 it is unlikely to cause too much trouble.
48 self.disjoints += 1 54 if(len(self.start_records) > self.max_pending):
55 to_discard = len(self.start_records) - self.max_pending
56 for i in range(to_discard):
57 # pop off the oldest jobs
58 del self.start_records[self.start_records.iterkeys().next()]
59 self.discarded += to_discard
60 if(len(self.end_records) > self.max_pending):
61 to_discard = len(self.end_records) - self.max_pending
62 for i in range(to_discard):
63 # pop off the oldest jobs
64 del self.end_records[self.end_records.iterkeys().next()]
65 self.discarded += to_discard
66
67 def end_time(self, record, time):
68 '''End duration of time.'''
69 if len(self.end_delay_buffer) == self.delay_buffer_size:
70 to_queue = self.end_delay_buffer.pop(0)
71 self.end_records[to_queue[0].job] = to_queue
72 self.end_delay_buffer.append((record, time))
73 self.process_completed()
49 74
50 def start_time(self, record, time = None): 75 def start_time(self, record, time):
51 '''Start duration of time.''' 76 '''Start duration of time.'''
52 if self.last_record: 77 if len(self.start_delay_buffer) == self.delay_buffer_size:
53 if not time: 78 to_queue = self.start_delay_buffer.pop(0)
54 self.begin = self.last_record.when 79 self.start_records[to_queue[0].job] = to_queue
55 else: 80 self.start_delay_buffer.append((record, time))
56 self.begin = time 81 self.process_completed()
57
58 self.next_job = record.job
59 82
60# Data stored for each task 83# Data stored for each task
61TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) 84TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
@@ -152,7 +175,12 @@ def make_iterator(fname):
152 175
153def read_data(task_dict, graph_dict, fnames): 176def read_data(task_dict, graph_dict, fnames):
154 '''Read records from @fnames and store per-pid stats in @task_dict.''' 177 '''Read records from @fnames and store per-pid stats in @task_dict.'''
155 buff = [] 178 q = []
179
180 # Number of trace records to buffer from each stream/file.
181 # A sorted window is maintained in order to deal with
182 # events that were recorded out-of-order.
183 window_size = 500
156 184
157 def get_time(record): 185 def get_time(record):
158 return record.when if hasattr(record, 'when') else 0 186 return record.when if hasattr(record, 'when') else 0
@@ -164,19 +192,18 @@ def read_data(task_dict, graph_dict, fnames):
164 except StopIteration: 192 except StopIteration:
165 return 193 return
166 194
167 i = 0 195 sort_key = (get_time(arecord), arecord.job, arecord.pid)
168 for (i, (brecord, _)) in enumerate(buff): 196 heappush(q, (sort_key, arecord, itera))
169 if get_time(brecord) > get_time(arecord):
170 break
171 buff.insert(i, (arecord, itera))
172 197
173 for fname in fnames: 198 for fname in fnames:
174 itera = make_iterator(fname) 199 itera = make_iterator(fname)
175 add_record(itera) 200 for w in range(window_size):
176 201 add_record(itera)
177 while buff:
178 record, itera = buff.pop(0)
179 202
203 sys_released = False
204 while q:
205 sort_key, record, itera = heappop(q)
206 # fetch another record
180 add_record(itera) 207 add_record(itera)
181 record.process(task_dict) 208 record.process(task_dict)
182 record.process_pgm(task_dict, graph_dict) 209 record.process_pgm(task_dict, graph_dict)
@@ -205,7 +232,7 @@ class ParamRecord(SchedRecord):
205 task_dict[self.pid].params = params 232 task_dict[self.pid].params = params
206 233
207class ReleaseRecord(SchedRecord): 234class ReleaseRecord(SchedRecord):
208 # renames the 'release' field to 'when' 235 # renames the 'release' field to 'when' to enable sorting
209 FIELDS = [('when', c_uint64), ('deadline', c_uint64)] 236 FIELDS = [('when', c_uint64), ('deadline', c_uint64)]
210 237
211 def process(self, task_dict): 238 def process(self, task_dict):
@@ -215,18 +242,15 @@ class ReleaseRecord(SchedRecord):
215 data.misses.start_time(self, self.deadline) 242 data.misses.start_time(self, self.deadline)
216 data.lateness.start_time(self, self.deadline) 243 data.lateness.start_time(self, self.deadline)
217 244
218 print ' release %d: r=%d d=%d' % (self.pid, self.when, self.deadline)
219
220 def process_pgm(self, task_dict, graph_dict): 245 def process_pgm(self, task_dict, graph_dict):
221 data = task_dict[self.pid] 246 data = task_dict[self.pid]
222 data.pgm_response.start_time(self, self.when) 247 data.pgm_response.start_time(self, self.when)
223 data.pgm_misses.start_time(self, self.deadline) 248 data.pgm_misses.start_time(self, self.deadline)
224 data.pgm_lateness.start_time(self, self.deadline) 249 data.pgm_lateness.start_time(self, self.deadline)
225 250
226 return
227 ntype = task_dict[self.pid].pgm_params.node_type 251 ntype = task_dict[self.pid].pgm_params.node_type
228 if ntype == PGM_SRC or ntype == PGM_SRC_SINK: 252 if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
229 gid = task_dict[self.pid].pgm_params.graph_pid 253 gid = task_dict[self.pid].pgm_params.gid
230 gdata = graph_dict[gid] 254 gdata = graph_dict[gid]
231 gdata.jobs += 1 255 gdata.jobs += 1
232 gdata.response.start_time(self, self.when) 256 gdata.response.start_time(self, self.when)
@@ -236,34 +260,34 @@ class CompletionRecord(SchedRecord):
236 260
237 def process(self, task_dict): 261 def process(self, task_dict):
238 data = task_dict[self.pid] 262 data = task_dict[self.pid]
239 data.response.store_time(self) 263 data.response.end_time(self, self.when)
240 data.misses.store_time(self) 264 data.misses.end_time(self, self.when)
241 data.lateness.store_time(self) 265 data.lateness.end_time(self, self.when)
242 266
243 def process_pgm(self, task_dict, graph_dict): 267 def process_pgm(self, task_dict, graph_dict):
244 data = task_dict[self.pid] 268 data = task_dict[self.pid]
245 data.pgm_response.store_time(self) 269 data.pgm_response.end_time(self, self.when)
246 data.pgm_misses.store_time(self) 270 data.pgm_misses.end_time(self, self.when)
247 data.pgm_lateness.store_time(self) 271 data.pgm_lateness.end_time(self, self.when)
248 272
249 return 273 if data.pgm_params:
250 ntype = data.pgm_params.node_type 274 ntype = data.pgm_params.node_type
251 if ntype == PGM_SINK or ntype == PGM_SRC_SINK: 275 if ntype == PGM_SINK or ntype == PGM_SRC_SINK:
252 gid = data.pgm_params.graph_pid 276 gid = data.pgm_params.gid
253 gdata = graph_dict[gid] 277 gdata = graph_dict[gid]
254 gdata.response.store_time(self) 278 gdata.response.end_time(self, self.when)
255 279
256class BlockRecord(SchedRecord): 280class BlockRecord(SchedRecord):
257 FIELDS = [('when', c_uint64)] 281 FIELDS = [('when', c_uint64)]
258 282
259 def process(self, task_dict): 283 def process(self, task_dict):
260 task_dict[self.pid].blocks.start_time(self) 284 task_dict[self.pid].blocks.start_time(self, self.when)
261 285
262class ResumeRecord(SchedRecord): 286class ResumeRecord(SchedRecord):
263 FIELDS = [('when', c_uint64)] 287 FIELDS = [('when', c_uint64)]
264 288
265 def process(self, task_dict): 289 def process(self, task_dict):
266 task_dict[self.pid].blocks.store_time(self) 290 task_dict[self.pid].blocks.end_time(self, self.when)
267 291
268class SysReleaseRecord(SchedRecord): 292class SysReleaseRecord(SchedRecord):
269 FIELDS = [('when', c_uint64), ('release', c_uint64)] 293 FIELDS = [('when', c_uint64), ('release', c_uint64)]
@@ -278,12 +302,9 @@ class PgmParamRecord(SchedRecord):
278 pass 302 pass
279 303
280 def process_pgm(self, task_dict, graph_dict): 304 def process_pgm(self, task_dict, graph_dict):
281
282 pgm_params = PgmTaskParams(self.node_type, self.graph_pid) 305 pgm_params = PgmTaskParams(self.node_type, self.graph_pid)
283 task_dict[self.pid].pgm_params = pgm_params 306 task_dict[self.pid].pgm_params = pgm_params
284 307
285 print '%d: graph id = %d, node type = %d' % (self.pid, self.graph_pid, self.node_type)
286
287 if self.node_type == PGM_SRC or self.node_type == PGM_SINK or self.node_type == PGM_SRC_SINK: 308 if self.node_type == PGM_SRC or self.node_type == PGM_SINK or self.node_type == PGM_SRC_SINK:
288 graph_data = graph_dict[self.graph_pid] 309 graph_data = graph_dict[self.graph_pid]
289 if not graph_data.params: 310 if not graph_data.params:
@@ -313,15 +334,12 @@ class PgmReleaseRecord(SchedRecord):
313 data.pgm_misses.start_time(self, self.deadline) 334 data.pgm_misses.start_time(self, self.deadline)
314 data.pgm_lateness.start_time(self, self.deadline) 335 data.pgm_lateness.start_time(self, self.deadline)
315 336
316 print 'pgm_release %d: r=%d d=%d' % (self.pid, self.when, self.deadline) 337 if data.pgm_params:
317 338 ntype = data.pgm_params.node_type
318 return 339 if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
319 340 gid = data.pgm_params.graph_pid
320 ntype = data.pgm_params.node_type 341 gdata = graph_dict[gid]
321 if ntype == PGM_SRC or ntype == PGM_SRC_SINK: 342 gdata.response.start_time(self, self.when)
322 gid = data.pgm_params.graph_pid
323 gdata = graph_dict[gid]
324 gdata.response.start_time(self, self.when)
325 343
326# Map records to sched_trace ids (see include/litmus/sched_trace.h 344# Map records to sched_trace ids (see include/litmus/sched_trace.h
327register_record(2, ParamRecord) 345register_record(2, ParamRecord)
@@ -339,11 +357,11 @@ def create_trace_dict(data_dir, work_dir = None):
339 output_file = "%s/out-st" % work_dir 357 output_file = "%s/out-st" % work_dir
340 358
341 task_dict = defaultdict(lambda : 359 task_dict = defaultdict(lambda :
342 TaskData(None, None, 1, TimeTracker(), 360 TaskData(None, None, 1, TimeTracker(is_valid_duration = lambda x: x > 0),
343 TimeTracker(), TimeTracker(True), TimeTracker(), 361 TimeTracker(), TimeTracker(), TimeTracker(is_valid_duration = lambda x: x > 0),
344 TimeTracker(), TimeTracker(True), TimeTracker())) 362 TimeTracker(), TimeTracker(), TimeTracker(is_valid_duration = lambda x: x > 0)))
345 graph_dict = defaultdict(lambda: 363 graph_dict = defaultdict(lambda:
346 GraphData(None, 1, TimeTracker())) 364 GraphData(None, 1, TimeTracker(is_valid_duration = lambda x: x > 0)))
347 365
348 bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] 366 bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
349 if not len(bin_names): 367 if not len(bin_names):
@@ -375,7 +393,7 @@ def extract_sched_data(result, data_dir, work_dir):
375 gstat_data = defaultdict(list) 393 gstat_data = defaultdict(list)
376 394
377 # Group per-task values 395 # Group per-task values
378 for tdata in task_dict.itervalues(): 396 for task, tdata in task_dict.iteritems():
379 if not tdata.params: 397 if not tdata.params:
380 # Currently unknown where these invalid tasks come from... 398 # Currently unknown where these invalid tasks come from...
381 continue 399 continue
@@ -387,7 +405,7 @@ def extract_sched_data(result, data_dir, work_dir):
387 pgm_lateness = tdata.pgm_lateness 405 pgm_lateness = tdata.pgm_lateness
388 pgm_miss = tdata.pgm_misses 406 pgm_miss = tdata.pgm_misses
389 407
390 record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints) 408 record_loss = float(miss.disjoints())/(miss.matches + miss.disjoints())
391 stat_data["record-loss"].append(record_loss) 409 stat_data["record-loss"].append(record_loss)
392 410
393 if record_loss > conf.MAX_RECORD_LOSS: 411 if record_loss > conf.MAX_RECORD_LOSS:
@@ -404,57 +422,59 @@ def extract_sched_data(result, data_dir, work_dir):
404 # start with basic task information 422 # start with basic task information
405 stat_data["miss-ratio" ].append(miss_ratio) 423 stat_data["miss-ratio" ].append(miss_ratio)
406 424
407 stat_data["max-response"].append(response.max) 425 stat_data["max-response"].append(float(response.max)/NSEC_PER_MSEC)
408 stat_data["avg-response"].append(response.avg) 426 stat_data["avg-response"].append(response.avg/NSEC_PER_MSEC)
409 stat_data["max-response-prop"].append(response.max / tdata.params.period) 427 stat_data["max-response-prop"].append(float(response.max) / tdata.params.period)
410 stat_data["avg-response-prop"].append(response.avg / tdata.params.period) 428 stat_data["avg-response-prop"].append(response.avg / tdata.params.period)
411 429
412 stat_data["max-tard"].append(miss.max) 430 stat_data["max-tard"].append(float(miss.max)/NSEC_PER_MSEC)
413 stat_data["avg-tard"].append(avg_tard) 431 stat_data["avg-tard"].append(avg_tard/NSEC_PER_MSEC)
414 stat_data["max-tard-prop"].append(miss.max / tdata.params.period) 432 stat_data["max-tard-prop"].append(float(miss.max) / tdata.params.period)
415 stat_data["avg-tard-prop"].append(avg_tard / tdata.params.period) 433 stat_data["avg-tard-prop"].append(avg_tard / tdata.params.period)
416 434
417 stat_data["max-response"].append(lateness.max) 435 stat_data["max-response"].append(float(lateness.max)/NSEC_PER_MSEC)
418 stat_data["avg-response"].append(lateness.avg) 436 stat_data["avg-response"].append(lateness.avg/NSEC_PER_MSEC)
419 stat_data["max-response-prop"].append(lateness.max / tdata.params.period) 437 stat_data["max-response-prop"].append(float(lateness.max) / tdata.params.period)
420 stat_data["avg-response-prop"].append(lateness.avg / tdata.params.period) 438 stat_data["avg-response-prop"].append(lateness.avg / tdata.params.period)
421 439
422 # same data, but with PGM-adjusted release times (shifted deadlines) 440 # same data, but with PGM-adjusted release times (shifted deadlines)
423 stat_data["pgm-miss-ratio" ].append(pgm_miss_ratio) 441 stat_data["pgm-miss-ratio" ].append(pgm_miss_ratio)
424 442
425 stat_data["pgm-max-response"].append(pgm_response.max) 443 stat_data["pgm-max-response"].append(float(pgm_response.max)/NSEC_PER_MSEC)
426 stat_data["pgm-avg-response"].append(pgm_response.avg) 444 stat_data["pgm-avg-response"].append(pgm_response.avg/NSEC_PER_MSEC)
427 stat_data["pgm-max-response-prop"].append(pgm_response.max / tdata.params.period) 445 stat_data["pgm-max-response-prop"].append(float(pgm_response.max) / tdata.params.period)
428 stat_data["pgm-avg-response-prop"].append(pgm_response.avg / tdata.params.period) 446 stat_data["pgm-avg-response-prop"].append(pgm_response.avg / tdata.params.period)
429 447
430 stat_data["pgm-max-tard"].append(pgm_miss.max) 448 stat_data["pgm-max-tard"].append(float(pgm_miss.max)/NSEC_PER_MSEC)
431 stat_data["pgm-avg-tard"].append(pgm_avg_tard) 449 stat_data["pgm-avg-tard"].append(pgm_avg_tard/NSEC_PER_MSEC)
432 stat_data["pgm-max-tard-prop"].append(pgm_miss.max / tdata.params.period) 450 stat_data["pgm-max-tard-prop"].append(float(pgm_miss.max) / tdata.params.period)
433 stat_data["pgm-avg-tard-prop"].append(pgm_avg_tard / tdata.params.period) 451 stat_data["pgm-avg-tard-prop"].append(pgm_avg_tard / tdata.params.period)
434 452
435 stat_data["pgm-max-response"].append(pgm_lateness.max) 453 stat_data["pgm-max-response"].append(float(pgm_lateness.max)/NSEC_PER_MSEC)
436 stat_data["pgm-avg-response"].append(pgm_lateness.avg) 454 stat_data["pgm-avg-response"].append(pgm_lateness.avg/NSEC_PER_MSEC)
437 stat_data["pgm-max-response-prop"].append(pgm_lateness.max / tdata.params.period) 455 stat_data["pgm-max-response-prop"].append(float(pgm_lateness.max) / tdata.params.period)
438 stat_data["pgm-avg-response-prop"].append(pgm_lateness.avg / tdata.params.period) 456 stat_data["pgm-avg-response-prop"].append(pgm_lateness.avg / tdata.params.period)
439 457
440 for gdata in graph_dict.itervalues(): 458 for gid, gdata in graph_dict.iteritems():
441 if not gdata.params: 459 if not gdata.params:
442 continue 460 continue
443
444 response = gdata.response 461 response = gdata.response
445 record_loss = float(response.disjoints)/(response.matches + response.disjoints) 462 if response.matches + response.disjoints() == 0:
463 record_loss = 0
464 else:
465 record_loss = float(response.disjoints())/(response.matches + response.disjoints())
446 gstat_data["graph-record-loss"].append(record_loss) 466 gstat_data["graph-record-loss"].append(record_loss)
447 467
448 if record_los > conf.MAX_RECORD_LOSS: 468 if record_loss > conf.MAX_RECORD_LOSS:
449 log_once(LOSS_MSG) 469 log_once(LOSS_MSG)
450 continue 470 continue
451 471
452 gstat_data["graph-max-response"].append(response.max) 472 gstat_data["graph-max-response"].append(float(response.max)/NSEC_PER_MSEC)
453 gstat_data["graph-avg-response"].append(response.avg) 473 gstat_data["graph-avg-response"].append(response.avg/NSEC_PER_MSEC)
454 474
455 # Summarize value groups 475 # Summarize value groups
456 for name, data in stat_data.iteritems(): 476 for name, data in stat_data.iteritems():
457 if not data or not sum(data): 477 if not data:
458 log_once(SKIP_MSG, SKIP_MSG % name) 478 log_once(SKIP_MSG, SKIP_MSG % name)
459 continue 479 continue
460 result[name] = Measurement(str(name)).from_array(data) 480 result[name] = Measurement(str(name)).from_array(data)