aboutsummaryrefslogtreecommitdiffstats
path: root/run
diff options
context:
space:
mode:
authorJonathan Herman <hermanjl@cs.unc.edu>2013-04-22 15:32:12 -0400
committerJonathan Herman <hermanjl@cs.unc.edu>2013-04-22 15:32:12 -0400
commit25ccdb0cbc6b959b1f96c89b8bce91963cb67b4c (patch)
treedcaeb7d85f3dcc0f2afbb53d11c512c71fb712ab /run
parentfbd1df6f63eb551b99f71330d2370c570ff323f5 (diff)
Improved robustness of run_exps.py execution.
Thanks to bcw and gelliott for debugging and ideas. * Print out experiment number and total experiments when starting experiments. * Only sleep and re-release tasks if tasks are waiting to release. * Fail experiment with verbose messages if any tasks fail before becoming ready to release. * When waiting for tasks to become ready for release, reset the waiting time whenever a new task (or task(s)) become ready. * Start regular tracers BEFORE the plugin switch to log data from the switch. * Check the number of running tasks AFTER trying to switch the linux scheduler. This gives plugin deactivate code the opportunity to kill these tasks. * If an invalid executable is specified in the schedule file, fail before attempting to run the experiment and print out the problem. * Propogate exceptions up from experiment failures instead of creating ExperimentFailed exceptions. This commit also made clock-frequency automatically ignored by parse_exps.py. The value of this would change by +- a Mhz between experiments, ruining graphs.
Diffstat (limited to 'run')
-rw-r--r--run/executable/executable.py3
-rw-r--r--run/experiment.py185
-rw-r--r--run/litmus_util.py13
3 files changed, 121 insertions, 80 deletions
diff --git a/run/executable/executable.py b/run/executable/executable.py
index e6f2003..a2426f1 100644
--- a/run/executable/executable.py
+++ b/run/executable/executable.py
@@ -59,6 +59,9 @@ class Executable(object):
59 def interrupt(self): 59 def interrupt(self):
60 self.sp.send_signal(signal.SIGINT) 60 self.sp.send_signal(signal.SIGINT)
61 61
62 def poll(self):
63 return self.sp.poll()
64
62 def terminate(self): 65 def terminate(self):
63 '''Send the terminate signal to the binary.''' 66 '''Send the terminate signal to the binary.'''
64 self.sp.terminate() 67 self.sp.terminate()
diff --git a/run/experiment.py b/run/experiment.py
index ff0e9f3..b0e46b6 100644
--- a/run/experiment.py
+++ b/run/experiment.py
@@ -1,9 +1,7 @@
1import common as com
2import os 1import os
3import time 2import time
4import run.litmus_util as lu 3import run.litmus_util as lu
5import shutil as sh 4import shutil as sh
6import traceback
7from operator import methodcaller 5from operator import methodcaller
8 6
9class ExperimentException(Exception): 7class ExperimentException(Exception):
@@ -11,16 +9,11 @@ class ExperimentException(Exception):
11 def __init__(self, name): 9 def __init__(self, name):
12 self.name = name 10 self.name = name
13 11
14
15class ExperimentDone(ExperimentException): 12class ExperimentDone(ExperimentException):
16 '''Raised when an experiment looks like it's been run already.''' 13 '''Raised when an experiment looks like it's been run already.'''
17 def __str__(self): 14 def __str__(self):
18 return "Experiment finished already: %d" % self.name 15 return "Experiment finished already: %d" % self.name
19 16
20class ExperimentFailed(ExperimentException):
21 def __str__(self):
22 return "Experiment failed during execution: %d" % self.name
23
24class SystemCorrupted(Exception): 17class SystemCorrupted(Exception):
25 pass 18 pass
26 19
@@ -31,7 +24,6 @@ class Experiment(object):
31 def __init__(self, name, scheduler, working_dir, finished_dir, 24 def __init__(self, name, scheduler, working_dir, finished_dir,
32 proc_entries, executables, tracer_types): 25 proc_entries, executables, tracer_types):
33 '''Run an experiment, optionally wrapped in tracing.''' 26 '''Run an experiment, optionally wrapped in tracing.'''
34
35 self.name = name 27 self.name = name
36 self.scheduler = scheduler 28 self.scheduler = scheduler
37 self.working_dir = working_dir 29 self.working_dir = working_dir
@@ -40,16 +32,10 @@ class Experiment(object):
40 self.executables = executables 32 self.executables = executables
41 self.exec_out = None 33 self.exec_out = None
42 self.exec_err = None 34 self.exec_err = None
35 self.tracer_types = tracer_types
43 36
44 self.task_batch = com.num_cpus() 37 def __setup_tracers(self):
45 38 tracers = [ t(self.working_dir) for t in self.tracer_types ]
46 self.__make_dirs()
47 self.__assign_executable_cwds()
48 self.__setup_tracers(tracer_types)
49
50
51 def __setup_tracers(self, tracer_types):
52 tracers = [ t(self.working_dir) for t in tracer_types ]
53 39
54 self.regular_tracers = [t for t in tracers if not t.is_exact()] 40 self.regular_tracers = [t for t in tracers if not t.is_exact()]
55 self.exact_tracers = [t for t in tracers if t.is_exact()] 41 self.exact_tracers = [t for t in tracers if t.is_exact()]
@@ -84,13 +70,11 @@ class Experiment(object):
84 map(assign_cwd, self.executables) 70 map(assign_cwd, self.executables)
85 71
86 def __kill_all(self): 72 def __kill_all(self):
87 # Give time for whatever failed to finish failing 73 if lu.waiting_tasks():
88 time.sleep(2) 74 released = lu.release_tasks()
89 75 self.log("Re-released %d tasks" % released)
90 released = lu.release_tasks()
91 self.log("Re-released %d tasks" % released)
92 76
93 time.sleep(5) 77 time.sleep(1)
94 78
95 self.log("Killing all tasks") 79 self.log("Killing all tasks")
96 for e in self.executables: 80 for e in self.executables:
@@ -99,7 +83,62 @@ class Experiment(object):
99 except: 83 except:
100 pass 84 pass
101 85
102 time.sleep(2) 86 time.sleep(1)
87
88 def __strip_path(self, path):
89 '''Shorten path to something more readable.'''
90 file_dir = os.path.split(self.working_dir)[0]
91 if path.index(file_dir) == 0:
92 path = path[len(file_dir):]
93
94 return path.strip("/")
95
96 def __check_tasks_status(self):
97 '''Raises an exception if any tasks have failed.'''
98 msgs = []
99
100 for e in self.executables:
101 status = e.poll()
102 if status != None and status:
103 err_msg = "Task %s failed with status: %s" % (e.wait(), status)
104 msgs += [err_msg]
105
106 if msgs:
107 # Show at most 3 messages so that every task failing doesn't blow
108 # up the terminal
109 if len(msgs) > 3:
110 num_errs = len(msgs) - 3
111 msgs = msgs[0:4] + ["...%d more task errors..." % num_errs]
112
113 out_name = self.__strip_path(self.exec_out.name)
114 err_name = self.__strip_path(self.exec_err.name)
115 help = "Check dmesg, %s, and %s" % (out_name, err_name)
116
117 raise Exception("\n".join(msgs + [help]))
118
119 def __wait_for_ready(self):
120 self.log("Sleeping until tasks are ready for release...")
121
122 wait_start = time.time()
123 num_ready = lu.waiting_tasks()
124
125 while num_ready < len(self.executables):
126 # Quit if too much time passes without a task becoming ready
127 if time.time() - wait_start > 180.0:
128 s = "waiting: %d, submitted: %d" %\
129 (lu.waiting_tasks(), len(self.executables))
130 raise Exception("Too much time spent waiting for tasks! %s" % s)
131
132 time.sleep(1)
133
134 # Quit if any tasks fail
135 self.__check_tasks_status()
136
137 # Reset the waiting time whenever more tasks become ready
138 now_ready = lu.waiting_tasks()
139 if now_ready != num_ready:
140 wait_start = time.time()
141 num_ready = lu.now_ready
103 142
104 def __run_tasks(self): 143 def __run_tasks(self):
105 self.log("Starting %d tasks" % len(self.executables)) 144 self.log("Starting %d tasks" % len(self.executables))
@@ -108,16 +147,9 @@ class Experiment(object):
108 try: 147 try:
109 e.execute() 148 e.execute()
110 except: 149 except:
111 raise Exception("Executable failed: %s" % e) 150 raise Exception("Executable failed to start: %s" % e)
112 151
113 self.log("Sleeping until tasks are ready for release...") 152 self.__wait_for_ready()
114 start = time.time()
115 while lu.waiting_tasks() < len(self.executables):
116 if time.time() - start > 30.0:
117 s = "waiting: %d, submitted: %d" %\
118 (lu.waiting_tasks(), len(self.executables))
119 raise Exception("Too much time spent waiting for tasks! %s" % s)
120 time.sleep(1)
121 153
122 # Exact tracers (like overheads) must be started right after release or 154 # Exact tracers (like overheads) must be started right after release or
123 # measurements will be full of irrelevant records 155 # measurements will be full of irrelevant records
@@ -148,60 +180,40 @@ class Experiment(object):
148 def __save_results(self): 180 def __save_results(self):
149 os.rename(self.working_dir, self.finished_dir) 181 os.rename(self.working_dir, self.finished_dir)
150 182
151 def log(self, msg): 183 def __to_linux(self):
152 print("[Exp %s]: %s" % (self.name, msg)) 184 msgs = []
153
154 def run_exp(self):
155 self.__check_system()
156
157 succ = False
158 try:
159 self.__setup()
160 185
186 sched = lu.scheduler()
187 if sched != "Linux":
161 try: 188 try:
162 self.__run_tasks() 189 lu.switch_scheduler("Linux")
163 self.log("Saving results in %s" % self.finished_dir)
164 succ = True
165 except: 190 except:
166 traceback.print_exc() 191 msgs += ["Scheduler is %s, cannot switch to Linux!" % sched]
167 self.__kill_all()
168 raise ExperimentFailed(self.name)
169 finally:
170 self.__teardown()
171 finally:
172 self.log("Switching to Linux scheduler")
173
174 # Give the tasks 10 seconds to finish before bailing
175 start = time.time()
176 while lu.all_tasks() > 0:
177 if time.time() - start < 10.0:
178 raise SystemCorrupted("%d tasks still running!" %
179 lu.all_tasks())
180
181 lu.switch_scheduler("Linux")
182
183 if succ:
184 self.__save_results()
185 self.log("Experiment done!")
186 192
187 def __check_system(self):
188 running = lu.all_tasks() 193 running = lu.all_tasks()
189 if running: 194 if running:
190 raise SystemCorrupted("%d tasks already running!" % running) 195 msgs += ["%d real-time tasks still running!" % running]
191 196
192 sched = lu.scheduler() 197 if msgs:
193 if sched != "Linux": 198 raise SystemCorrupted("\n".join(msgs))
194 raise SystemCorrupted("Scheduler is %s, should be Linux" % sched)
195 199
196 def __setup(self): 200 def __setup(self):
201 self.__make_dirs()
202 self.__assign_executable_cwds()
203 self.__setup_tracers()
204
197 self.log("Writing %d proc entries" % len(self.proc_entries)) 205 self.log("Writing %d proc entries" % len(self.proc_entries))
198 map(methodcaller('write_proc'), self.proc_entries) 206 map(methodcaller('write_proc'), self.proc_entries)
199 207
208 self.log("Starting %d regular tracers" % len(self.regular_tracers))
209 map(methodcaller('start_tracing'), self.regular_tracers)
210
211 time.sleep(1)
212
200 self.log("Switching to %s" % self.scheduler) 213 self.log("Switching to %s" % self.scheduler)
201 lu.switch_scheduler(self.scheduler) 214 lu.switch_scheduler(self.scheduler)
202 215
203 self.log("Starting %d regular tracers" % len(self.regular_tracers)) 216 time.sleep(1)
204 map(methodcaller('start_tracing'), self.regular_tracers)
205 217
206 self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w') 218 self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w')
207 self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w') 219 self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w')
@@ -217,3 +229,32 @@ class Experiment(object):
217 self.log("Stopping regular tracers") 229 self.log("Stopping regular tracers")
218 map(methodcaller('stop_tracing'), self.regular_tracers) 230 map(methodcaller('stop_tracing'), self.regular_tracers)
219 231
232 def log(self, msg):
233 print("[Exp %s]: %s" % (self.name, msg))
234
235 def run_exp(self):
236 self.__to_linux()
237
238 succ = False
239 try:
240 self.__setup()
241
242 try:
243 self.__run_tasks()
244 self.log("Saving results in %s" % self.finished_dir)
245 succ = True
246 except Exception as e:
247 # Give time for whatever failed to finish failing
248 time.sleep(2)
249 self.__kill_all()
250
251 raise e
252 finally:
253 self.__teardown()
254 finally:
255 self.log("Switching back to Linux scheduler")
256 self.__to_linux()
257
258 if succ:
259 self.__save_results()
260 self.log("Experiment done!")
diff --git a/run/litmus_util.py b/run/litmus_util.py
index 70da262..81f690b 100644
--- a/run/litmus_util.py
+++ b/run/litmus_util.py
@@ -20,7 +20,7 @@ def switch_scheduler(switch_to_in):
20 with open('/proc/litmus/active_plugin', 'w') as active_plugin: 20 with open('/proc/litmus/active_plugin', 'w') as active_plugin:
21 subprocess.Popen(["echo", switch_to], stdout=active_plugin) 21 subprocess.Popen(["echo", switch_to], stdout=active_plugin)
22 22
23 # it takes a bit to do the switch, sleep an arbitrary amount of time 23 # It takes a bit to do the switch, sleep an arbitrary amount of time
24 time.sleep(2) 24 time.sleep(2)
25 25
26 cur_plugin = scheduler() 26 cur_plugin = scheduler()
@@ -29,24 +29,21 @@ def switch_scheduler(switch_to_in):
29 (switch_to, cur_plugin)) 29 (switch_to, cur_plugin))
30 30
31def waiting_tasks(): 31def waiting_tasks():
32 reg = re.compile(r'^ready.*?(?P<READY>\d+)$', re.M) 32 reg = re.compile(r'^ready.*?(?P<WAITING>\d+)$', re.M)
33 with open('/proc/litmus/stats', 'r') as f: 33 with open('/proc/litmus/stats', 'r') as f:
34 data = f.read() 34 data = f.read()
35 35
36 # Ignore if no tasks are waiting for release 36 # Ignore if no tasks are waiting for release
37 match = re.search(reg, data) 37 waiting = re.search(reg, data).group("WAITING")
38 ready = match.group("READY")
39 38
40 return 0 if not ready else int(ready) 39 return 0 if not waiting else int(waiting)
41 40
42def all_tasks(): 41def all_tasks():
43 reg = re.compile(r'^real-time.*?(?P<TASKS>\d+)$', re.M) 42 reg = re.compile(r'^real-time.*?(?P<TASKS>\d+)$', re.M)
44 with open('/proc/litmus/stats', 'r') as f: 43 with open('/proc/litmus/stats', 'r') as f:
45 data = f.read() 44 data = f.read()
46 45
47 # Ignore if no tasks are waiting for release 46 ready = re.search(reg, data).group("TASKS")
48 match = re.search(reg, data)
49 ready = match.group("TASKS")
50 47
51 return 0 if not ready else int(ready) 48 return 0 if not ready else int(ready)
52 49