aboutsummaryrefslogtreecommitdiffstats
path: root/parse
diff options
context:
space:
mode:
Diffstat (limited to 'parse')
-rw-r--r--parse/sched.py236
1 files changed, 128 insertions, 108 deletions
diff --git a/parse/sched.py b/parse/sched.py
index 652378b..0ab16ce 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -9,53 +9,76 @@ from common import recordtype,log_once
9from point import Measurement 9from point import Measurement
10from ctypes import * 10from ctypes import *
11 11
12from heapq import *
13
12class TimeTracker: 14class TimeTracker:
13 '''Store stats for durations of time demarcated by sched_trace records.''' 15 '''Store stats for durations of time demarcated by sched_trace records.'''
14 def __init__(self, allow_negative = False): 16 def __init__(self, is_valid_duration = lambda x: True, delay_buffer_size = 1, max_pending = 100):
15 self.begin = self.avg = self.max = self.num = self.next_job = 0 17 self.validator = is_valid_duration
18 self.avg = self.max = self.num = 0
16 19
17 # Count of times the job in start_time matched that in store_time
18 self.matches = 0 20 self.matches = 0
19 # And the times it didn't
20 self.disjoints = 0
21
22 self.allow_negative = allow_negative
23
24 # Measurements are recorded in store_ time using the previous matching
25 # record which was passed to store_time. This way, the last record for
26 # any task is always skipped
27 self.last_record = None
28
29 def store_time(self, next_record):
30 '''End duration of time.'''
31 dur = (self.last_record.when - self.begin) if self.last_record else -1
32
33 if self.next_job == next_record.job:
34 self.last_record = next_record
35 21
36 if self.last_record: 22 self.max_pending = max_pending
37 self.matches += 1 23 self.discarded = 0
38 24
39 if dur > 0 or self.allow_negative: 25 self.delay_buffer_size = delay_buffer_size
40 self.max = max(self.max, dur) 26 self.start_delay_buffer = []
27 self.end_delay_buffer = []
28 self.start_records = {}
29 self.end_records = {}
30
31 def disjoints(self):
32 unmatched = len(self.start_records) + len(self.end_records)
33 return self.discarded + unmatched
34
35 def process_completed(self):
36 completed = self.start_records.viewkeys() & self.end_records.viewkeys()
37 self.matches += len(completed)
38 for c in completed:
39 s, stime = self.start_records[c]
40 e, etime = self.end_records[c]
41 del self.start_records[c]
42 del self.end_records[c]
43
44 dur = etime - stime
45 if self.validator(dur):
46 self.max = max(self.max, dur)
41 self.avg *= float(self.num / (self.num + 1)) 47 self.avg *= float(self.num / (self.num + 1))
42 self.num += 1 48 self.num += 1
43 self.avg += dur / float(self.num) 49 self.avg += dur / float(self.num)
44 50
45 self.begin = 0 51 # Give up on some jobs if they've been hanging around too long.
46 self.next_job = 0 52 # While not strictly needed, it helps improve performance and
47 else: 53 # it is unlikey to cause too much trouble.
48 self.disjoints += 1 54 if(len(self.start_records) > self.max_pending):
55 to_discard = len(self.start_records) - self.max_pending
56 for i in range(to_discard):
57 # pop off the oldest jobs
58 del self.start_records[self.start_records.iterkeys().next()]
59 self.discarded += to_discard
60 if(len(self.end_records) > self.max_pending):
61 to_discard = len(self.end_records) - self.max_pending
62 for i in range(to_discard):
63 # pop off the oldest jobs
64 del self.end_records[self.end_records.iterkeys().next()]
65 self.discarded += to_discard
66
67 def end_time(self, record, time):
68 '''End duration of time.'''
69 if len(self.end_delay_buffer) == self.delay_buffer_size:
70 to_queue = self.end_delay_buffer.pop(0)
71 self.end_records[to_queue[0].job] = to_queue
72 self.end_delay_buffer.append((record, time))
73 self.process_completed()
49 74
50 def start_time(self, record, time = None): 75 def start_time(self, record, time):
51 '''Start duration of time.''' 76 '''Start duration of time.'''
52 if self.last_record: 77 if len(self.start_delay_buffer) == self.delay_buffer_size:
53 if not time: 78 to_queue = self.start_delay_buffer.pop(0)
54 self.begin = self.last_record.when 79 self.start_records[to_queue[0].job] = to_queue
55 else: 80 self.start_delay_buffer.append((record, time))
56 self.begin = time 81 self.process_completed()
57
58 self.next_job = record.job
59 82
60# Data stored for each task 83# Data stored for each task
61TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu']) 84TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
@@ -152,7 +175,12 @@ def make_iterator(fname):
152 175
153def read_data(task_dict, graph_dict, fnames): 176def read_data(task_dict, graph_dict, fnames):
154 '''Read records from @fnames and store per-pid stats in @task_dict.''' 177 '''Read records from @fnames and store per-pid stats in @task_dict.'''
155 buff = [] 178 q = []
179
180 # Number of trace records to buffer from each stream/file.
181 # A sorted window is maintained in order to deal with
182 # events that were recorded out-of-order.
183 window_size = 500
156 184
157 def get_time(record): 185 def get_time(record):
158 return record.when if hasattr(record, 'when') else 0 186 return record.when if hasattr(record, 'when') else 0
@@ -164,19 +192,18 @@ def read_data(task_dict, graph_dict, fnames):
164 except StopIteration: 192 except StopIteration:
165 return 193 return
166 194
167 i = 0 195 sort_key = (get_time(arecord), arecord.job, arecord.pid)
168 for (i, (brecord, _)) in enumerate(buff): 196 heappush(q, (sort_key, arecord, itera))
169 if get_time(brecord) > get_time(arecord):
170 break
171 buff.insert(i, (arecord, itera))
172 197
173 for fname in fnames: 198 for fname in fnames:
174 itera = make_iterator(fname) 199 itera = make_iterator(fname)
175 add_record(itera) 200 for w in range(window_size):
176 201 add_record(itera)
177 while buff:
178 record, itera = buff.pop(0)
179 202
203 sys_released = False
204 while q:
205 sort_key, record, itera = heappop(q)
206 # fetch another record
180 add_record(itera) 207 add_record(itera)
181 record.process(task_dict) 208 record.process(task_dict)
182 record.process_pgm(task_dict, graph_dict) 209 record.process_pgm(task_dict, graph_dict)
@@ -205,7 +232,7 @@ class ParamRecord(SchedRecord):
205 task_dict[self.pid].params = params 232 task_dict[self.pid].params = params
206 233
207class ReleaseRecord(SchedRecord): 234class ReleaseRecord(SchedRecord):
208 # renames the 'release' field to 'when' 235 # renames the 'release' field to 'when' to enable sorting
209 FIELDS = [('when', c_uint64), ('deadline', c_uint64)] 236 FIELDS = [('when', c_uint64), ('deadline', c_uint64)]
210 237
211 def process(self, task_dict): 238 def process(self, task_dict):
@@ -215,18 +242,15 @@ class ReleaseRecord(SchedRecord):
215 data.misses.start_time(self, self.deadline) 242 data.misses.start_time(self, self.deadline)
216 data.lateness.start_time(self, self.deadline) 243 data.lateness.start_time(self, self.deadline)
217 244
218 print ' release %d: r=%d d=%d' % (self.pid, self.when, self.deadline)
219
220 def process_pgm(self, task_dict, graph_dict): 245 def process_pgm(self, task_dict, graph_dict):
221 data = task_dict[self.pid] 246 data = task_dict[self.pid]
222 data.pgm_response.start_time(self, self.when) 247 data.pgm_response.start_time(self, self.when)
223 data.pgm_misses.start_time(self, self.deadline) 248 data.pgm_misses.start_time(self, self.deadline)
224 data.pgm_lateness.start_time(self, self.deadline) 249 data.pgm_lateness.start_time(self, self.deadline)
225 250
226 return
227 ntype = task_dict[self.pid].pgm_params.node_type 251 ntype = task_dict[self.pid].pgm_params.node_type
228 if ntype == PGM_SRC or ntype == PGM_SRC_SINK: 252 if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
229 gid = task_dict[self.pid].pgm_params.graph_pid 253 gid = task_dict[self.pid].pgm_params.gid
230 gdata = graph_dict[gid] 254 gdata = graph_dict[gid]
231 gdata.jobs += 1 255 gdata.jobs += 1
232 gdata.response.start_time(self, self.when) 256 gdata.response.start_time(self, self.when)
@@ -236,34 +260,34 @@ class CompletionRecord(SchedRecord):
236 260
237 def process(self, task_dict): 261 def process(self, task_dict):
238 data = task_dict[self.pid] 262 data = task_dict[self.pid]
239 data.response.store_time(self) 263 data.response.end_time(self, self.when)
240 data.misses.store_time(self) 264 data.misses.end_time(self, self.when)
241 data.lateness.store_time(self) 265 data.lateness.end_time(self, self.when)
242 266
243 def process_pgm(self, task_dict, graph_dict): 267 def process_pgm(self, task_dict, graph_dict):
244 data = task_dict[self.pid] 268 data = task_dict[self.pid]
245 data.pgm_response.store_time(self) 269 data.pgm_response.end_time(self, self.when)
246 data.pgm_misses.store_time(self) 270 data.pgm_misses.end_time(self, self.when)
247 data.pgm_lateness.store_time(self) 271 data.pgm_lateness.end_time(self, self.when)
248 272
249 return 273 if data.pgm_params:
250 ntype = data.pgm_params.node_type 274 ntype = data.pgm_params.node_type
251 if ntype == PGM_SINK or ntype == PGM_SRC_SINK: 275 if ntype == PGM_SINK or ntype == PGM_SRC_SINK:
252 gid = data.pgm_params.graph_pid 276 gid = data.pgm_params.gid
253 gdata = graph_dict[gid] 277 gdata = graph_dict[gid]
254 gdata.response.store_time(self) 278 gdata.response.end_time(self, self.when)
255 279
256class BlockRecord(SchedRecord): 280class BlockRecord(SchedRecord):
257 FIELDS = [('when', c_uint64)] 281 FIELDS = [('when', c_uint64)]
258 282
259 def process(self, task_dict): 283 def process(self, task_dict):
260 task_dict[self.pid].blocks.start_time(self) 284 task_dict[self.pid].blocks.start_time(self, self.when)
261 285
262class ResumeRecord(SchedRecord): 286class ResumeRecord(SchedRecord):
263 FIELDS = [('when', c_uint64)] 287 FIELDS = [('when', c_uint64)]
264 288
265 def process(self, task_dict): 289 def process(self, task_dict):
266 task_dict[self.pid].blocks.store_time(self) 290 task_dict[self.pid].blocks.end_time(self, self.when)
267 291
268class SysReleaseRecord(SchedRecord): 292class SysReleaseRecord(SchedRecord):
269 FIELDS = [('when', c_uint64), ('release', c_uint64)] 293 FIELDS = [('when', c_uint64), ('release', c_uint64)]
@@ -278,12 +302,9 @@ class PgmParamRecord(SchedRecord):
278 pass 302 pass
279 303
280 def process_pgm(self, task_dict, graph_dict): 304 def process_pgm(self, task_dict, graph_dict):
281
282 pgm_params = PgmTaskParams(self.node_type, self.graph_pid) 305 pgm_params = PgmTaskParams(self.node_type, self.graph_pid)
283 task_dict[self.pid].pgm_params = pgm_params 306 task_dict[self.pid].pgm_params = pgm_params
284 307
285 print '%d: graph id = %d, node type = %d' % (self.pid, self.graph_pid, self.node_type)
286
287 if self.node_type == PGM_SRC or self.node_type == PGM_SINK or self.node_type == PGM_SRC_SINK: 308 if self.node_type == PGM_SRC or self.node_type == PGM_SINK or self.node_type == PGM_SRC_SINK:
288 graph_data = graph_dict[self.graph_pid] 309 graph_data = graph_dict[self.graph_pid]
289 if not graph_data.params: 310 if not graph_data.params:
@@ -313,15 +334,12 @@ class PgmReleaseRecord(SchedRecord):
313 data.pgm_misses.start_time(self, self.deadline) 334 data.pgm_misses.start_time(self, self.deadline)
314 data.pgm_lateness.start_time(self, self.deadline) 335 data.pgm_lateness.start_time(self, self.deadline)
315 336
316 print 'pgm_release %d: r=%d d=%d' % (self.pid, self.when, self.deadline) 337 if data.pgm_params:
317 338 ntype = data.pgm_params.node_type
318 return 339 if ntype == PGM_SRC or ntype == PGM_SRC_SINK:
319 340 gid = data.pgm_params.graph_pid
320 ntype = data.pgm_params.node_type 341 gdata = graph_dict[gid]
321 if ntype == PGM_SRC or ntype == PGM_SRC_SINK: 342 gdata.response.start_time(self, self.when)
322 gid = data.pgm_params.graph_pid
323 gdata = graph_dict[gid]
324 gdata.response.start_time(self, self.when)
325 343
326# Map records to sched_trace ids (see include/litmus/sched_trace.h 344# Map records to sched_trace ids (see include/litmus/sched_trace.h
327register_record(2, ParamRecord) 345register_record(2, ParamRecord)
@@ -339,11 +357,11 @@ def create_trace_dict(data_dir, work_dir = None):
339 output_file = "%s/out-st" % work_dir 357 output_file = "%s/out-st" % work_dir
340 358
341 task_dict = defaultdict(lambda : 359 task_dict = defaultdict(lambda :
342 TaskData(None, None, 1, TimeTracker(), 360 TaskData(None, None, 1, TimeTracker(is_valid_duration = lambda x: x > 0),
343 TimeTracker(), TimeTracker(True), TimeTracker(), 361 TimeTracker(), TimeTracker(), TimeTracker(is_valid_duration = lambda x: x > 0),
344 TimeTracker(), TimeTracker(True), TimeTracker())) 362 TimeTracker(), TimeTracker(), TimeTracker(is_valid_duration = lambda x: x > 0)))
345 graph_dict = defaultdict(lambda: 363 graph_dict = defaultdict(lambda:
346 GraphData(None, 1, TimeTracker())) 364 GraphData(None, 1, TimeTracker(is_valid_duration = lambda x: x > 0)))
347 365
348 bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)] 366 bin_names = [f for f in os.listdir(data_dir) if re.match(bin_files, f)]
349 if not len(bin_names): 367 if not len(bin_names):
@@ -375,7 +393,7 @@ def extract_sched_data(result, data_dir, work_dir):
375 gstat_data = defaultdict(list) 393 gstat_data = defaultdict(list)
376 394
377 # Group per-task values 395 # Group per-task values
378 for tdata in task_dict.itervalues(): 396 for task, tdata in task_dict.iteritems():
379 if not tdata.params: 397 if not tdata.params:
380 # Currently unknown where these invalid tasks come from... 398 # Currently unknown where these invalid tasks come from...
381 continue 399 continue
@@ -387,7 +405,7 @@ def extract_sched_data(result, data_dir, work_dir):
387 pgm_lateness = tdata.pgm_lateness 405 pgm_lateness = tdata.pgm_lateness
388 pgm_miss = tdata.pgm_misses 406 pgm_miss = tdata.pgm_misses
389 407
390 record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints) 408 record_loss = float(miss.disjoints())/(miss.matches + miss.disjoints())
391 stat_data["record-loss"].append(record_loss) 409 stat_data["record-loss"].append(record_loss)
392 410
393 if record_loss > conf.MAX_RECORD_LOSS: 411 if record_loss > conf.MAX_RECORD_LOSS:
@@ -404,57 +422,59 @@ def extract_sched_data(result, data_dir, work_dir):
404 # start with basic task information 422 # start with basic task information
405 stat_data["miss-ratio" ].append(miss_ratio) 423 stat_data["miss-ratio" ].append(miss_ratio)
406 424
407 stat_data["max-response"].append(response.max) 425 stat_data["max-response"].append(float(response.max)/NSEC_PER_MSEC)
408 stat_data["avg-response"].append(response.avg) 426 stat_data["avg-response"].append(response.avg/NSEC_PER_MSEC)
409 stat_data["max-response-prop"].append(response.max / tdata.params.period) 427 stat_data["max-response-prop"].append(float(response.max) / tdata.params.period)
410 stat_data["avg-response-prop"].append(response.avg / tdata.params.period) 428 stat_data["avg-response-prop"].append(response.avg / tdata.params.period)
411 429
412 stat_data["max-tard"].append(miss.max) 430 stat_data["max-tard"].append(float(miss.max)/NSEC_PER_MSEC)
413 stat_data["avg-tard"].append(avg_tard) 431 stat_data["avg-tard"].append(avg_tard/NSEC_PER_MSEC)
414 stat_data["max-tard-prop"].append(miss.max / tdata.params.period) 432 stat_data["max-tard-prop"].append(float(miss.max) / tdata.params.period)
415 stat_data["avg-tard-prop"].append(avg_tard / tdata.params.period) 433 stat_data["avg-tard-prop"].append(avg_tard / tdata.params.period)
416 434
417 stat_data["max-response"].append(lateness.max) 435 stat_data["max-response"].append(float(lateness.max)/NSEC_PER_MSEC)
418 stat_data["avg-response"].append(lateness.avg) 436 stat_data["avg-response"].append(lateness.avg/NSEC_PER_MSEC)
419 stat_data["max-response-prop"].append(lateness.max / tdata.params.period) 437 stat_data["max-response-prop"].append(float(lateness.max) / tdata.params.period)
420 stat_data["avg-response-prop"].append(lateness.avg / tdata.params.period) 438 stat_data["avg-response-prop"].append(lateness.avg / tdata.params.period)
421 439
422 # same data, but with PGM-adjusted release times (shifted deadlines) 440 # same data, but with PGM-adjusted release times (shifted deadlines)
423 stat_data["pgm-miss-ratio" ].append(pgm_miss_ratio) 441 stat_data["pgm-miss-ratio" ].append(pgm_miss_ratio)
424 442
425 stat_data["pgm-max-response"].append(pgm_response.max) 443 stat_data["pgm-max-response"].append(float(pgm_response.max)/NSEC_PER_MSEC)
426 stat_data["pgm-avg-response"].append(pgm_response.avg) 444 stat_data["pgm-avg-response"].append(pgm_response.avg/NSEC_PER_MSEC)
427 stat_data["pgm-max-response-prop"].append(pgm_response.max / tdata.params.period) 445 stat_data["pgm-max-response-prop"].append(float(pgm_response.max) / tdata.params.period)
428 stat_data["pgm-avg-response-prop"].append(pgm_response.avg / tdata.params.period) 446 stat_data["pgm-avg-response-prop"].append(pgm_response.avg / tdata.params.period)
429 447
430 stat_data["pgm-max-tard"].append(pgm_miss.max) 448 stat_data["pgm-max-tard"].append(float(pgm_miss.max)/NSEC_PER_MSEC)
431 stat_data["pgm-avg-tard"].append(pgm_avg_tard) 449 stat_data["pgm-avg-tard"].append(pgm_avg_tard/NSEC_PER_MSEC)
432 stat_data["pgm-max-tard-prop"].append(pgm_miss.max / tdata.params.period) 450 stat_data["pgm-max-tard-prop"].append(float(pgm_miss.max) / tdata.params.period)
433 stat_data["pgm-avg-tard-prop"].append(pgm_avg_tard / tdata.params.period) 451 stat_data["pgm-avg-tard-prop"].append(pgm_avg_tard / tdata.params.period)
434 452
435 stat_data["pgm-max-response"].append(pgm_lateness.max) 453 stat_data["pgm-max-response"].append(float(pgm_lateness.max)/NSEC_PER_MSEC)
436 stat_data["pgm-avg-response"].append(pgm_lateness.avg) 454 stat_data["pgm-avg-response"].append(pgm_lateness.avg/NSEC_PER_MSEC)
437 stat_data["pgm-max-response-prop"].append(pgm_lateness.max / tdata.params.period) 455 stat_data["pgm-max-response-prop"].append(float(pgm_lateness.max) / tdata.params.period)
438 stat_data["pgm-avg-response-prop"].append(pgm_lateness.avg / tdata.params.period) 456 stat_data["pgm-avg-response-prop"].append(pgm_lateness.avg / tdata.params.period)
439 457
440 for gdata in graph_dict.itervalues(): 458 for gid, gdata in graph_dict.iteritems():
441 if not gdata.params: 459 if not gdata.params:
442 continue 460 continue
443
444 response = gdata.response 461 response = gdata.response
445 record_loss = float(response.disjoints)/(response.matches + response.disjoints) 462 if response.matches + response.disjoints() == 0:
463 record_loss = 0
464 else:
465 record_loss = float(response.disjoints())/(response.matches + response.disjoints())
446 gstat_data["graph-record-loss"].append(record_loss) 466 gstat_data["graph-record-loss"].append(record_loss)
447 467
448 if record_los > conf.MAX_RECORD_LOSS: 468 if record_loss > conf.MAX_RECORD_LOSS:
449 log_once(LOSS_MSG) 469 log_once(LOSS_MSG)
450 continue 470 continue
451 471
452 gstat_data["graph-max-response"].append(response.max) 472 gstat_data["graph-max-response"].append(float(response.max)/NSEC_PER_MSEC)
453 gstat_data["graph-avg-response"].append(response.avg) 473 gstat_data["graph-avg-response"].append(response.avg/NSEC_PER_MSEC)
454 474
455 # Summarize value groups 475 # Summarize value groups
456 for name, data in stat_data.iteritems(): 476 for name, data in stat_data.iteritems():
457 if not data or not sum(data): 477 if not data:
458 log_once(SKIP_MSG, SKIP_MSG % name) 478 log_once(SKIP_MSG, SKIP_MSG % name)
459 continue 479 continue
460 result[name] = Measurement(str(name)).from_array(data) 480 result[name] = Measurement(str(name)).from_array(data)