aboutsummaryrefslogtreecommitdiffstats
path: root/ecrts14/ecrts14.py
diff options
context:
space:
mode:
authorGlenn Elliott <gelliott@cs.unc.edu>2014-01-02 10:06:43 -0500
committerGlenn Elliott <gelliott@cs.unc.edu>2014-01-02 10:20:58 -0500
commitb96db4bf10c0efb4204fa6eeb78926c61b2a4710 (patch)
treeda3cdc9326f8626d19d5f07aa321bb7dcd18d3ee /ecrts14/ecrts14.py
parent005f680d80e7c97ef2a34589cb2baa3e09db9f8e (diff)
Dump of progress.
This ugly patch updates code and checks in overhead data for ludwig.
Diffstat (limited to 'ecrts14/ecrts14.py')
-rwxr-xr-xecrts14/ecrts14.py494
1 files changed, 494 insertions, 0 deletions
diff --git a/ecrts14/ecrts14.py b/ecrts14/ecrts14.py
new file mode 100755
index 0000000..2a81399
--- /dev/null
+++ b/ecrts14/ecrts14.py
@@ -0,0 +1,494 @@
1#!/usr/bin/env python
2
3from __future__ import division
4
5import argparse
6import random
7import sys
8import os
9import math
10import time
11
12#import logging, multiprocessing
13
14from collections import defaultdict
15from csv import DictWriter
16from itertools import product
17from math import ceil
18from multiprocessing import Pool, cpu_count
19from numpy import arange
20from pprint import pprint
21from copy import deepcopy
22import traceback
23
24from schedcat.model.tasks import SporadicTask, TaskSystem
25from schedcat.overheads.model import Overheads, CacheDelay, ConsumerOverheads, ProducerOverheads
26
27import schedcat.model.resources as resources
28import schedcat.generator.tasks as tasks
29import schedcat.mapping.binpack as bp
30
31from schedcat.generator.tasksets import NAMED_UTILIZATIONS
32
33from schedcat.util.storage import storage
34
35from generator import DesignPointGenerator
36from schedcat.stats.stats import proportion_ci
37
38import graph
39import tests
40import topology
41from machines import machines
42
43import gc
44import resource
45
# Map of distribution name -> generator callable producing integer task
# periods (microseconds).  Keys are referenced by dp.period in
# create_pgm_task_set().
NAMED_PERIODS_US = {
    # Named period distributions used in several UNC papers, in microseconds
    'uni-short' : tasks.uniform_int( 3*1000, 33*1000),
    'uni-moderate' : tasks.uniform_int(10*1000, 100*1000),
    'uni-long' : tasks.uniform_int(50*1000, 250*1000),
}
52
# Sampling control for process_dp(): keep drawing random task sets per
# design point until every test's acceptance-ratio confidence interval
# is at most MAX_CI (at CONFIDENCE level), but never fewer than
# MIN_SAMPLES nor more than MAX_SAMPLES samples.  See complete().
MIN_SAMPLES = 500
MAX_SAMPLES = 10000
#MIN_SAMPLES = 1000
#MAX_SAMPLES = 10000
MAX_CI = 0.05        # widest acceptable confidence-interval size
CONFIDENCE = 0.95    # confidence level for proportion_ci()

#TOTAL_TESTED = 0
def create_pgm_task_set(dp):
    """Generate a random PGM (processing-graph-method) task set for
    design point *dp*.

    Returns the tuple (ts, graphs, subtasksets): the flat task set, the
    list of generated graphs, and the per-graph contiguous slices of
    ts (subtasksets[i] holds the tasks bound to graphs[i]'s nodes).
    """
    tg = tasks.TaskGenerator(period = NAMED_PERIODS_US[dp.period],
            util = NAMED_UTILIZATIONS[dp.task_util])
    ts = tg.make_task_set(max_util = dp.sys_util, squeeze = True)

    # swap the squeeze task into random position
    # (the squeezed task is last; randomize so it lands in an arbitrary graph)
    shuf = random.randint(0, len(ts)-1)
    ts[-1], ts[shuf] = ts[shuf], ts[-1]

    nrTasks = len(ts)
    # a graph needs at least one node, so cap graph count at task count
    nrGraphs = min(dp.num_graphs(), nrTasks)

    # draw a random share of the node pool per graph;
    # (1.0 - random()) lies in (0, 1], so no share is exactly zero
    shares = []
    for i in range(nrGraphs):
        shares.append(1.0 - random.random())

    # normalize shares into integer node counts (round-to-nearest)
    weight = sum(shares)
    shares = [int((s/weight)*nrTasks + 0.5) for s in shares]

    # we may have gained/lost a node due to rounding
    # add/remove node from any share with space
    todrop = sum(shares) - nrTasks
    if todrop > 0:
        for i in range(todrop):
            candidates = [i for i,y in enumerate(shares) if y > 1]
            shares[random.choice(candidates)] -= 1
    elif todrop < 0:
        for i in range(-1*todrop):
            shares[random.randint(0,len(shares)-1)] += 1

    # make sure that no graph has zero nodes
    # steal from graphs at random
    nullGraphs = [i for i,y in enumerate(shares) if y == 0]
    while nullGraphs:
        stealGraphs = [i for i,y in enumerate(shares) if y > 1]
        assert stealGraphs
        shares[random.choice(stealGraphs)] -= 1
        shares[nullGraphs[-1]] += 1
        nullGraphs.pop()

    assert sum(shares) == nrTasks

    # carve ts into contiguous per-graph slices matching the shares
    subtasksets = []
    count = 0
    for i in range(nrGraphs):
        subts = ts[count:count+shares[i]]
        assert len(subts) > 0
        subtasksets.append(subts)
        count += shares[i]

    graphs = []
    for subts in subtasksets:
        graphsz = len(subts)
        # graph depth is a random fraction of its node count (>= 1 level)
        min_depth = max(1, int(dp.depth_factor[0] * graphsz))
        max_depth = max(1, int(dp.depth_factor[1] * graphsz))
        gg = graph.GraphGenerator(tasks.uniform_int(graphsz, graphsz),
                tasks.uniform_int(min_depth, max_depth),
                dp.node_placement,
                dp.fan_out, dp.fan_in_cap,
                dp.edge_distance,
                dp.nr_source, dp.nr_sink,
                False, False)
        g = gg.graph()

        assert len(g.nodes) == graphsz

        # assign working sets to each edge
        for e in g.edges:
            e.wss = dp.wss()

        # bind tasks/nodes, and compute node wss
        for i in range(graphsz):
            g.nodes[i].task = subts[i]
            subts[i].node = g.nodes[i]
            subts[i].graph = g
            # task working set is the sum of inputs
            subts[i].wss = 0 if len(g.nodes[i].inEdges) == 0 else sum([w.wss for w in g.nodes[i].inEdges])

        # single root -- dp.nr_source is uniform(1,1) in process_dp(), so
        # exactly one source node is expected here
        root = [r for r in g.nodes if r.isSrc][0]

        # adjust every task to have same execution rate as the source. must
        # adjust both period and execution time. utilization remains unchanged.
        for t in subts:
            if root.task != t:
                tutil = t.utilization()
                t.period = root.task.period
                t.deadline = t.period
                t.cost = int(t.period * tutil + 0.5)

        graphs.append(g)

    return ts, graphs, subtasksets
155
def complete(results, n):
    """Decide whether sampling for a design point may stop.

    Always continue below MIN_SAMPLES and always stop above MAX_SAMPLES;
    in between, stop only once every test's acceptance-ratio confidence
    interval has shrunk to MAX_CI or less.
    """
    if n < MIN_SAMPLES:
        return False
    if n > MAX_SAMPLES:
        return True
    return all(proportion_ci(results[test_name], n, CONFIDENCE) <= MAX_CI
               for test_name, _ in TESTS)
166
def update_mean(old_mean, n, new_sample):
    """Fold *new_sample* into a running mean taken over *n* samples,
    returning the mean over n+1 samples."""
    running_total = old_mean * n + new_sample
    return running_total / (n + 1)
169
def get_consumer_overheads(dp, _system):
    """Load the data-consumer overhead model for design point *dp*.

    The file name encodes the host and cache-related measurement
    parameters of the design point.
    """
    fields = (dp.host, dp.host, str(dp.polluters), dp.walk,
              str(dp.huge_pages), str(dp.uncached), dp.ovh_type)
    path = 'overheads/%s/consumer/dco_host=%s_lvl=mem_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % fields
    return ConsumerOverheads.from_file(path, non_decreasing=False, system=_system)
174
def get_producer_overheads(dp):
    """Load the data-producer overhead model for design point *dp*."""
    path = ('overheads/%s/producer/dpo_host=%s_type=%s.csv'
            % (dp.host, dp.host, dp.ovh_type))
    return ProducerOverheads.from_file(path, non_decreasing=False)
179
def get_cpmds(dp):
    """Load cache-related preemption/migration delay (CPMD) data for *dp*."""
    fields = (dp.host, dp.host, str(dp.wcycle), str(dp.polluters), dp.walk,
              str(dp.huge_pages), str(dp.uncached), dp.ovh_type)
    path = 'overheads/%s/cpmd/dpmo_host=%s_lvl=mem_wcycle=%s_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % fields
    return CacheDelay.from_file(path, non_decreasing=False)
184
def get_overheads(dp, system = None):
    """Assemble the complete scheduling-overhead model for *dp*.

    Loads the base scheduler overhead file for the cache level spanned by
    one cluster, then attaches CPMD, consumer, and producer overheads.
    NOTE(review): with true division (``from __future__ import division``),
    cluster_size can be a float when processors is not a multiple of
    nr_clusters -- confirm dp.system.distance() tolerates that.
    """
    cluster_size = dp.processors/dp.nr_clusters
    cluster_span = dp.system.distance(0, cluster_size-1)
    cache_lvl = dp.system.levels[cluster_span]
    base_file = ('overheads/ovh_host=%s_sched=%s_lvl=%s_type=%s.csv'
                 % (dp.host, dp.sched, cache_lvl, dp.ovh_type))
    model = Overheads.from_file(base_file)
    model.shared_cache = dp.system.schedcat_distance(0, cluster_span)
    model.cache_affinity_loss = get_cpmds(dp)
    model.consumer = get_consumer_overheads(dp, system)
    model.producer = get_producer_overheads(dp)
    return model
196
def process_dp(dp):
    """Evaluate one design point: repeatedly sample random PGM task sets
    until every test's schedulability ratio has converged (see
    complete()), accumulating per-test statistics.

    Returns (dp, results, avg_latencies, avg_ideal_ratios,
    avg_hrt_ratios); each dict is keyed by test name.  Tests that never
    deemed a sample schedulable get -1.0 sentinels for the averages.
    """

    # kludge in parameters that pickle doesn't like...
    # (these generator/closure objects cannot cross the multiprocessing
    # pickle boundary, so each worker attaches them locally)
    dp.system = topology.Topology(machines[dp.host])

    dp.num_graphs = graph.binomial(1, 24)
    dp.depth_factor = [1.0/3.0, 2.0/3.0]
    dp.node_placement = graph.binomial()
    dp.fan_out = graph.geometric(1, 3)
    dp.fan_in_cap = 3
    dp.edge_distance = graph.geometric(1, 3)
    dp.nr_source = graph.uniform(1,1)
    dp.nr_sink = graph.uniform(1,1)
    dp.wss = tasks.multimodal([(tasks.uniform_int(128,1024), 6), (tasks.uniform_int(2048, 8*1024), 3)])

    # running statistics, keyed by test name
    results = defaultdict(float)           # mean schedulability (0/1 per sample)
    avg_latencies = defaultdict(float)     # mean graph response time (schedulable samples only)
    avg_ideal_ratios = defaultdict(float)  # mean (SRT bound)/(ideal response) ratio
    avg_hrt_ratios = defaultdict(float)    # mean (SRT bound)/(HRT ideal response) ratio
    nsched = defaultdict(int)              # count of schedulable samples per test

    n = 0  # total samples drawn so far

    overheads = get_overheads(dp, dp.system)

    # 512MB
    upper_memlimit_bytes = 1024*1024*512
    # ru_maxrss does not go back down, so we can only play this trick once...
    # NOTE(review): on Linux ru_maxrss is reported in kilobytes, not bytes,
    # so this byte-valued threshold is likely never reached -- confirm units.
    if gc.isenabled() and resource.getrusage(resource.RUSAGE_SELF).ru_maxrss < upper_memlimit_bytes:
        gc.disable()

    while not complete(results, n):
        # re-enable gc once the process has grown past the memory limit
        if (not gc.isenabled()) and (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss > upper_memlimit_bytes):
            gc.enable()

        ts, graphs, subts = create_pgm_task_set(dp)

        hrt_ideal_response_times = map(graph.compute_hrt_ideal_response_time, graphs)
        num_graphs = len(graphs)
        if dp.nr_clusters != 1:
            # clustered: run each partitioning heuristic separately
            for name, test in TESTS:
                result, processed_ts = test(ts, graphs, subts, dp, overheads)
                if result:
                    # schedulable: collect response-time statistics
                    ideal_response_times = map(graph.compute_ideal_response_time, graphs)
                    srt_response_times = map(graph.bound_graph_response_time, graphs)
                    ideal_ratio = 0.0
                    hrt_ratio = 0.0
                    for i, h, s in zip(ideal_response_times, hrt_ideal_response_times, srt_response_times):
                        ideal_ratio += s/i
                        hrt_ratio += s/h

                    # per-sample averages over this sample's graphs
                    ideal_ratio /= num_graphs
                    hrt_ratio /= num_graphs
                    avg_latency = sum(srt_response_times)/num_graphs
                    avg_latencies[name] = update_mean(avg_latencies[name], nsched[name], avg_latency)
                    avg_ideal_ratios[name] = update_mean(avg_ideal_ratios[name], nsched[name], ideal_ratio)
                    avg_hrt_ratios[name] = update_mean(avg_hrt_ratios[name], nsched[name], hrt_ratio)
                    nsched[name] += 1
                results[name] = update_mean(results[name], n, result)
        # if there is no partitioning, then same results hold for all tests
        else:
            # run only the first test and copy its outcome to every test name
            result, processed_ts = TESTS[0][1](ts, graphs, subts, dp, overheads)
            if result:
                ideal_response_times = map(graph.compute_ideal_response_time, graphs)
                srt_response_times = map(graph.bound_graph_response_time, graphs)
                ideal_ratio = 0.0
                hrt_ratio = 0.0
                for i, h, s in zip(ideal_response_times, hrt_ideal_response_times, srt_response_times):
                    ideal_ratio += s/i
                    hrt_ratio += s/h

                ideal_ratio /= num_graphs
                hrt_ratio /= num_graphs
                avg_latency = sum(srt_response_times)/num_graphs
                for name, test in TESTS:
                    avg_latencies[name] = update_mean(avg_latencies[name], nsched[name], avg_latency)
                    avg_ideal_ratios[name] = update_mean(avg_ideal_ratios[name], nsched[name], ideal_ratio)
                    avg_hrt_ratios[name] = update_mean(avg_hrt_ratios[name], nsched[name], hrt_ratio)
                    nsched[name] += 1

            for name, test in TESTS:
                results[name] = update_mean(results[name], n, result)

        n += 1
#        global TOTAL_TESTED
#        TOTAL_TESTED += 1
#        print TOTAL_TESTED

    # sentinel value for tests that never accepted a sample
    for name, test in TESTS:
        if nsched[name] == 0:
            avg_latencies[name] = -1.0
            avg_ideal_ratios[name] = -1.0
            avg_hrt_ratios[name] = -1.0

    # strip the unpicklable attributes again so dp can travel back
    # through the multiprocessing result queue
    del dp.system
    del dp.num_graphs
    del dp.depth_factor
    del dp.node_placement
    del dp.fan_out
    del dp.fan_in_cap
    del dp.edge_distance
    del dp.nr_source
    del dp.nr_sink
    del dp.wss

#    return dict(dp.items() + results.items())
#    return dict(dp.items() + results.items()), dict(dp.items() + avg_latencies.items()), dict(dp.items() + avg_ideal_ratios.items()), dict(dp.items() + avg_hrt_ratios.items())
    return dp, results, avg_latencies, avg_ideal_ratios, avg_hrt_ratios
305
def valid(dp):
    """Design-point filter passed to DesignPointGenerator; currently a
    placeholder that accepts every generated point."""
    return True
308
# (name, test-function) pairs: the partitioning/schedulability heuristics
# evaluated for every sampled task set.  The names double as CSV column
# headers in main(); each function takes (ts, graphs, subts, dp, overheads)
# and returns (schedulable, processed_ts).
TESTS = [
    ("CacheAgnostic", tests.test_partition_no_cache),
    ("MaximizeParallelism", tests.test_partition_parallel),
    ("CacheAware", tests.test_partition_cache_aware),
    ("CacheAwareEdges", tests.test_partition_cache_aware_edges),
#    ("MaximizeParallelismCacheAware", tests.test_partition_parallel2)
    ("CacheAwareBFSEdges", tests.test_partition_cache_aware_bfs),
    ("CacheAwareDFSEdges", tests.test_partition_cache_aware_dfs)
]
318
def myrange(start, end, inc):
    """Inclusive floating-point range [start, start+inc, ..., end].

    numpy.arange() excludes its stop value, so the previous stop of
    ``end + inc`` relied on exact float arithmetic: accumulated rounding
    error could either emit a spurious extra point beyond *end* or drop
    *end* itself.  Stopping half a step past *end* keeps the endpoint
    robustly included while excluding anything beyond it.
    """
    return arange(start, end + inc/2.0, inc)
321
322
def main():
    """Sweep the experiment's design-point space and write per-test CSVs.

    Emits four CSV streams (schedulability, latency, ideal ratio, HRT
    ratio).  Without --outfile, schedulability goes to stdout and the
    other three are discarded; with --outfile F, the streams go to
    F_sched.csv, F_latency.csv, F_idealratio.csv, and F_hrtratio.csv.
    """
    # fixed seed so design-point generation and ordering are reproducible
    random.seed(12345)

    parser = argparse.ArgumentParser()
    parser.add_argument('-o', "--outfile", type = str,
            default = "",
            help = "store results to <filename>.csv")
    parser.add_argument('-p', "--pretend", action='store_true',
            help = "Only print design point, do not execute")
    parser.add_argument('-m', "--processors", default=1, type = int,
            help="Number of processors to execute on")
#    parser.add_argument('-s', "--model", type = str,
#            default = "",
#            help = "Overhead model of the system")
    args = parser.parse_args()

    # experiment parameter space: each attribute is a list of candidate
    # values; DesignPointGenerator enumerates their cross product
    exp = storage()
    exp.host = ['ludwig']

    cpus = 24.0
    exp.processors = [cpus]
#    exp.nr_clusters = [24]
    exp.nr_clusters = [1]
    exp.sched = ['edf']
#    exp.nr_clusters = [1, 4, 12, 24]
    exp.task_util = ['uni-medium']
    exp.period = ['uni-long']
    exp.sys_util = myrange(1, cpus, 0.1)
    #
    # graph-structure parameters are attached per-worker in process_dp()
    # because the generator objects do not pickle
#    exp.num_graphs = [graph.binomial(1, 24)]
#    exp.depth_factor = [[1.0/3.0, 2.0/3.0]]
#    exp.node_placement = [graph.binomial()]
#    exp.fan_out = [graph.geometric(1, 3)]
#    exp.fan_in_cap = [3]
#    exp.edge_distance = [graph.geometric(1, 3)]
#    exp.nr_source = [graph.uniform(1,1)]
#    exp.nr_sink = [graph.uniform(1,1)]
#
#    exp.wss = [ tasks.multimodal([(tasks.uniform_int(128,1024), 6), (tasks.uniform_int(2048, 8*1024), 3)]) ]
    exp.wcycle = [ 0 ]
    exp.walk = ['seq']
    exp.huge_pages = [False]
    exp.uncached = [False]
    exp.polluters = [False]
    exp.ovh_type = ['max']
#    exp.ovh_type = ['max', 'median', 'mean']

    design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)]
    # process the highest system utilizations first
    design_points.reverse()

    # hopefully this makes the % done more indicative of progress.
#    random.shuffle(design_points)

    print "Total design points: ", len(design_points)

    if not args.pretend:
#        if args.outfile == "":
#            sched_out = DictWriter(sys.stdout, exp.keys()+[t[0] for t in TESTS])
#            lat_out = DictWriter(open(os.devnull, 'w'), exp.keys()+[t[0] for t in TESTS])
#            ir_out = DictWriter(open(os.devnull, 'w'), exp.keys()+[t[0] for t in TESTS])
#            hr_out = DictWriter(open(os.devnull, 'w'), exp.keys()+[t[0] for t in TESTS])
#        else:
#            sched_out = DictWriter(open(args.outfile+'_sched.csv', 'w'), exp.keys()+[t[0] for t in TESTS])
#            lat_out = DictWriter(open(args.outfile+'_latency.csv', 'w'), exp.keys()+[t[0] for t in TESTS])
#            ir_out = DictWriter(open(args.outfile+'_idealratio.csv', 'w'), exp.keys()+[t[0] for t in TESTS])
#            hr_out = DictWriter(open(args.outfile+'_hrtratio.csv', 'w'), exp.keys()+[t[0] for t in TESTS])
#
#        sched_out.writeheader()
#        lat_out.writeheader()
#        ir_out.writeheader()
#        hr_out.writeheader()

        # four output streams; without an outfile only schedulability
        # results are kept (stdout), the rest are discarded
        if args.outfile == "":
            sched_out = sys.stdout
            lat_out = open(os.devnull, 'w')
            ir_out = open(os.devnull, 'w')
            hr_out = open(os.devnull, 'w')
        else:
            sched_out = open(args.outfile+'_sched.csv', 'w')
            lat_out = open(args.outfile+'_latency.csv', 'w')
            ir_out = open(args.outfile+'_idealratio.csv', 'w')
            hr_out = open(args.outfile+'_hrtratio.csv', 'w')

        # shared CSV header: design-point key columns + one column per test
        hdr = 'processors,nr_clusters,task_util,period,wcycle,polluters,ovh_type,sys_util'
        for t in TESTS:
            hdr += ','+t[0]
        hdr += '\n'

        sched_out.write(hdr)
        lat_out.write(hdr)
        ir_out.write(hdr)
        hr_out.write(hdr)

        if args.processors > 1:
            # parallel path: farm design points out to a worker pool;
            # imap_unordered yields results as workers finish
            pool = Pool(processes = args.processors)
#            logger = multiprocessing.log_to_stderr()
#            logger.setLevel(multiprocessing.SUBDEBUG)
            try:
                for i, row in enumerate(pool.imap_unordered(process_dp, design_points)):
                    if not row:
                        continue
                    # progress indicator on stderr (skip when data goes to stdout)
                    if sched_out != sys.stdout:
                        sys.stderr.write('\rdone {0:%}'.format(i/len(design_points)))

                    dp, sched, latency, iratio, hrtratio = row

                    keys = '%d,%d,%s,%s,%d,%d,%s,%f' % (dp.processors, dp.nr_clusters, dp.task_util, dp.period, dp.wcycle, dp.polluters, dp.ovh_type, dp.sys_util)

                    values = ''
                    for t in TESTS:
                        values += ',%f' % sched[t[0]]
                    sched_out.write('%s%s\n' % (keys, values))

                    values = ''
                    for t in TESTS:
                        values += ',%f' % latency[t[0]]
                    lat_out.write('%s%s\n' % (keys, values))

                    values = ''
                    for t in TESTS:
                        values += ',%f' % iratio[t[0]]
                    ir_out.write('%s%s\n' % (keys, values))

                    values = ''
                    for t in TESTS:
                        values += ',%f' % hrtratio[t[0]]
                    hr_out.write('%s%s\n' % (keys, values))

                pool.close()

            except Exception as e:
                pool.terminate()
                print e
                raise
        else:
            # serial path: same output format, no worker pool

            for i, row in enumerate(map(process_dp, design_points)):
                if not row:
                    continue
                if sched_out != sys.stdout:
                    sys.stderr.write('\rdone {0:%}'.format(i/len(design_points)))

                dp, sched, latency, iratio, hrtratio = row

                entry_stem = '%d,%d,%s,%s,%d,%d,%s,%f' % (dp.processors, dp.nr_clusters, dp.task_util, dp.period, dp.wcycle, dp.polluters, dp.ovh_type, dp.sys_util)

                sched_out.write(entry_stem)
                for t in TESTS:
                    sched_out.write(',%f' % sched[t[0]])
                sched_out.write('\n')

                lat_out.write(entry_stem)
                for t in TESTS:
                    lat_out.write(',%f' % latency[t[0]])
                lat_out.write('\n')

                ir_out.write(entry_stem)
                for t in TESTS:
                    ir_out.write(',%f' % iratio[t[0]])
                ir_out.write('\n')

                hr_out.write(entry_stem)
                for t in TESTS:
                    hr_out.write(',%f' % hrtratio[t[0]])
                hr_out.write('\n')

#    global TOTAL_TESTED
#    print 'total tasksets:', TOTAL_TESTED
492
# script entry point
if __name__ == '__main__':
    main()