aboutsummaryrefslogtreecommitdiffstats
path: root/run/experiment.py
diff options
context:
space:
mode:
Diffstat (limited to 'run/experiment.py')
-rw-r--r--run/experiment.py128
1 files changed, 74 insertions, 54 deletions
diff --git a/run/experiment.py b/run/experiment.py
index e5811f8..ff0e9f3 100644
--- a/run/experiment.py
+++ b/run/experiment.py
@@ -1,7 +1,9 @@
1import common as com
1import os 2import os
2import time 3import time
3import run.litmus_util as lu 4import run.litmus_util as lu
4import shutil as sh 5import shutil as sh
6import traceback
5from operator import methodcaller 7from operator import methodcaller
6 8
7class ExperimentException(Exception): 9class ExperimentException(Exception):
@@ -15,17 +17,12 @@ class ExperimentDone(ExperimentException):
15 def __str__(self): 17 def __str__(self):
16 return "Experiment finished already: %d" % self.name 18 return "Experiment finished already: %d" % self.name
17 19
18
19class ExperimentInterrupted(ExperimentException):
20 '''Raised when an experiment appears to be interrupted (partial results).'''
21 def __str__(self):
22 return "Experiment was interrupted in progress: %d" % self.name
23
24
25class ExperimentFailed(ExperimentException): 20class ExperimentFailed(ExperimentException):
26 def __str__(self): 21 def __str__(self):
27 return "Experiment failed during execution: %d" % self.name 22 return "Experiment failed during execution: %d" % self.name
28 23
24class SystemCorrupted(Exception):
25 pass
29 26
30class Experiment(object): 27class Experiment(object):
31 '''Execute one task-set and save the results. Experiments have unique IDs.''' 28 '''Execute one task-set and save the results. Experiments have unique IDs.'''
@@ -44,6 +41,8 @@ class Experiment(object):
44 self.exec_out = None 41 self.exec_out = None
45 self.exec_err = None 42 self.exec_err = None
46 43
44 self.task_batch = com.num_cpus()
45
47 self.__make_dirs() 46 self.__make_dirs()
48 self.__assign_executable_cwds() 47 self.__assign_executable_cwds()
49 self.__setup_tracers(tracer_types) 48 self.__setup_tracers(tracer_types)
@@ -84,65 +83,67 @@ class Experiment(object):
84 executable.cwd = self.working_dir 83 executable.cwd = self.working_dir
85 map(assign_cwd, self.executables) 84 map(assign_cwd, self.executables)
86 85
87 def __run_tasks(self): 86 def __kill_all(self):
88 already_waiting = lu.waiting_tasks() 87 # Give time for whatever failed to finish failing
88 time.sleep(2)
89
90 released = lu.release_tasks()
91 self.log("Re-released %d tasks" % released)
89 92
90 if already_waiting: 93 time.sleep(5)
91 self.log("Already %d tasks waiting for release!")
92 self.log("Experiment will fail if any of these tasks are released.")
93 94
94 self.log("Starting the programs") 95 self.log("Killing all tasks")
95 for e in self.executables: 96 for e in self.executables:
96 try: 97 try:
98 e.kill()
99 except:
100 pass
101
102 time.sleep(2)
103
104 def __run_tasks(self):
105 self.log("Starting %d tasks" % len(self.executables))
106
107 for i,e in enumerate(self.executables):
108 try:
97 e.execute() 109 e.execute()
98 except: 110 except:
99 raise Exception("Executable failed: %s" % e) 111 raise Exception("Executable failed: %s" % e)
100 112
101 self.log("Sleeping until tasks are ready for release...") 113 self.log("Sleeping until tasks are ready for release...")
102 start = time.clock() 114 start = time.time()
103 while (lu.waiting_tasks() - already_waiting) < len(self.executables): 115 while lu.waiting_tasks() < len(self.executables):
104 if time.clock() - start > 30.0: 116 if time.time() - start > 30.0:
105 raise Exception("Too much time has passed waiting for tasks!") 117 s = "waiting: %d, submitted: %d" %\
118 (lu.waiting_tasks(), len(self.executables))
119 raise Exception("Too much time spent waiting for tasks! %s" % s)
106 time.sleep(1) 120 time.sleep(1)
107 121
108 # Exact tracers (like overheads) must be started right after release or 122 # Exact tracers (like overheads) must be started right after release or
109 # measurements will be full of irrelevant records 123 # measurements will be full of irrelevant records
110 self.log("Starting %d released tracers" % len(self.exact_tracers)) 124 self.log("Starting %d released tracers" % len(self.exact_tracers))
111 map(methodcaller('start_tracing'), self.exact_tracers) 125 map(methodcaller('start_tracing'), self.exact_tracers)
126 time.sleep(1)
112 127
113 self.log("Releasing %d tasks" % len(self.executables)) 128 try:
114 released = lu.release_tasks() 129 self.log("Releasing %d tasks" % len(self.executables))
115
116 ret = True
117 if released != len(self.executables):
118 # Some tasks failed to release, kill all tasks and fail
119 # Need to re-release non-released tasks before we can kill them though
120 self.log("Failed to release {} tasks! Re-releasing and killing".format(
121 len(self.executables) - released, len(self.executables)))
122 time.sleep(5)
123
124 released = lu.release_tasks() 130 released = lu.release_tasks()
125 131
126 self.log("Re-released %d tasks" % released) 132 if released != len(self.executables):
127 133 # Some tasks failed to release, kill all tasks and fail
128 time.sleep(5) 134 # Need to release non-released tasks before they can be killed
129 135 raise Exception("Released %s tasks, expected %s tasks" %
130 self.log("Killing all tasks") 136 (released, len(self.executables)))
131 map(methodcaller('kill'), self.executables)
132 137
133 ret = False 138 self.log("Waiting for program to finish...")
139 for e in self.executables:
140 if not e.wait():
141 raise Exception("Executable %s failed to complete!" % e)
134 142
135 self.log("Waiting for program to finish...") 143 finally:
136 for e in self.executables: 144 # And these must be stopped here for the same reason
137 if not e.wait(): 145 self.log("Stopping exact tracers")
138 ret = False 146 map(methodcaller('stop_tracing'), self.exact_tracers)
139
140 # And these must be stopped here for the same reason
141 self.log("Stopping exact tracers")
142 map(methodcaller('stop_tracing'), self.exact_tracers)
143
144 if not ret:
145 raise ExperimentFailed(self.name)
146 147
147 def __save_results(self): 148 def __save_results(self):
148 os.rename(self.working_dir, self.finished_dir) 149 os.rename(self.working_dir, self.finished_dir)
@@ -151,29 +152,48 @@ class Experiment(object):
151 print("[Exp %s]: %s" % (self.name, msg)) 152 print("[Exp %s]: %s" % (self.name, msg))
152 153
153 def run_exp(self): 154 def run_exp(self):
155 self.__check_system()
156
154 succ = False 157 succ = False
155 try: 158 try:
156 self.setup() 159 self.__setup()
157 160
158 try: 161 try:
159 self.__run_tasks() 162 self.__run_tasks()
160 self.log("Saving results in %s" % self.finished_dir) 163 self.log("Saving results in %s" % self.finished_dir)
161 succ = True 164 succ = True
165 except:
166 traceback.print_exc()
167 self.__kill_all()
168 raise ExperimentFailed(self.name)
162 finally: 169 finally:
163 self.teardown() 170 self.__teardown()
164 finally: 171 finally:
165 self.log("Switching to Linux scheduler") 172 self.log("Switching to Linux scheduler")
166 try: 173
167 lu.switch_scheduler("Linux") 174 # Give the tasks 10 seconds to finish before bailing
168 except: 175 start = time.time()
169 self.log("Failed to switch back to Linux.") 176 while lu.all_tasks() > 0:
177 if time.time() - start < 10.0:
178 raise SystemCorrupted("%d tasks still running!" %
179 lu.all_tasks())
180
181 lu.switch_scheduler("Linux")
170 182
171 if succ: 183 if succ:
172 self.__save_results() 184 self.__save_results()
173 self.log("Experiment done!") 185 self.log("Experiment done!")
174 186
187 def __check_system(self):
188 running = lu.all_tasks()
189 if running:
190 raise SystemCorrupted("%d tasks already running!" % running)
191
192 sched = lu.scheduler()
193 if sched != "Linux":
194 raise SystemCorrupted("Scheduler is %s, should be Linux" % sched)
175 195
176 def setup(self): 196 def __setup(self):
177 self.log("Writing %d proc entries" % len(self.proc_entries)) 197 self.log("Writing %d proc entries" % len(self.proc_entries))
178 map(methodcaller('write_proc'), self.proc_entries) 198 map(methodcaller('write_proc'), self.proc_entries)
179 199
@@ -190,7 +210,7 @@ class Experiment(object):
190 executable.stderr_file = self.exec_err 210 executable.stderr_file = self.exec_err
191 map(set_out, self.executables) 211 map(set_out, self.executables)
192 212
193 def teardown(self): 213 def __teardown(self):
194 self.exec_out and self.exec_out.close() 214 self.exec_out and self.exec_out.close()
195 self.exec_err and self.exec_err.close() 215 self.exec_err and self.exec_err.close()
196 216