aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonathan Herman <hermanjl@cs.unc.edu>2013-04-22 15:32:12 -0400
committerJonathan Herman <hermanjl@cs.unc.edu>2013-04-22 15:32:12 -0400
commit25ccdb0cbc6b959b1f96c89b8bce91963cb67b4c (patch)
treedcaeb7d85f3dcc0f2afbb53d11c512c71fb712ab
parentfbd1df6f63eb551b99f71330d2370c570ff323f5 (diff)
Improved robustness of run_exps.py execution.
Thanks to bcw and gelliott for debugging and ideas. * Print out experiment number and total experiments when starting experiments. * Only sleep and re-release tasks if tasks are waiting to release. * Fail experiment with verbose messages if any tasks fail before becoming ready to release. * When waiting for tasks to become ready for release, reset the waiting time whenever a new task (or task(s)) become ready. * Start regular tracers BEFORE the plugin switch to log data from the switch. * Check the number of running tasks AFTER trying to switch the linux scheduler. This gives plugin deactivate code the opportunity to kill these tasks. * If an invalid executable is specified in the schedule file, fail before attempting to run the experiment and print out the problem. * Propogate exceptions up from experiment failures instead of creating ExperimentFailed exceptions. This commit also made clock-frequency automatically ignored by parse_exps.py. The value of this would change by +- a Mhz between experiments, ruining graphs.
-rw-r--r--config/config.py2
-rwxr-xr-xparse_exps.py4
-rw-r--r--run/executable/executable.py3
-rw-r--r--run/experiment.py185
-rw-r--r--run/litmus_util.py13
-rwxr-xr-xrun_exps.py170
6 files changed, 216 insertions, 161 deletions
diff --git a/config/config.py b/config/config.py
index 1ac468b..b631aa2 100644
--- a/config/config.py
+++ b/config/config.py
@@ -9,7 +9,7 @@ BINS = {'rtspin' : get_executable_hint('rtspin', 'liblitmus'),
9 'ftsplit' : get_executable_hint('ft2csv', 'feather-trace-tools'), 9 'ftsplit' : get_executable_hint('ft2csv', 'feather-trace-tools'),
10 'ftsort' : get_executable_hint('ftsort', 'feather-trace-tools'), 10 'ftsort' : get_executable_hint('ftsort', 'feather-trace-tools'),
11 'st_trace' : get_executable_hint('st_trace', 'feather-trace-tools'), 11 'st_trace' : get_executable_hint('st_trace', 'feather-trace-tools'),
12 # Option, as not everyone uses kernelshark yet 12 # Optional, as not everyone uses kernelshark yet
13 'trace-cmd' : get_executable_hint('trace-cmd', 'rt-kernelshark', True), 13 'trace-cmd' : get_executable_hint('trace-cmd', 'rt-kernelshark', True),
14 # Optional, as sched_trace is not a publically supported repository 14 # Optional, as sched_trace is not a publically supported repository
15 'st_show' : get_executable_hint('st_show', 'sched_trace', True)} 15 'st_show' : get_executable_hint('st_show', 'sched_trace', True)}
diff --git a/parse_exps.py b/parse_exps.py
index d07378c..c2cbedb 100755
--- a/parse_exps.py
+++ b/parse_exps.py
@@ -140,8 +140,8 @@ def main():
140 if opts.ignore: 140 if opts.ignore:
141 for param in opts.ignore.split(","): 141 for param in opts.ignore.split(","):
142 builder.try_remove(param) 142 builder.try_remove(param)
143 # Always average multiple trials 143 builder.try_remove(PARAMS['trial']) # Always average multiple trials
144 builder.try_remove(PARAMS['trial']) 144 builder.try_remove(PARAMS['cycles']) # Only need for feather-trace parsing
145 145
146 col_map = builder.build() 146 col_map = builder.build()
147 result_table = TupleTable(col_map) 147 result_table = TupleTable(col_map)
diff --git a/run/executable/executable.py b/run/executable/executable.py
index e6f2003..a2426f1 100644
--- a/run/executable/executable.py
+++ b/run/executable/executable.py
@@ -59,6 +59,9 @@ class Executable(object):
59 def interrupt(self): 59 def interrupt(self):
60 self.sp.send_signal(signal.SIGINT) 60 self.sp.send_signal(signal.SIGINT)
61 61
62 def poll(self):
63 return self.sp.poll()
64
62 def terminate(self): 65 def terminate(self):
63 '''Send the terminate signal to the binary.''' 66 '''Send the terminate signal to the binary.'''
64 self.sp.terminate() 67 self.sp.terminate()
diff --git a/run/experiment.py b/run/experiment.py
index ff0e9f3..b0e46b6 100644
--- a/run/experiment.py
+++ b/run/experiment.py
@@ -1,9 +1,7 @@
1import common as com
2import os 1import os
3import time 2import time
4import run.litmus_util as lu 3import run.litmus_util as lu
5import shutil as sh 4import shutil as sh
6import traceback
7from operator import methodcaller 5from operator import methodcaller
8 6
9class ExperimentException(Exception): 7class ExperimentException(Exception):
@@ -11,16 +9,11 @@ class ExperimentException(Exception):
11 def __init__(self, name): 9 def __init__(self, name):
12 self.name = name 10 self.name = name
13 11
14
15class ExperimentDone(ExperimentException): 12class ExperimentDone(ExperimentException):
16 '''Raised when an experiment looks like it's been run already.''' 13 '''Raised when an experiment looks like it's been run already.'''
17 def __str__(self): 14 def __str__(self):
18 return "Experiment finished already: %d" % self.name 15 return "Experiment finished already: %d" % self.name
19 16
20class ExperimentFailed(ExperimentException):
21 def __str__(self):
22 return "Experiment failed during execution: %d" % self.name
23
24class SystemCorrupted(Exception): 17class SystemCorrupted(Exception):
25 pass 18 pass
26 19
@@ -31,7 +24,6 @@ class Experiment(object):
31 def __init__(self, name, scheduler, working_dir, finished_dir, 24 def __init__(self, name, scheduler, working_dir, finished_dir,
32 proc_entries, executables, tracer_types): 25 proc_entries, executables, tracer_types):
33 '''Run an experiment, optionally wrapped in tracing.''' 26 '''Run an experiment, optionally wrapped in tracing.'''
34
35 self.name = name 27 self.name = name
36 self.scheduler = scheduler 28 self.scheduler = scheduler
37 self.working_dir = working_dir 29 self.working_dir = working_dir
@@ -40,16 +32,10 @@ class Experiment(object):
40 self.executables = executables 32 self.executables = executables
41 self.exec_out = None 33 self.exec_out = None
42 self.exec_err = None 34 self.exec_err = None
35 self.tracer_types = tracer_types
43 36
44 self.task_batch = com.num_cpus() 37 def __setup_tracers(self):
45 38 tracers = [ t(self.working_dir) for t in self.tracer_types ]
46 self.__make_dirs()
47 self.__assign_executable_cwds()
48 self.__setup_tracers(tracer_types)
49
50
51 def __setup_tracers(self, tracer_types):
52 tracers = [ t(self.working_dir) for t in tracer_types ]
53 39
54 self.regular_tracers = [t for t in tracers if not t.is_exact()] 40 self.regular_tracers = [t for t in tracers if not t.is_exact()]
55 self.exact_tracers = [t for t in tracers if t.is_exact()] 41 self.exact_tracers = [t for t in tracers if t.is_exact()]
@@ -84,13 +70,11 @@ class Experiment(object):
84 map(assign_cwd, self.executables) 70 map(assign_cwd, self.executables)
85 71
86 def __kill_all(self): 72 def __kill_all(self):
87 # Give time for whatever failed to finish failing 73 if lu.waiting_tasks():
88 time.sleep(2) 74 released = lu.release_tasks()
89 75 self.log("Re-released %d tasks" % released)
90 released = lu.release_tasks()
91 self.log("Re-released %d tasks" % released)
92 76
93 time.sleep(5) 77 time.sleep(1)
94 78
95 self.log("Killing all tasks") 79 self.log("Killing all tasks")
96 for e in self.executables: 80 for e in self.executables:
@@ -99,7 +83,62 @@ class Experiment(object):
99 except: 83 except:
100 pass 84 pass
101 85
102 time.sleep(2) 86 time.sleep(1)
87
88 def __strip_path(self, path):
89 '''Shorten path to something more readable.'''
90 file_dir = os.path.split(self.working_dir)[0]
91 if path.index(file_dir) == 0:
92 path = path[len(file_dir):]
93
94 return path.strip("/")
95
96 def __check_tasks_status(self):
97 '''Raises an exception if any tasks have failed.'''
98 msgs = []
99
100 for e in self.executables:
101 status = e.poll()
102 if status != None and status:
103 err_msg = "Task %s failed with status: %s" % (e.wait(), status)
104 msgs += [err_msg]
105
106 if msgs:
107 # Show at most 3 messages so that every task failing doesn't blow
108 # up the terminal
109 if len(msgs) > 3:
110 num_errs = len(msgs) - 3
111 msgs = msgs[0:4] + ["...%d more task errors..." % num_errs]
112
113 out_name = self.__strip_path(self.exec_out.name)
114 err_name = self.__strip_path(self.exec_err.name)
115 help = "Check dmesg, %s, and %s" % (out_name, err_name)
116
117 raise Exception("\n".join(msgs + [help]))
118
119 def __wait_for_ready(self):
120 self.log("Sleeping until tasks are ready for release...")
121
122 wait_start = time.time()
123 num_ready = lu.waiting_tasks()
124
125 while num_ready < len(self.executables):
126 # Quit if too much time passes without a task becoming ready
127 if time.time() - wait_start > 180.0:
128 s = "waiting: %d, submitted: %d" %\
129 (lu.waiting_tasks(), len(self.executables))
130 raise Exception("Too much time spent waiting for tasks! %s" % s)
131
132 time.sleep(1)
133
134 # Quit if any tasks fail
135 self.__check_tasks_status()
136
137 # Reset the waiting time whenever more tasks become ready
138 now_ready = lu.waiting_tasks()
139 if now_ready != num_ready:
140 wait_start = time.time()
141 num_ready = lu.now_ready
103 142
104 def __run_tasks(self): 143 def __run_tasks(self):
105 self.log("Starting %d tasks" % len(self.executables)) 144 self.log("Starting %d tasks" % len(self.executables))
@@ -108,16 +147,9 @@ class Experiment(object):
108 try: 147 try:
109 e.execute() 148 e.execute()
110 except: 149 except:
111 raise Exception("Executable failed: %s" % e) 150 raise Exception("Executable failed to start: %s" % e)
112 151
113 self.log("Sleeping until tasks are ready for release...") 152 self.__wait_for_ready()
114 start = time.time()
115 while lu.waiting_tasks() < len(self.executables):
116 if time.time() - start > 30.0:
117 s = "waiting: %d, submitted: %d" %\
118 (lu.waiting_tasks(), len(self.executables))
119 raise Exception("Too much time spent waiting for tasks! %s" % s)
120 time.sleep(1)
121 153
122 # Exact tracers (like overheads) must be started right after release or 154 # Exact tracers (like overheads) must be started right after release or
123 # measurements will be full of irrelevant records 155 # measurements will be full of irrelevant records
@@ -148,60 +180,40 @@ class Experiment(object):
148 def __save_results(self): 180 def __save_results(self):
149 os.rename(self.working_dir, self.finished_dir) 181 os.rename(self.working_dir, self.finished_dir)
150 182
151 def log(self, msg): 183 def __to_linux(self):
152 print("[Exp %s]: %s" % (self.name, msg)) 184 msgs = []
153
154 def run_exp(self):
155 self.__check_system()
156
157 succ = False
158 try:
159 self.__setup()
160 185
186 sched = lu.scheduler()
187 if sched != "Linux":
161 try: 188 try:
162 self.__run_tasks() 189 lu.switch_scheduler("Linux")
163 self.log("Saving results in %s" % self.finished_dir)
164 succ = True
165 except: 190 except:
166 traceback.print_exc() 191 msgs += ["Scheduler is %s, cannot switch to Linux!" % sched]
167 self.__kill_all()
168 raise ExperimentFailed(self.name)
169 finally:
170 self.__teardown()
171 finally:
172 self.log("Switching to Linux scheduler")
173
174 # Give the tasks 10 seconds to finish before bailing
175 start = time.time()
176 while lu.all_tasks() > 0:
177 if time.time() - start < 10.0:
178 raise SystemCorrupted("%d tasks still running!" %
179 lu.all_tasks())
180
181 lu.switch_scheduler("Linux")
182
183 if succ:
184 self.__save_results()
185 self.log("Experiment done!")
186 192
187 def __check_system(self):
188 running = lu.all_tasks() 193 running = lu.all_tasks()
189 if running: 194 if running:
190 raise SystemCorrupted("%d tasks already running!" % running) 195 msgs += ["%d real-time tasks still running!" % running]
191 196
192 sched = lu.scheduler() 197 if msgs:
193 if sched != "Linux": 198 raise SystemCorrupted("\n".join(msgs))
194 raise SystemCorrupted("Scheduler is %s, should be Linux" % sched)
195 199
196 def __setup(self): 200 def __setup(self):
201 self.__make_dirs()
202 self.__assign_executable_cwds()
203 self.__setup_tracers()
204
197 self.log("Writing %d proc entries" % len(self.proc_entries)) 205 self.log("Writing %d proc entries" % len(self.proc_entries))
198 map(methodcaller('write_proc'), self.proc_entries) 206 map(methodcaller('write_proc'), self.proc_entries)
199 207
208 self.log("Starting %d regular tracers" % len(self.regular_tracers))
209 map(methodcaller('start_tracing'), self.regular_tracers)
210
211 time.sleep(1)
212
200 self.log("Switching to %s" % self.scheduler) 213 self.log("Switching to %s" % self.scheduler)
201 lu.switch_scheduler(self.scheduler) 214 lu.switch_scheduler(self.scheduler)
202 215
203 self.log("Starting %d regular tracers" % len(self.regular_tracers)) 216 time.sleep(1)
204 map(methodcaller('start_tracing'), self.regular_tracers)
205 217
206 self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w') 218 self.exec_out = open('%s/exec-out.txt' % self.working_dir, 'w')
207 self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w') 219 self.exec_err = open('%s/exec-err.txt' % self.working_dir, 'w')
@@ -217,3 +229,32 @@ class Experiment(object):
217 self.log("Stopping regular tracers") 229 self.log("Stopping regular tracers")
218 map(methodcaller('stop_tracing'), self.regular_tracers) 230 map(methodcaller('stop_tracing'), self.regular_tracers)
219 231
232 def log(self, msg):
233 print("[Exp %s]: %s" % (self.name, msg))
234
235 def run_exp(self):
236 self.__to_linux()
237
238 succ = False
239 try:
240 self.__setup()
241
242 try:
243 self.__run_tasks()
244 self.log("Saving results in %s" % self.finished_dir)
245 succ = True
246 except Exception as e:
247 # Give time for whatever failed to finish failing
248 time.sleep(2)
249 self.__kill_all()
250
251 raise e
252 finally:
253 self.__teardown()
254 finally:
255 self.log("Switching back to Linux scheduler")
256 self.__to_linux()
257
258 if succ:
259 self.__save_results()
260 self.log("Experiment done!")
diff --git a/run/litmus_util.py b/run/litmus_util.py
index 70da262..81f690b 100644
--- a/run/litmus_util.py
+++ b/run/litmus_util.py
@@ -20,7 +20,7 @@ def switch_scheduler(switch_to_in):
20 with open('/proc/litmus/active_plugin', 'w') as active_plugin: 20 with open('/proc/litmus/active_plugin', 'w') as active_plugin:
21 subprocess.Popen(["echo", switch_to], stdout=active_plugin) 21 subprocess.Popen(["echo", switch_to], stdout=active_plugin)
22 22
23 # it takes a bit to do the switch, sleep an arbitrary amount of time 23 # It takes a bit to do the switch, sleep an arbitrary amount of time
24 time.sleep(2) 24 time.sleep(2)
25 25
26 cur_plugin = scheduler() 26 cur_plugin = scheduler()
@@ -29,24 +29,21 @@ def switch_scheduler(switch_to_in):
29 (switch_to, cur_plugin)) 29 (switch_to, cur_plugin))
30 30
31def waiting_tasks(): 31def waiting_tasks():
32 reg = re.compile(r'^ready.*?(?P<READY>\d+)$', re.M) 32 reg = re.compile(r'^ready.*?(?P<WAITING>\d+)$', re.M)
33 with open('/proc/litmus/stats', 'r') as f: 33 with open('/proc/litmus/stats', 'r') as f:
34 data = f.read() 34 data = f.read()
35 35
36 # Ignore if no tasks are waiting for release 36 # Ignore if no tasks are waiting for release
37 match = re.search(reg, data) 37 waiting = re.search(reg, data).group("WAITING")
38 ready = match.group("READY")
39 38
40 return 0 if not ready else int(ready) 39 return 0 if not waiting else int(waiting)
41 40
42def all_tasks(): 41def all_tasks():
43 reg = re.compile(r'^real-time.*?(?P<TASKS>\d+)$', re.M) 42 reg = re.compile(r'^real-time.*?(?P<TASKS>\d+)$', re.M)
44 with open('/proc/litmus/stats', 'r') as f: 43 with open('/proc/litmus/stats', 'r') as f:
45 data = f.read() 44 data = f.read()
46 45
47 # Ignore if no tasks are waiting for release 46 ready = re.search(reg, data).group("TASKS")
48 match = re.search(reg, data)
49 ready = match.group("TASKS")
50 47
51 return 0 if not ready else int(ready) 48 return 0 if not ready else int(ready)
52 49
diff --git a/run_exps.py b/run_exps.py
index d7a06b5..a15018d 100755
--- a/run_exps.py
+++ b/run_exps.py
@@ -12,12 +12,13 @@ from config.config import PARAMS,DEFAULTS
12from collections import namedtuple 12from collections import namedtuple
13from optparse import OptionParser 13from optparse import OptionParser
14from run.executable.executable import Executable 14from run.executable.executable import Executable
15from run.experiment import Experiment,ExperimentDone,ExperimentFailed,SystemCorrupted 15from run.experiment import Experiment,ExperimentDone,SystemCorrupted
16from run.proc_entry import ProcEntry 16from run.proc_entry import ProcEntry
17 17
18'''Customizable experiment parameters''' 18'''Customizable experiment parameters'''
19ExpParams = namedtuple('ExpParams', ['scheduler', 'duration', 'tracers', 19ExpParams = namedtuple('ExpParams', ['scheduler', 'duration', 'tracers',
20 'kernel', 'config_options']) 20 'kernel', 'config_options', 'file_params',
21 'pre_script', 'post_script'])
21'''Comparison of requested versus actual kernel compile parameter value''' 22'''Comparison of requested versus actual kernel compile parameter value'''
22ConfigResult = namedtuple('ConfigResult', ['param', 'wanted', 'actual']) 23ConfigResult = namedtuple('ConfigResult', ['param', 'wanted', 'actual'])
23 24
@@ -113,8 +114,8 @@ def fix_paths(schedule, exp_dir, sched_file):
113 args = args.replace(arg, abspath) 114 args = args.replace(arg, abspath)
114 break 115 break
115 elif re.match(r'.*\w+\.[a-zA-Z]\w*', arg): 116 elif re.match(r'.*\w+\.[a-zA-Z]\w*', arg):
116 print("WARNING: non-existent file '%s' may be referenced:\n\t%s" 117 sys.stderr.write("WARNING: non-existent file '%s' " % arg +
117 % (arg, sched_file)) 118 "may be referenced:\n\t%s" % sched_file)
118 119
119 schedule['task'][idx] = (task, args) 120 schedule['task'][idx] = (task, args)
120 121
@@ -182,22 +183,21 @@ def verify_environment(exp_params):
182 raise InvalidConfig(results) 183 raise InvalidConfig(results)
183 184
184 185
185def run_parameter(exp_dir, out_dir, params, param_name): 186def run_script(script_params, exp, exp_dir, out_dir):
186 '''Run an executable (arguments optional) specified as a configurable 187 '''Run an executable (arguments optional)'''
187 @param_name in @params.''' 188 if not script_params:
188 if PARAMS[param_name] not in params:
189 return 189 return
190 190
191 script_params = params[PARAMS[param_name]]
192
193 # Split into arguments and program name 191 # Split into arguments and program name
194 if type(script_params) != type([]): 192 if type(script_params) != type([]):
195 script_params = [script_params] 193 script_params = [script_params]
196 script_name = script_params.pop(0)
197 194
195 exp.log("Running %s" % script_params.join(" "))
196
197 script_name = script_params.pop(0)
198 script = com.get_executable(script_name, cwd=exp_dir) 198 script = com.get_executable(script_name, cwd=exp_dir)
199 199
200 out = open('%s/%s-out.txt' % (out_dir, param_name), 'w') 200 out = open('%s/%s-out.txt' % (out_dir, script_name), 'w')
201 prog = Executable(script, script_params, cwd=out_dir, 201 prog = Executable(script, script_params, cwd=out_dir,
202 stderr_file=out, stdout_file=out) 202 stderr_file=out, stdout_file=out)
203 203
@@ -207,28 +207,41 @@ def run_parameter(exp_dir, out_dir, params, param_name):
207 out.close() 207 out.close()
208 208
209 209
210def get_exp_params(cmd_scheduler, cmd_duration, file_params): 210def make_exp_params(cmd_scheduler, cmd_duration, sched_dir, param_file):
211 '''Return ExpParam with configured values of all hardcoded params.''' 211 '''Return ExpParam with configured values of all hardcoded params.'''
212 kernel = copts = "" 212 kernel = copts = ""
213 213
214 scheduler = cmd_scheduler or file_params[PARAMS['sched']] 214 # Load parameter file
215 duration = cmd_duration or file_params[PARAMS['dur']] or\ 215 param_file = param_file or "%s/%s" % (sched_dir, DEFAULTS['params_file'])
216 if os.path.isfile(param_file):
217 fparams = com.load_params(param_file)
218 else:
219 fparams = {}
220
221 scheduler = cmd_scheduler or fparams[PARAMS['sched']]
222 duration = cmd_duration or fparams[PARAMS['dur']] or\
216 DEFAULTS['duration'] 223 DEFAULTS['duration']
217 224
218 # Experiments can specify required kernel name 225 # Experiments can specify required kernel name
219 if PARAMS['kernel'] in file_params: 226 if PARAMS['kernel'] in fparams:
220 kernel = file_params[PARAMS['kernel']] 227 kernel = fparams[PARAMS['kernel']]
221 228
222 # Or required config options 229 # Or required config options
223 if PARAMS['copts'] in file_params: 230 if PARAMS['copts'] in fparams:
224 copts = file_params[PARAMS['copts']] 231 copts = fparams[PARAMS['copts']]
225 232
226 # Or required tracers 233 # Or required tracers
227 requested = [] 234 requested = []
228 if PARAMS['trace'] in file_params: 235 if PARAMS['trace'] in fparams:
229 requested = file_params[PARAMS['trace']] 236 requested = fparams[PARAMS['trace']]
230 tracers = trace.get_tracer_types(requested) 237 tracers = trace.get_tracer_types(requested)
231 238
239 # Or scripts to run before and after experiments
240 def get_script(name):
241 return fparams[name] if name in fparams else None
242 pre_script = get_script('pre')
243 post_script = get_script('post')
244
232 # But only these two are mandatory 245 # But only these two are mandatory
233 if not scheduler: 246 if not scheduler:
234 raise IOError("No scheduler found in param file!") 247 raise IOError("No scheduler found in param file!")
@@ -236,48 +249,39 @@ def get_exp_params(cmd_scheduler, cmd_duration, file_params):
236 raise IOError("No duration found in param file!") 249 raise IOError("No duration found in param file!")
237 250
238 return ExpParams(scheduler=scheduler, kernel=kernel, duration=duration, 251 return ExpParams(scheduler=scheduler, kernel=kernel, duration=duration,
239 config_options=copts, tracers=tracers) 252 config_options=copts, tracers=tracers, file_params=fparams,
240 253 pre_script=pre_script, post_script=post_script)
241 254
242def load_experiment(sched_file, cmd_scheduler, cmd_duration, 255def run_experiment(name, sched_file, exp_params, out_dir,
243 param_file, out_dir, ignore, jabber): 256 start_message, ignore, jabber):
244 '''Load and parse data from files and run result.''' 257 '''Load and parse data from files and run result.'''
245 if not os.path.isfile(sched_file): 258 if not os.path.isfile(sched_file):
246 raise IOError("Cannot find schedule file: %s" % sched_file) 259 raise IOError("Cannot find schedule file: %s" % sched_file)
247 260
248 dir_name, fname = os.path.split(sched_file) 261 dir_name, fname = os.path.split(sched_file)
249 exp_name = os.path.split(dir_name)[1] + "/" + fname
250 work_dir = "%s/tmp" % dir_name 262 work_dir = "%s/tmp" % dir_name
251 263
252 # Load parameter file 264 procs, execs = load_schedule(name, sched_file, exp_params.duration)
253 param_file = param_file or \
254 "%s/%s" % (dir_name, DEFAULTS['params_file'])
255 if os.path.isfile(param_file):
256 file_params = com.load_params(param_file)
257 else:
258 file_params = {}
259
260 # Create input needed by Experiment
261 exp_params = get_exp_params(cmd_scheduler, cmd_duration, file_params)
262 procs, execs = load_schedule(exp_name, sched_file, exp_params.duration)
263 265
264 exp = Experiment(exp_name, exp_params.scheduler, work_dir, out_dir, 266 exp = Experiment(name, exp_params.scheduler, work_dir, out_dir,
265 procs, execs, exp_params.tracers) 267 procs, execs, exp_params.tracers)
266 268
269 exp.log(start_message)
270
267 if not ignore: 271 if not ignore:
268 verify_environment(exp_params) 272 verify_environment(exp_params)
269 273
270 run_parameter(dir_name, work_dir, file_params, 'pre') 274 run_script(exp_params.pre_script, exp, dir_name, work_dir)
271 275
272 exp.run_exp() 276 exp.run_exp()
273 277
274 run_parameter(dir_name, out_dir, file_params, 'post') 278 run_script(exp_params.post_script, exp, dir_name, out_dir)
275 279
276 if jabber: 280 if jabber:
277 jabber.send("Completed '%s'" % exp_name) 281 jabber.send("Completed '%s'" % name)
278 282
279 # Save parameters used to run experiment in out_dir 283 # Save parameters used to run experiment in out_dir
280 out_params = dict(file_params.items() + 284 out_params = dict(exp_params.file_params.items() +
281 [(PARAMS['sched'], exp_params.scheduler), 285 [(PARAMS['sched'], exp_params.scheduler),
282 (PARAMS['tasks'], len(execs)), 286 (PARAMS['tasks'], len(execs)),
283 (PARAMS['dur'], exp_params.duration)]) 287 (PARAMS['dur'], exp_params.duration)])
@@ -292,6 +296,7 @@ def load_experiment(sched_file, cmd_scheduler, cmd_duration,
292 296
293 297
294def get_exps(opts, args): 298def get_exps(opts, args):
299 '''Return list of experiment files or directories'''
295 if args: 300 if args:
296 return args 301 return args
297 302
@@ -333,63 +338,72 @@ def setup_email(target):
333 return None 338 return None
334 339
335 340
341def make_paths(exp, out_base_dir, opts):
342 '''Translate experiment name to (schedule file, output directory) paths'''
343 path = "%s/%s" % (os.getcwd(), exp)
344 out_dir = "%s/%s" % (out_base_dir, os.path.split(exp.strip('/'))[1])
345
346 if not os.path.exists(path):
347 raise IOError("Invalid experiment: %s" % path)
348
349 if opts.force and os.path.exists(out_dir):
350 shutil.rmtree(out_dir)
351
352 if os.path.isdir(path):
353 sched_file = "%s/%s" % (path, opts.sched_file)
354 else:
355 sched_file = path
356
357 return sched_file, out_dir
358
359
336def main(): 360def main():
337 opts, args = parse_args() 361 opts, args = parse_args()
338 362
339 scheduler = opts.scheduler
340 duration = opts.duration
341 param_file = opts.param_file
342 out_base = os.path.abspath(opts.out_dir)
343
344 exps = get_exps(opts, args) 363 exps = get_exps(opts, args)
345 364
346 created = False 365 jabber = setup_jabber(opts.jabber) if opts.jabber else None
366 email = setup_email(opts.email) if opts.email else None
367
368 out_base = os.path.abspath(opts.out_dir)
369 created = False
347 if not os.path.exists(out_base): 370 if not os.path.exists(out_base):
348 created = True 371 created = True
349 os.mkdir(out_base) 372 os.mkdir(out_base)
350 373
351 ran = 0 374 ran = done = succ = failed = invalid = 0
352 done = 0
353 succ = 0
354 failed = 0
355 invalid = 0
356
357 jabber = setup_jabber(opts.jabber) if opts.jabber else None
358 email = setup_email(opts.email) if opts.email else None
359
360 for exp in exps:
361 path = "%s/%s" % (os.getcwd(), exp)
362 out_dir = "%s/%s" % (out_base, os.path.split(exp.strip('/'))[1])
363 375
364 if not os.path.exists(path): 376 for i, exp in enumerate(exps):
365 raise IOError("Invalid experiment: %s" % path) 377 sched_file, out_dir = make_paths(exp, out_base, opts)
378 sched_dir = os.path.split(sched_file)[0]
366 379
367 if opts.force and os.path.exists(out_dir): 380 try:
368 shutil.rmtree(out_dir) 381 start_message = "Loading experiment %d of %d." % (i+1, len(exps))
382 exp_params = make_exp_params(opts.scheduler, opts.duration,
383 sched_dir, opts.param_file)
369 384
370 if os.path.isdir(exp): 385 run_experiment(exp, sched_file, exp_params, out_dir,
371 path = "%s/%s" % (path, opts.sched_file) 386 start_message, opts.ignore, jabber)
372 387
373 try:
374 load_experiment(path, scheduler, duration, param_file,
375 out_dir, opts.ignore, jabber)
376 succ += 1 388 succ += 1
377 except ExperimentDone: 389 except ExperimentDone:
390 sys.stderr.write("Experiment '%s' already completed " % exp +
391 "at '%s'\n" % out_base)
378 done += 1 392 done += 1
379 print("Experiment '%s' already completed at '%s'" % (exp, out_base))
380 except (InvalidKernel, InvalidConfig) as e: 393 except (InvalidKernel, InvalidConfig) as e:
394 sys.stderr.write("Invalid environment for experiment '%s'\n" % exp)
395 sys.stderr.write("%s\n" % e)
381 invalid += 1 396 invalid += 1
382 print("Invalid environment for experiment '%s'" % exp)
383 print(e)
384 except KeyboardInterrupt: 397 except KeyboardInterrupt:
385 print("Keyboard interrupt, quitting") 398 sys.stderr.write("Keyboard interrupt, quitting\n")
386 break 399 break
387 except SystemCorrupted as e: 400 except SystemCorrupted as e:
388 print("System is corrupted! Fix state before continuing.") 401 sys.stderr.write("System is corrupted! Fix state before continuing.\n")
389 print(e) 402 sys.stderr.write("%s\n" % e)
390 break 403 break
391 except ExperimentFailed: 404 except Exception as e:
392 print("Failed experiment %s" % exp) 405 sys.stderr.write("Failed experiment %s\n" % exp)
406 sys.stderr.write("%s\n" % e)
393 failed += 1 407 failed += 1
394 408
395 ran += 1 409 ran += 1
@@ -398,7 +412,7 @@ def main():
398 if not os.listdir(out_base) and created and not succ: 412 if not os.listdir(out_base) and created and not succ:
399 os.rmdir(out_base) 413 os.rmdir(out_base)
400 414
401 message = "Experiments ran:\t%d of %d" % (ran, len(args)) +\ 415 message = "Experiments ran:\t%d of %d" % (ran, len(exps)) +\
402 "\n Successful:\t\t%d" % succ +\ 416 "\n Successful:\t\t%d" % succ +\
403 "\n Failed:\t\t%d" % failed +\ 417 "\n Failed:\t\t%d" % failed +\
404 "\n Already Done:\t\t%d" % done +\ 418 "\n Already Done:\t\t%d" % done +\