author    Jonathan Herman <hermanjl@cs.unc.edu>  2013-04-23 14:01:35 -0400
committer Jonathan Herman <hermanjl@cs.unc.edu>  2013-04-23 14:01:35 -0400
commit    7545402506aa76261e18d85af585ff0ac1cf05c1 (patch)
tree      6b5a6d2e819c10311f3b4cdc94174877bdfcfbde
parent    25ccdb0cbc6b959b1f96c89b8bce91963cb67b4c (diff)
Improved accuracy of sched_trace measurement parsing.
* Measurements from tasks missing more than 20% of their scheduling records
  are ignored. The threshold is configurable in config/config.py.
* Measurements that contain only zero values are ignored.
* When either of these two situations is encountered, a message is printed
  the first time via the common.log_once() method. See parse_exps.py for how
  this is used with multiple threads (a usage sketch also follows the file
  summary below).
* Measurements from a task's last job are ignored.
* The miss ratio is now calculated as a fraction of only those jobs whose
  matching release and completion records were both found, rather than of
  all jobs with a release record.
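As a rough illustration of the new accounting (the numbers below are invented,
not taken from any trace): if a task's TimeTracker finishes a run with
matches = 18 and disjoints = 2, its record loss is 2 / (18 + 2) = 0.10. That is
below MAX_RECORD_LOSS = 0.2, so the task is kept, and its miss ratio is
misses.num / 18; the denominator now counts only jobs whose release and
completion records were both matched, not every released job. A task with
matches = 7 and disjoints = 3 has a record loss of 0.3 and is dropped from the
summaries, printing LOSS_MSG at most once per run.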
-rw-r--r--   common.py              17
-rw-r--r--   config/config.py        4
-rw-r--r--   parse/point.py          4
-rw-r--r--   parse/sched.py         84
-rw-r--r--   parse/tuple_table.py    4
-rwxr-xr-x   parse_exps.py         159
6 files changed, 186 insertions, 86 deletions
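For reference, here is a minimal sketch of the log_once() pattern that
parse_exps.py uses to share the list of already-printed messages across worker
processes. It assumes the common.py from this repository is importable; the
worker function and its message are purely illustrative (the real callers are
parse_exp() in parse_exps.py and extract_sched_data() in parse/sched.py).

    import multiprocessing

    import common as com

    # Illustrative message only; the real messages are LOSS_MSG/SKIP_MSG
    # defined in parse/sched.py.
    EXAMPLE_MSG = "Example: task missing too many scheduling records"

    def worker(task_id):
        # Every worker can hit the same condition, but the shared list lets
        # log_once() write the warning to stderr only the first time.
        com.log_once(EXAMPLE_MSG)
        return task_id

    if __name__ == '__main__':
        # Manager proxy list shared by the parent and every pool worker
        logged = multiprocessing.Manager().list()
        pool = multiprocessing.Pool(processes=2,
                                    initializer=com.set_logged_list,
                                    initargs=(logged,))
        print(pool.map(worker, range(4)))
        pool.close()

The Manager list is what makes the suppression global: each worker's
set_logged_list() call points its module-level __logged at the same proxy, so
an id appended in one process is visible to all the others.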
diff --git a/common.py b/common.py
index a2c6224..7abf0f2 100644
--- a/common.py
+++ b/common.py
@@ -193,3 +193,20 @@ def is_device(dev):
         return False
     mode = os.stat(dev)[stat.ST_MODE]
     return not (not mode & stat.S_IFCHR)
+
+__logged = []
+
+def set_logged_list(logged):
+    global __logged
+    __logged = logged
+
+def log_once(id, msg = None, indent = True):
+    global __logged
+
+    msg = msg if msg else id
+
+    if id not in __logged:
+        __logged += [id]
+        if indent:
+            msg = ' ' + msg.strip('\t').replace('\n', '\n\t')
+        sys.stderr.write('\n' + msg.strip('\n') + '\n')
diff --git a/config/config.py b/config/config.py
index b631aa2..5e6f9e3 100644
--- a/config/config.py
+++ b/config/config.py
@@ -56,3 +56,7 @@ OVH_ALL_EVENTS = ["%s_%s" % (e, t) for (e,t) in
 OVH_ALL_EVENTS += ['RELEASE_LATENCY']
 # This event doesn't have a START and END
 OVH_BASE_EVENTS += ['RELEASE_LATENCY']
+
+# If a task is missing more than this many records, its measurements
+# are not included in sched_trace summaries
+MAX_RECORD_LOSS = .2
diff --git a/parse/point.py b/parse/point.py
index ac47c70..b1d9d53 100644
--- a/parse/point.py
+++ b/parse/point.py
@@ -133,6 +133,10 @@ class ExpPoint(object):
     def get_stats(self):
         return self.stats.keys()
 
+    def __bool__(self):
+        return bool(self.stats)
+    __nonzero__ = __bool__
+
 
 class SummaryPoint(ExpPoint):
     def __init__(self, id="", points=[], typemap = default_typemap):
diff --git a/parse/sched.py b/parse/sched.py
index b56324b..4933037 100644
--- a/parse/sched.py
+++ b/parse/sched.py
@@ -5,35 +5,55 @@ import struct
 import subprocess
 
 from collections import defaultdict,namedtuple
-from common import recordtype
+from common import recordtype,log_once
 from point import Measurement
 from ctypes import *
 
 class TimeTracker:
     '''Store stats for durations of time demarcated by sched_trace records.'''
     def __init__(self):
-        self.begin = self.avg = self.max = self.num = self.job = 0
+        self.begin = self.avg = self.max = self.num = self.next_job = 0
 
-    def store_time(self, record):
+        # Count of times the job in start_time matched that in store_time
+        self.matches = 0
+        # And the times it didn't
+        self.disjoints = 0
+
+        # Measurements are recorded in store_time using the previous matching
+        # record which was passed to store_time. This way, the last record for
+        # any task is always skipped
+        self.last_record = None
+
+    def store_time(self, next_record):
         '''End duration of time.'''
-        dur = record.when - self.begin
+        dur = (self.last_record.when - self.begin) if self.last_record else -1
 
-        if self.job == record.job and dur > 0:
-            self.max = max(self.max, dur)
-            self.avg *= float(self.num / (self.num + 1))
-            self.num += 1
-            self.avg += dur / float(self.num)
+        if self.next_job == next_record.job:
+            self.last_record = next_record
 
-        self.begin = 0
-        self.job = 0
+            if self.last_record:
+                self.matches += 1
+
+            if dur > 0:
+                self.max = max(self.max, dur)
+                self.avg *= float(self.num / (self.num + 1))
+                self.num += 1
+                self.avg += dur / float(self.num)
+
+            self.begin = 0
+            self.next_job = 0
+        else:
+            self.disjoints += 1
 
     def start_time(self, record, time = None):
         '''Start duration of time.'''
-        if not time:
-            self.begin = record.when
-        else:
-            self.begin = time
-        self.job = record.job
+        if self.last_record:
+            if not time:
+                self.begin = self.last_record.when
+            else:
+                self.begin = time
+
+        self.next_job = record.job
 
 # Data stored for each task
 TaskParams = namedtuple('TaskParams', ['wcet', 'period', 'cpu'])
@@ -203,6 +223,12 @@ def create_task_dict(data_dir, work_dir = None):
 
     return task_dict
 
+LOSS_MSG = """Found task missing more than %d%% of its scheduling records.
+These won't be included in scheduling statistics!"""%(100*conf.MAX_RECORD_LOSS)
+SKIP_MSG = """Measurement '%s' has no non-zero values.
+Measurements like these are not included in scheduling statistics.
+If a measurement is missing, this is why."""
+
 def extract_sched_data(result, data_dir, work_dir):
     task_dict = create_task_dict(data_dir, work_dir)
     stat_data = defaultdict(list)
@@ -213,19 +239,29 @@ def extract_sched_data(result, data_dir, work_dir):
             # Currently unknown where these invalid tasks come from...
             continue
 
-        miss_ratio = float(tdata.misses.num) / tdata.jobs
-        stat_data["miss-ratio"].append(float(tdata.misses.num) / tdata.jobs)
+        miss = tdata.misses
+
+        record_loss = float(miss.disjoints)/(miss.matches + miss.disjoints)
+        stat_data["record-loss"].append(record_loss)
+
+        if record_loss > conf.MAX_RECORD_LOSS:
+            log_once(LOSS_MSG)
+            continue
+
+        miss_ratio = float(miss.num) / miss.matches
+        avg_tard = miss.avg * miss_ratio
+
+        stat_data["miss-ratio" ].append(miss_ratio)
 
-        stat_data["max-tard" ].append(tdata.misses.max / tdata.params.wcet)
-        # Scale average down to account for jobs with 0 tardiness
-        avg_tard = tdata.misses.avg * miss_ratio
-        stat_data["avg-tard" ].append(avg_tard / tdata.params.wcet)
+        stat_data["max-tard"].append(miss.max / tdata.params.period)
+        stat_data["avg-tard"].append(avg_tard / tdata.params.period)
 
-        stat_data["avg-block" ].append(tdata.blocks.avg / NSEC_PER_MSEC)
-        stat_data["max-block" ].append(tdata.blocks.max / NSEC_PER_MSEC)
+        stat_data["avg-block"].append(tdata.blocks.avg / NSEC_PER_MSEC)
+        stat_data["max-block"].append(tdata.blocks.max / NSEC_PER_MSEC)
 
     # Summarize value groups
     for name, data in stat_data.iteritems():
         if not data or not sum(data):
+            log_once(SKIP_MSG, SKIP_MSG % name)
             continue
         result[name] = Measurement(str(name)).from_array(data)
diff --git a/parse/tuple_table.py b/parse/tuple_table.py
index 47fb6b6..320d9dd 100644
--- a/parse/tuple_table.py
+++ b/parse/tuple_table.py
@@ -13,6 +13,10 @@ class TupleTable(object):
     def get_col_map(self):
         return self.col_map
 
+    def __bool__(self):
+        return bool(self.table)
+    __nonzero__ = __bool__
+
     def __getitem__(self, kv):
         key = self.col_map.get_key(kv)
         return self.table[key]
diff --git a/parse_exps.py b/parse_exps.py
index c2cbedb..cc4372a 100755
--- a/parse_exps.py
+++ b/parse_exps.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 from __future__ import print_function
 
+import common as com
+import multiprocessing
 import os
 import parse.ft as ft
 import parse.sched as st
@@ -10,13 +12,12 @@ import sys
 import traceback
 
 from collections import namedtuple
-from common import load_params
 from config.config import DEFAULTS,PARAMS
 from optparse import OptionParser
 from parse.point import ExpPoint
 from parse.tuple_table import TupleTable
 from parse.col_map import ColMapBuilder
-from multiprocessing import Pool, cpu_count
+
 
 def parse_args():
     parser = OptionParser("usage: %prog [options] [data_dir]...")
@@ -33,18 +34,60 @@ def parse_args():
     parser.add_option('-m', '--write-map', action='store_true', default=False,
                       dest='write_map',
                       help='Output map of values instead of csv tree')
-    parser.add_option('-p', '--processors', default=max(cpu_count() - 1, 1),
+    parser.add_option('-p', '--processors',
+                      default=max(multiprocessing.cpu_count() - 1, 1),
                       type='int', dest='processors',
                       help='number of threads for processing')
 
     return parser.parse_args()
 
+
 ExpData = namedtuple('ExpData', ['path', 'params', 'work_dir'])
 
+
+def parse_exp(exp_force):
+    # Tupled for multiprocessing
+    exp, force = exp_force
+
+    result_file = exp.work_dir + "/exp_point.pkl"
+    should_load = not force and os.path.exists(result_file)
+
+    result = None
+    if should_load:
+        with open(result_file, 'rb') as f:
+            try:
+                # No need to go through this work twice
+                result = pickle.load(f)
+            except:
+                pass
+
+    if not result:
+        try:
+            # Create a readable name
+            name = os.path.relpath(exp.path)
+            name = name if name != "." else os.path.split(os.getcwd())[1]
+
+            result = ExpPoint(name)
+
+            # Write overheads into result
+            cycles = exp.params[PARAMS['cycles']]
+            ft.extract_ft_data(result, exp.path, exp.work_dir, cycles)
+
+            # Write scheduling statistics into result
+            st.extract_sched_data(result, exp.path, exp.work_dir)
+
+            with open(result_file, 'wb') as f:
+                pickle.dump(result, f)
+        except:
+            traceback.print_exc()
+
+    return (exp, result)
+
+
 def get_exp_params(data_dir, cm_builder):
     param_file = "%s/%s" % (data_dir, DEFAULTS['params_file'])
     if os.path.isfile(param_file):
-        params = load_params(param_file)
+        params = com.load_params(param_file)
 
     # Store parameters in cm_builder, which will track which parameters change
     # across experiments
@@ -83,41 +126,8 @@ def load_exps(exp_dirs, cm_builder, force):
 
     return exps
 
-def parse_exp(exp_force):
-    # Tupled for multiprocessing
-    exp, force = exp_force
-
-    result_file = exp.work_dir + "/exp_point.pkl"
-    should_load = not force and os.path.exists(result_file)
-
-    result = None
-    if should_load:
-        with open(result_file, 'rb') as f:
-            try:
-                # No need to go through this work twice
-                result = pickle.load(f)
-            except:
-                pass
 
-    if not result:
-        try:
-            result = ExpPoint(exp.path)
-            cycles = exp.params[PARAMS['cycles']]
-
-            # Write overheads into result
-            ft.extract_ft_data(result, exp.path, exp.work_dir, cycles)
-
-            # Write scheduling statistics into result
-            st.extract_sched_data(result, exp.path, exp.work_dir)
-
-            with open(result_file, 'wb') as f:
-                pickle.dump(result, f)
-        except:
-            traceback.print_exc()
-
-    return (exp, result)
-
-def get_exps(args):
+def get_dirs(args):
     if args:
         return args
     elif os.path.exists(DEFAULTS['out-run']):
@@ -128,38 +138,32 @@ def get_exps(args):
         sys.stderr.write("Reading data from current directory.\n")
         return [os.getcwd()]
 
-def main():
-    opts, args = parse_args()
-    exp_dirs = get_exps(args)
-
-    # Load exp parameters into a ColMap
-    builder = ColMapBuilder()
-    exps = load_exps(exp_dirs, builder, opts.force)
 
-    # Don't track changes in ignored parameters
-    if opts.ignore:
-        for param in opts.ignore.split(","):
-            builder.try_remove(param)
-    builder.try_remove(PARAMS['trial']) # Always average multiple trials
-    builder.try_remove(PARAMS['cycles']) # Only need for feather-trace parsing
+def fill_table(table, exps, opts):
+    sys.stderr.write("Parsing data...\n")
 
-    col_map = builder.build()
-    result_table = TupleTable(col_map)
+    procs = min(len(exps), opts.processors)
+    logged = multiprocessing.Manager().list()
 
-    sys.stderr.write("Parsing data...\n")
+    pool = multiprocessing.Pool(processes=procs,
+                # Share a list of previously logged messages amongst processes
+                # This is for the com.log_once method to use
+                initializer=com.set_logged_list, initargs=(logged,))
 
-    procs = min(len(exps), opts.processors)
-    pool = Pool(processes=procs)
     pool_args = zip(exps, [opts.force]*len(exps))
     enum = pool.imap_unordered(parse_exp, pool_args, 1)
 
     try:
         for i, (exp, result) in enumerate(enum):
+            if not result:
+                continue
+
             if opts.verbose:
                 print(result)
             else:
                 sys.stderr.write('\r {0:.2%}'.format(float(i)/len(exps)))
-            result_table[exp.params] += [result]
+            table[exp.params] += [result]
+
         pool.close()
     except:
         pool.terminate()
@@ -170,16 +174,17 @@ def main():
 
     sys.stderr.write('\n')
 
-    if opts.force and os.path.exists(opts.out):
-        sh.rmtree(opts.out)
 
-    reduced_table = result_table.reduce()
+def write_output(table, opts):
+    reduced_table = table.reduce()
 
     if opts.write_map:
         sys.stderr.write("Writing python map into %s...\n" % opts.out)
-        # Write summarized results into map
         reduced_table.write_map(opts.out)
     else:
+        if opts.force and os.path.exists(opts.out):
+            sh.rmtree(opts.out)
+
         # Write out csv directories for all variable params
         dir_map = reduced_table.to_dir_map()
 
@@ -188,12 +193,42 @@ def main():
         if not opts.verbose:
             sys.stderr.write("Too little data to make csv files, " +
                              "printing results.\n")
-            for key, exp in result_table:
+            for key, exp in table:
                 for e in exp:
                     print(e)
         else:
             sys.stderr.write("Writing csvs into %s...\n" % opts.out)
             dir_map.write(opts.out)
 
+
+def main():
+    opts, args = parse_args()
+    exp_dirs = get_dirs(args)
+
+    # Load experiment parameters into a ColMap
+    builder = ColMapBuilder()
+    exps = load_exps(exp_dirs, builder, opts.force)
+
+    # Don't track changes in ignored parameters
+    if opts.ignore:
+        for param in opts.ignore.split(","):
+            builder.try_remove(param)
+
+    # Always average multiple trials
+    builder.try_remove(PARAMS['trial'])
+    # Only need this for feather-trace parsing
+    builder.try_remove(PARAMS['cycles'])
+
+    col_map = builder.build()
+    table = TupleTable(col_map)
+
+    fill_table(table, exps, opts)
+
+    if not table:
+        sys.stderr.write("Found no data to parse!")
+        sys.exit(1)
+
+    write_output(table, opts)
+
 if __name__ == '__main__':
     main()