aboutsummaryrefslogtreecommitdiffstats
path: root/run/experiment.py
diff options
context:
space:
mode:
authorJonathan Herman <hermanjl@cs.unc.edu>2013-04-22 15:32:12 -0400
committerJonathan Herman <hermanjl@cs.unc.edu>2013-04-22 15:32:12 -0400
commit25ccdb0cbc6b959b1f96c89b8bce91963cb67b4c (patch)
treedcaeb7d85f3dcc0f2afbb53d11c512c71fb712ab /run/experiment.py
parentfbd1df6f63eb551b99f71330d2370c570ff323f5 (diff)
Improved robustness of run_exps.py execution.
Thanks to bcw and gelliott for debugging and ideas. * Print out experiment number and total experiments when starting experiments. * Only sleep and re-release tasks if tasks are waiting to release. * Fail experiment with verbose messages if any tasks fail before becoming ready to release. * When waiting for tasks to become ready for release, reset the waiting time whenever a new task (or task(s)) become ready. * Start regular tracers BEFORE the plugin switch to log data from the switch. * Check the number of running tasks AFTER trying to switch the linux scheduler. This gives plugin deactivate code the opportunity to kill these tasks. * If an invalid executable is specified in the schedule file, fail before attempting to run the experiment and print out the problem. * Propogate exceptions up from experiment failures instead of creating ExperimentFailed exceptions. This commit also made clock-frequency automatically ignored by parse_exps.py. The value of this would change by +- a Mhz between experiments, ruining graphs.
Diffstat (limited to 'run/experiment.py')
-rw-r--r--run/experiment.py185
1 files changed, 113 insertions, 72 deletions
diff --git a/run/experiment.py b/run/experiment.py
index ff0e9f3..b0e46b6 100644
--- a/run/experiment.py
+++ b/run/experiment.py
@@ -1,9 +1,7 @@
1import common as com
2import os 1import os
3import time 2import time
4import run.litmus_util as lu 3import run.litmus_util as lu
5import shutil as sh 4import shutil as sh
6import traceback
7from operator import methodcaller 5from operator import methodcaller
8 6
9class ExperimentException(Exception): 7class ExperimentException(Exception):
@@ -11,16 +9,11 @@ class ExperimentException(Exception):
11 def __init__(self, name): 9 def __init__(self, name):
12 self.name = name 10 self.name = name
13 11
14
15class ExperimentDone(ExperimentException): 12class ExperimentDone(ExperimentException):
16 '''Raised when an experiment looks like it's been run already.''' 13 '''Raised when an experiment looks like it's been run already.'''
17 def __str__(self): 14 def __str__(self):
18 return "Experiment finished already: %d" % self.name 15 return "Experiment finished already: %d" % self.name
19 16
20class ExperimentFailed(ExperimentException):
21 def __str__(self):
22 return "Experiment failed during execution: %d" % self.name
23
24class SystemCorrupted(Exception): 17class SystemCorrupted(Exception):
25 pass 18 pass
26 19
@@ -31,7 +24,6 @@ class Experiment(object):
31 def __init__(self, name, scheduler, working_dir, finished_dir, 24 def __init__(self, name, scheduler, working_dir, finished_dir,
32 proc_entries, executables, tracer_types): 25 proc_entries, executables, tracer_types):
33 '''Run an experiment, optionally wrapped in tracing.''' 26 '''Run an experiment, optionally wrapped in tracing.'''
34
35 self.name = name 27 self.name = name
36 self.scheduler = scheduler 28 self.scheduler = scheduler
37 self.working_dir = working_dir 29 self.working_dir = working_dir
@@ -40,16 +32,10 @@ class Experiment(object):
40 self.executables = executables 32 self.executables = executables
41 self.exec_out = None 33 self.exec_out = None
42 self.exec_err = None 34 self.exec_err = None
35 self.tracer_types = tracer_types
43 36
44 self.task_batch = com.num_cpus() 37 def __setup_tracers(self):
45 38 tracers = [ t(self.working_dir) for t in self.tracer_types ]
46 self.__make_dirs()
47 self.__assign_executable_cwds()
48 self.__setup_tracers(tracer_types)
49
50
51 def __setup_tracers(self, tracer_types):
52 tracers = [ t(self.working_dir) for t in tracer_types ]
53 39
54 self.regular_tracers = [t for t in tracers if not t.is_exact()] 40 self.regular_tracers = [t for t in tracers if not t.is_exact()]
55 self.exact_tracers = [t for t in tracers if t.is_exact()] 41 self.exact_tracers = [t for t in tracers if t.is_exact()]
@@ -84,13 +70,11 @@ class Experiment(object):
84 map(assign_cwd, self.executables) 70 map(assign_cwd, self.executables)
85 71
86 def __kill_all(self): 72 def __kill_all(self):
87 # Give time for whatever failed to finish failing 73 if lu.waiting_tasks():
88 time.sleep(2) 74 released = lu.release_tasks()
89 75 self.log("Re-released %d tasks" % released)
90 released = lu.release_tasks()
91 self.log("Re-released %d tasks" % released)
92 76
93 time.sleep(5) 77 time.sleep(1)
94 78
95 self.log("Killing all tasks") 79 self.log("Killing all tasks")
96 for e in self.executables: 80 for e in self.executables:
@@ -99,7 +83,62 @@ class Experiment(object):
99 except: 83 except:
100 pass 84 pass
101 85
102 time.sleep(2) 86 time.sleep(1)
87
88 def __strip_path(self, path):
89 '''Shorten path to something more readable.'''
90 file_dir = os.path.split(self.working_dir)[0]
91 if path.index(file_dir) == 0:
92 path = path[len(file_dir):]
93
94 return path.strip("/")
95
96 def __check_tasks_status(self):
97 '''Raises an exception if any tasks have failed.'''
98 msgs = []
99
100 for e in self.executables:
101 status = e.poll()
102 if status != None and status:
103 err_msg = "Task %s failed with status: %s" % (e.wait(), status)
104 msgs += [err_msg]
105
106 if msgs:
107 # Show at most 3 messages so that every task failing doesn't blow
108 # up the terminal
109 if len(msgs) > 3:
110 num_errs = len(msgs) - 3
111 msgs = msgs[0:4] + ["...%d more task errors..." % num_errs]
112
113 out_name = self.__strip_path(self.exec_out.name)
114 err_name = self.__strip_path(self.exec_err.name)
115 help = "Check dmesg, %s, and %s" % (out_name, err_name)
116
117 raise Exception("\n".join(msgs + [help]))
118
119 def __wait_for_ready(self):
120 self.log("Sleeping until tasks are ready for release...")
121
122 wait_start = time.time()
123 num_ready = lu.waiting_tasks()
124
125 while num_ready < len(self.executables):
126 # Quit if too much time passes without a task becoming ready
127 if time.time() - wait_start > 180.0:
128 s = "waiting: %d, submitted: %d" %\
129 (lu.waiting_tasks(), len(self.executables))
130 raise Exception("Too much time spent waiting for tasks! %s" % s)
131
132 time.sleep(1)
133
134 # Quit if any tasks fail
135 self.__check_tasks_status()
136
137 # Reset the waiting time whenever more tasks become ready
138 now_ready = lu.waiting_tasks()
139 if now_ready != num_ready:
140 wait_start = time.time()
141 num_ready = lu.now_ready
103 142
104 def __run_tasks(self): 143 def __run_tasks(self):
105 self.log("Starting %d tasks" % len(self.executables)) 144 self.log("Starting %d tasks" % len(self.executables))
@@ -108,16 +147,9 @@ class Experiment(object):
108 try: 147 try:
109 e.execute() 148 e.execute()
110 except: 149 except:
111 raise Exception("Executable failed: %s" % e) 150 raise Exception("Executable failed to start: %s" % e)
112 151
113 self.log("Sleeping until tasks are ready for release...") 152 self.__wait_for_ready()
114 start = time.time()
115 while lu.waiting_tasks() < len(self.executables):
116 if time.time() - start > 30.0:
117 s = "waiting: %d, submitted: %d" %\
118 (lu.waiting_tasks(), len(self.executables))
119 raise Exception("Too much time spent waiting for tasks! %s" % s)
120 time.sleep(1)
121 153
122 # Exact tracers (like overheads) must be started right after release or 154 # Exact tracers (like overheads) must be started right after release or
123 # measurements will be full of irrelevant records 155 # measurements will be full of irrelevant records
@@ -148,60 +180,40 @@ class Experiment(object):
148 def __save_results(self): 180 def __save_results(self):
149 os.rename(self.working_dir, self.finished_dir) 181 os.rename(self.working_dir, self.finished_dir)
150 182
151 def log(self, msg): 183 def __to_linux(self):
152 print("[Exp %s]: %s" % (self.name, msg)) 184 msgs = []
153
154 def run_exp(self):
155 self.__check_system()
156
157 succ = False
158 try:
159 self.__setup()
160 185
186 sched = lu.scheduler()
187 if sched != "Linux":
161 try: 188 try:
162 self.__run_tasks() 189 lu.switch_scheduler("Linux")
163 self.log("Saving results in %s" % self.finished_dir)
164 succ = True
165 except: 190 except:
166 traceback.print_exc() 191 msgs += ["Scheduler is %s, cannot switch to Linux!" % sched]
167 self.__kill_all()
168 raise ExperimentFailed(self.name)
169 finally:
170 self.__teardown()
171 finally:
172 self.log("Switching to Linux scheduler")
173
174 # Give the tasks 10 seconds to finish before bailing
175 start = time.time()
176 while lu.all_tasks() > 0:
177 if time.time() - start < 10.0:
178 raise SystemCorrupted("%d tasks still running!" %
179 lu.all_tasks())
180
181 lu.switch_scheduler("Linux")
182
183 if succ:
184 self.__save_results()
185 self.log("Experiment done!")
186 192
187 def __check_system(self):
188 running = lu.all_tasks() 193 running = lu.all_tasks()
189 if running: 194 if running:
190 raise SystemCorrupted("%d tasks already running!" % running) 195 msgs += ["%d real-time tasks still running!" % running]
191 196
192 sched = lu.scheduler() 197 if msgs:
193 if sched != "Linux": 198 raise SystemCorrupted("\n".join(msgs))
194 raise SystemCorrupted("Scheduler is %s, should be Linux" % sched)
195 199
196 def __setup(self): 200 def __setup(self):
201 self.__make_dirs()
202 self.__assign_executable_cwds()
203 self.__setup_tracers()
204
197 self.log("Writing %d proc entries" % len(self.proc_entries)) 205 self.log("Writing %d proc entries" % len(self.proc_entries))
198 map(methodcaller('write_proc'), self.proc_entries) 206 map(methodcaller('write_proc'), self.proc_entries)
199 207
208 self.log("Starting %d regular tracers" % len(self.regular_tracers))
209 map(methodcaller('start_tracing'), self.regular_tracers)
210
211 time.sleep(1)
212
200 self.log("Switching to %s" % self.scheduler) 213 self.log("Switching to %s" % self.scheduler)
201 lu.switch_scheduler(self.scheduler) 214 lu.switch_scheduler(self.scheduler)
202 215
203 self.log("Starting %d regular tracers" % len(self.regular_tracers)) 216 time.sleep(1)
204 map(methodcaller('start_tracing'), self.regular_tracers)
205 217
206 self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w') 218 self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w')
207 self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w') 219 self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w')
@@ -217,3 +229,32 @@ class Experiment(object):
217 self.log("Stopping regular tracers") 229 self.log("Stopping regular tracers")
218 map(methodcaller('stop_tracing'), self.regular_tracers) 230 map(methodcaller('stop_tracing'), self.regular_tracers)
219 231
232 def log(self, msg):
233 print("[Exp %s]: %s" % (self.name, msg))
234
235 def run_exp(self):
236 self.__to_linux()
237
238 succ = False
239 try:
240 self.__setup()
241
242 try:
243 self.__run_tasks()
244 self.log("Saving results in %s" % self.finished_dir)
245 succ = True
246 except Exception as e:
247 # Give time for whatever failed to finish failing
248 time.sleep(2)
249 self.__kill_all()
250
251 raise e
252 finally:
253 self.__teardown()
254 finally:
255 self.log("Switching back to Linux scheduler")
256 self.__to_linux()
257
258 if succ:
259 self.__save_results()
260 self.log("Experiment done!")