author    Glenn Elliott <gelliott@cs.unc.edu>  2014-01-21 23:06:05 -0500
committer Glenn Elliott <gelliott@cs.unc.edu>  2014-01-21 23:06:05 -0500
commit    61d61b72c1bd50365aa5019aac5e104d1438b5fd (patch)
tree      d521da0a3f0f6d5825f153464720b90b3be6b947
parent    111fcad23c49330a24f6f0e93ac98668e2cc6c15 (diff)
parent    f73adf2e5e3ae46b1616c08f3b4edd6852afed31 (diff)
Merge branch 'wip-ecrts14-pgm' of ssh://rtsrv.cs.unc.edu/home/litmus/schedcat into wip-ecrts14-pgm
-rwxr-xr-x  ecrts14/ecrts14.py            19
-rwxr-xr-x  ecrts14/graph.py             116
-rwxr-xr-x  ecrts14/quick.py             583
-rw-r--r--  schedcat/overheads/model.py   62
4 files changed, 728 insertions(+), 52 deletions(-)
diff --git a/ecrts14/ecrts14.py b/ecrts14/ecrts14.py
index c348091..3cf1389 100755
--- a/ecrts14/ecrts14.py
+++ b/ecrts14/ecrts14.py
@@ -77,6 +77,7 @@ NAMED_HEIGHT_FACTORS = {
77 'uni-short' : [1.0/3.0, 1.0/2.0], 77
78 'uni-medium' : [1.0/2.0, 3.0/4.0], 78
79 'uni-tall' : [3.0/4.0, 1.0], 79
80 'pipeline' : [1.0, 1.0],
80} 81
81 82
82NAMED_FAN = { 83
@@ -321,6 +322,10 @@ def process_dp(_dp):
321 while not complete(__avg_sched, n): 322 while not complete(__avg_sched, n):
322 ts, graphs, subts = create_pgm_task_set(dp) 323 ts, graphs, subts = create_pgm_task_set(dp)
323 324
325 if overheads.consumer is not None:
326 for t in ts:
327 overheads.consumer.place_production(t)
328
324 num_graphs = len(graphs) 329 num_graphs = len(graphs)
325 avg_depth = sum([g.depth for g in graphs])/float(num_graphs) 330 avg_depth = sum([g.depth for g in graphs])/float(num_graphs)
326 avg_graph_size = sum([len(g.nodes) for g in graphs])/float(num_graphs) 331 avg_graph_size = sum([len(g.nodes) for g in graphs])/float(num_graphs)
@@ -335,8 +340,8 @@ def process_dp(_dp):
335 this_method = {} 340 this_method = {}
336 this_method['sched'] = is_sched 341 this_method['sched'] = is_sched
337 if is_sched: 342 if is_sched:
338 this_method['latencies'] = map(graph.bound_graph_response_time, graphs) 343 this_method['latencies'] = map(graph.bound_graph_latency, graphs)
339 this_method['ideal_latencies'] = map(graph.compute_ideal_response_time, graphs) 344 this_method['ideal_latencies'] = map(graph.compute_ideal_graph_latency, graphs)
340 this_task_set[method] = this_method 345 this_task_set[method] = this_method
341 else: 346 else:
342 # global. no partitioning. all methods equivalent 347
@@ -345,8 +350,8 @@ def process_dp(_dp):
345 this_method = {} 350 this_method = {}
346 this_method['sched'] = is_sched 351 this_method['sched'] = is_sched
347 if is_sched: 352 if is_sched:
348 this_method['latencies'] = map(graph.bound_graph_response_time, graphs) 353 this_method['latencies'] = map(graph.bound_graph_latency, graphs)
349 this_method['ideal_latencies'] = map(graph.compute_ideal_response_time, graphs) 354 this_method['ideal_latencies'] = map(graph.compute_ideal_graph_latency, graphs)
350 for method, _, _ in TESTS: 355 for method, _, _ in TESTS:
351 this_task_set[method] = this_method 356 this_task_set[method] = this_method
352 357
@@ -354,7 +359,7 @@ def process_dp(_dp):
354 all_sched = True if num_method_sched == n_methods else False 359 all_sched = True if num_method_sched == n_methods else False
355 360
356 if all_sched: 361 if all_sched:
357 hrt_ideal_response_times = map(graph.compute_hrt_ideal_response_time, graphs) 362 hrt_ideal_response_times = map(graph.compute_hrt_ideal_graph_latency, graphs)
358 363
359 # they're all schedulable, so compute graph latencies 364 # they're all schedulable, so compute graph latencies
360 # redo with job splitting 365 # redo with job splitting
@@ -363,12 +368,12 @@ def process_dp(_dp):
363 # redo test to get the split-based latency 368 # redo test to get the split-based latency
364 dp.job_splitting = True 369 dp.job_splitting = True
365 is_sched, processed_ts = test(ts, graphs, subts, dp, overheads) 370 is_sched, processed_ts = test(ts, graphs, subts, dp, overheads)
366 this_task_set[method]['split_latencies'] = map(graph.bound_graph_response_time, graphs) 371 this_task_set[method]['split_latencies'] = map(graph.bound_graph_latency, graphs)
367 else: 372 else:
368 # global. no partitioning. all methods equivalent 373
369 dp.job_splitting = True 374 dp.job_splitting = True
370 is_sched, processed_ts = TESTS[0][2](ts, graphs, subts, dp, overheads) 375 is_sched, processed_ts = TESTS[0][2](ts, graphs, subts, dp, overheads)
371 split_lat = map(graph.bound_graph_response_time, graphs) 376 split_lat = map(graph.bound_graph_latency, graphs)
372 for method, _, _ in TESTS: 377 for method, _, _ in TESTS:
373 this_task_set[method]['split_latencies'] = split_lat 378 this_task_set[method]['split_latencies'] = split_lat
374 379
diff --git a/ecrts14/graph.py b/ecrts14/graph.py
index 5006033..95e63cb 100755
--- a/ecrts14/graph.py
+++ b/ecrts14/graph.py
@@ -57,7 +57,7 @@ class node:
57 return len(self.inEdges) + len(self.outEdges) 57 return len(self.inEdges) + len(self.outEdges)
58 58
59 def __repr__(self): 59 def __repr__(self):
60 graph_id = self.graph.id if self.graph and hasattr(self.graph.id) else -1 60 graph_id = self.graph.id if self.graph and hasattr(self.graph, 'id') else -1
61 stem = 'node_%s(gid:%d,l:%d,src:%d,sink:%d,spine:%d)' % (self.id, graph_id, self.privLevel, self.isSrc, self.isSink, self.isSpine) 61 stem = 'node_%s(gid:%d,l:%d,src:%d,sink:%d,spine:%d)' % (self.id, graph_id, self.privLevel, self.isSrc, self.isSink, self.isSpine)
62 pred_str = 'preds{' 62 pred_str = 'preds{'
63 for n in self.pred: 63 for n in self.pred:
@@ -107,6 +107,8 @@ class graph:
107 self.edges = [] 107 self.edges = []
108 self.nodesAtLevel = {} 108 self.nodesAtLevel = {}
109 self.depth = 0 109 self.depth = 0
110 # assume a sporadic release from sources
111 self.isSporadic = True
110 112
111 def __repr__(self): 113 def __repr__(self):
112 s = '' 114 s = ''
@@ -142,38 +144,102 @@ class graph:
142 outs.write('}') 144 outs.write('}')
143 return outs.getvalue() 145 return outs.getvalue()
144 146
145def bound_graph_response_time(g): 147def bound_graph_latency(g):
148 sporadic_latency = 0
149 graph_latency = 0
150
146 if len(g.nodes) == 1: 151 if len(g.nodes) == 1:
147 g.response_time = g.nodes[0].task.response_time 152 sporadic_latency = g.nodes[0].task.response_time
148 else: 153 else:
154 # We assume all nodes share a period
155 period = g.nodes[0].task.period
156
149 for n in g.nodes: 157 for n in g.nodes:
150 n.latency = 0.0 if not n.isSrc else n.task.response_time 158 n.latency = 0 if not n.isSrc else max(period, n.task.response_time)
151 n.isQueued = False 159 n.isQueued = False
152 160 for e in g.edges:
153 queue = g.sources[:] 161 e.longest = False
154 162
163 # accumulate latencies down the graph
164 queue = g.sources[:]
155 while len(queue) != 0: 165 while len(queue) != 0:
156 # breadth-first propagation of latencies from srcs to sinks 166 # breadth-first propagation of latencies from srcs to sinks
157 v = queue.pop(0) 167 v = queue.pop(0)
158 v.isQueued = False 168 v.isQueued = False
159 for e in v.outEdges: 169 for e in v.outEdges:
160 latency = v.latency + e.s.task.response_time 170 latency = v.latency + max(period, e.s.task.response_time)
161 # if we updated the latency, then we need to revisit the node 171 # if we updated the latency, then we need to revisit the node
162 if latency > e.s.latency: 172 if latency > e.s.latency:
173 # clear out old longest
174 cur_longest = [e for e in e.s.inEdges if e.longest == True]
175 if len(cur_longest) > 0:
176 assert len(cur_longest) == 1
177 cur_longest[0].longest = False
178 # set new longest
179 e.longest = True
163 e.s.latency = latency 180 e.s.latency = latency
164 if e.s.isQueued == False: 181 if e.s.isQueued == False:
165 e.s.isQueued = True 182 e.s.isQueued = True
166 queue.append(e.s) 183 queue.append(e.s)
167 g.response_time = max(g.sinks, key=lambda n: n.latency).latency
168 assert g.response_time != 0.0
169 return g.response_time
170 184
171def compute_ideal_response_time(g): 185 max_sink = max(g.sinks, key=lambda n: n.latency)
186 longest_path = []
187 longest_path.append(max_sink)
188 queue.append(max_sink)
189 while len(queue) != 0:
190 v = queue.pop(0)
191 if v.inEdges:
192 longest_edge = [e for e in v.inEdges if e.longest is True]
193 assert len(longest_edge) == 1
194 longest_edge = longest_edge[0]
195 # prepend to the path
196 longest_path.insert(0, longest_edge.p)
197 queue.append(longest_edge.p)
198 else:
199 assert v.isSrc
200 assert len(longest_path) <= g.depth
201
202 depth_latency = len(longest_path) * period
203 sporadic_latency = max_sink.latency - depth_latency
204
205 if g.isSporadic:
206 # factor the accumulated latency into graph-depth and
207 # tardiness-based components
208
209 if max_sink.task.response_time < period:
210 # Optimization:
211 # * Remove one period from depth_latency
212 # * Add response time of sink to depth_latency
213 # (We don't modify sporadic_latency since we know the sink's
214 # contribution was 0.)
215 graph_latency = depth_latency - period + max_sink.task.response_time
216 else:
217 graph_latency = depth_latency
218 else:
219 # TODO: Optimize for the sink task's response time
220 # Rate-based bound:
221 graph_latency = 4 * depth_latency
222
223 assert sporadic_latency >= 0
224 assert graph_latency >= 0
225
226# print 'depth: \t',g.depth
227# print 'period: \t',period
228# print 'd*p: \t',g.depth * period
229# print 'combined: \t', sporadic_latency + graph_latency
230# print 'sporadic latency:\t', sporadic_latency
231# print 'graph latency: \t', graph_latency
232# raw_input('press enter')
233
234 return sporadic_latency + graph_latency
235
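
The new bound_graph_latency above combines breadth-first longest-path propagation (each hop contributes max(period, response_time)) with a decomposition of the resulting sink latency into a graph-depth component (path length times the shared period) and a sporadic remainder attributable to tardiness. A minimal standalone sketch of that idea, with illustrative names rather than the module's API:

from collections import deque

def longest_latency(succs, response, period, sources):
    # succs: node -> list of successors; response: node -> response time
    latency = dict((n, max(period, response[n])) for n in sources)
    hops = dict((n, 1) for n in sources)
    queue = deque(sources)
    while queue:
        v = queue.popleft()
        for s in succs.get(v, []):
            cand = latency[v] + max(period, response[s])
            if cand > latency.get(s, 0):
                latency[s] = cand
                hops[s] = hops[v] + 1
                queue.append(s)
    sink = max(latency, key=latency.get)
    depth_latency = hops[sink] * period               # graph-depth component
    return depth_latency, latency[sink] - depth_latency  # sporadic remainder

# e.g. chain a->b->c with period 10 and response times {a:12, b:10, c:25}:
# sink latency 47 splits into depth 3*10 = 30 plus sporadic remainder 17.
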
236def compute_ideal_graph_latency(g):
237 graph_latency = 0
172 if len(g.nodes) == 1: 238 if len(g.nodes) == 1:
173 g.response_time = g.nodes[0].task.cost 239 graph_latency = g.nodes[0].task.cost
174 else: 240 else:
175 for n in g.nodes: 241 for n in g.nodes:
176 n.latency = 0.0 if not n.isSrc else n.task.cost 242 n.latency = 0 if not n.isSrc else n.task.cost
177 n.isQueued = False 243 n.isQueued = False
178 244
179 queue = g.sources[:] 245 queue = g.sources[:]
@@ -190,21 +256,17 @@ def compute_ideal_response_time(g):
190 if e.s.isQueued == False: 256 if e.s.isQueued == False:
191 e.s.isQueued = True 257 e.s.isQueued = True
192 queue.append(e.s) 258 queue.append(e.s)
193 g.response_time = max(g.sinks, key=lambda n: n.latency).latency 259 graph_latency = max(g.sinks, key=lambda n: n.latency).latency
194 if g.response_time == 0: 260 assert graph_latency > 0
195 if len(g.nodes) == 1: 261 return graph_latency
196 print 'single-node graph: node info:',g.nodes[0],' task info: ',g.nodes[0].task
197 else:
198 print 'multi-node graph...'
199 assert g.response_time != 0
200 return g.response_time
201 262
202def compute_hrt_ideal_response_time(g): 263def compute_hrt_ideal_graph_latency(g):
264 graph_latency = 0
203 if len(g.nodes) == 1: 265 if len(g.nodes) == 1:
204 g.response_time = g.nodes[0].task.deadline 266 graph_latency = g.nodes[0].task.deadline
205 else: 267 else:
206 for n in g.nodes: 268 for n in g.nodes:
207 n.latency = 0.0 if not n.isSrc else n.task.deadline 269 n.latency = 0 if not n.isSrc else n.task.deadline
208 n.isQueued = False 270 n.isQueued = False
209 271
210 queue = g.sources[:] 272 queue = g.sources[:]
@@ -221,9 +283,9 @@ def compute_hrt_ideal_response_time(g):
221 if e.s.isQueued == False: 283 if e.s.isQueued == False:
222 e.s.isQueued = True 284 e.s.isQueued = True
223 queue.append(e.s) 285 queue.append(e.s)
224 g.response_time = max(g.sinks, key=lambda n: n.latency).latency 286 graph_latency = max(g.sinks, key=lambda n: n.latency).latency
225 assert g.response_time != 0.0 287 assert graph_latency > 0
226 return g.response_time 288 return graph_latency
227 289
228def link(up, down): 290def link(up, down):
229 up.succ.append(down) 291 up.succ.append(down)
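
bound_graph_latency, compute_ideal_graph_latency, and compute_hrt_ideal_graph_latency share one propagation skeleton and differ mainly in the per-node weight: response time, cost, or deadline. A hedged sketch of that common core, reusing the node/edge attributes from this file (propagate_latency and weight are illustrative names, not part of the module):

def propagate_latency(g, weight):
    # weight maps a node's task to its latency contribution
    if len(g.nodes) == 1:
        return weight(g.nodes[0].task)
    for n in g.nodes:
        n.latency = weight(n.task) if n.isSrc else 0
        n.isQueued = False
    queue = g.sources[:]
    while queue:
        v = queue.pop(0)
        v.isQueued = False
        for e in v.outEdges:
            latency = v.latency + weight(e.s.task)
            if latency > e.s.latency:
                e.s.latency = latency
                if not e.s.isQueued:
                    e.s.isQueued = True
                    queue.append(e.s)
    return max(g.sinks, key=lambda n: n.latency).latency

# compute_hrt_ideal_graph_latency(g) is then essentially
# propagate_latency(g, lambda t: t.deadline)
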
diff --git a/ecrts14/quick.py b/ecrts14/quick.py
new file mode 100755
index 0000000..560b365
--- /dev/null
+++ b/ecrts14/quick.py
@@ -0,0 +1,583 @@
1#!/usr/bin/env python
2
3from __future__ import division
4
5import argparse
6import random
7import sys
8import os
9import math
10import time
11import inspect
12
13import sqlite3 as lite
14import json
15
16import copy
17from collections import defaultdict
18from csv import DictWriter
19from itertools import product
20from math import ceil
21from multiprocessing import Pool, cpu_count
22from numpy import arange
23from pprint import pprint
24import traceback
25
26from schedcat.model.tasks import SporadicTask, TaskSystem
27from schedcat.overheads.model import Overheads, CacheDelay, ConsumerOverheads, ProducerOverheads
28
29import schedcat.model.resources as resources
30import schedcat.generator.tasks as tasks
31import schedcat.mapping.binpack as bp
32
33from schedcat.generator.tasksets import NAMED_UTILIZATIONS
34
35from schedcat.util.storage import storage
36
37from generator import DesignPointGenerator
38from schedcat.stats.stats import proportion_ci
39
40import graph
41import tests
42import topology
43from machines import machines
44
45#import gc
46#import resource
47import traceback
48
49import database as db
50
51NAMED_PERIODS_US = {
52 # Named period distributions used in several UNC papers, in microseconds
53 'uni-short' : tasks.uniform_int( 3*1000, 33*1000),
54 'uni-moderate' : tasks.uniform_int(10*1000, 100*1000),
55 'uni-long' : tasks.uniform_int(50*1000, 250*1000),
56}
57
58# based on a 24-core system
59# fewer graphs = harder partitioning
60NAMED_NUM_GRAPHS = {
61 'uni-many' : graph.uniform(24, 24*3),
62 'uni-medium' : graph.uniform(12, 24),
63 'uni-few' : graph.uniform(1,12),
64
65 'bimo-many' : graph.binomial(24, 24*3),
66 'bimo-medium' : graph.binomial(12, 24),
67 'bimo-few' : graph.binomial(1,12),
68}
69
70NAMED_SHAPES = {
71 'uniform' : graph.uniform(),
72 'binomial' : graph.binomial(),
73# 'geometric': graph.geometric(),
74}
75
76NAMED_HEIGHT_FACTORS = {
77 'uni-short' : [1.0/3.0, 1.0/2.0],
78 'uni-medium' : [1.0/2.0, 3.0/4.0],
79 'uni-tall' : [3.0/4.0, 1.0],
80}
81
82NAMED_FAN = {
83 'none' : graph.uniform(1,1),
84 'uniform_3' : graph.uniform(1,3),
85 'uniform_6' : graph.uniform(1,6),
86 'geometric_3' : graph.geometric(1,3),
87 'geometric_6' : graph.geometric(1,6),
88}
89
90NAMED_EDGE_HOP = {
91 'none' : graph.uniform(1,1),
92 'uniform_3' : graph.uniform(1,3),
93 'uniform_deep' : graph.uniform(1,100),
94 'geometric_3': graph.geometric(1,3),
95}
96
97NAMED_EDGE_WSS = {
98 'uni-light' : tasks.uniform_int(1, 64),
99 'uni-medium' : tasks.uniform_int(256, 1024),
100 'uni-heavy' : tasks.uniform_int(2*1024, 8*1024),
101
102 'bimo-light' : tasks.multimodal([(tasks.uniform_int(64,256), 8), (tasks.uniform_int(2*1024, 8*1024), 1)]),
103 'bimo-medium' : tasks.multimodal([(tasks.uniform_int(64,256), 6), (tasks.uniform_int(2*1024, 8*1024), 3)]),
104 'bimo-heavy' : tasks.multimodal([(tasks.uniform_int(64,256), 4), (tasks.uniform_int(2*1024, 8*1024), 5)]),
105}
106
107
108TESTS = [
109 (0, "CacheAgnostic", tests.test_partition_no_cache),
110 (1, "MaximizeParallelism", tests.test_partition_parallel),
111 (2, "CacheAware", tests.test_partition_cache_aware),
112 (3, "CacheAwareEdges", tests.test_partition_cache_aware_edges),
113 (4, "CacheAwareBFSEdges", tests.test_partition_cache_aware_bfs),
114 (5, "CacheAwareDFSEdges", tests.test_partition_cache_aware_dfs)
115# (6, "MaximizeParallelismCacheAware", tests.test_partition_parallel2)
116]
117
118MIN_SAMPLES = 5
119MAX_SAMPLES = 10
120#MIN_SAMPLES = 200
121#MAX_SAMPLES = 500
122#MIN_SAMPLES = 1000
123#MAX_SAMPLES = 10000
124MAX_CI = 0.05
125CONFIDENCE = 0.95
126
127#TOTAL_TESTED = 0
128
129def create_pgm_task_set(dp):
130 tg = tasks.TaskGenerator(period = NAMED_PERIODS_US[dp.period],
131 util = NAMED_UTILIZATIONS[dp.task_util])
132 ts = tg.make_task_set(max_util = dp.sys_util, squeeze = True)
133
134 # swap the squeezed task into a random position
135 shuf = random.randint(0, len(ts)-1)
136 ts[-1], ts[shuf] = ts[shuf], ts[-1]
137
138 nrTasks = len(ts)
139 nrGraphs = min(dp.num_graphs(), nrTasks)
140
141 shares = []
142 for i in range(nrGraphs):
143 shares.append(1.0 - random.random())
144
145 weight = sum(shares)
146 shares = [int((s/weight)*nrTasks + 0.5) for s in shares]
147
148 # we may have gained/lost a node due to rounding
149 # add/remove node from any share with space
150 todrop = sum(shares) - nrTasks
151 if todrop > 0:
152 for i in range(todrop):
153 candidates = [i for i,y in enumerate(shares) if y > 1]
154 shares[random.choice(candidates)] -= 1
155 elif todrop < 0:
156 for i in range(-1*todrop):
157 shares[random.randint(0,len(shares)-1)] += 1
158
159 # make sure that no graph has zero nodes
160 # steal from graphs at random
161 nullGraphs = [i for i,y in enumerate(shares) if y == 0]
162 while nullGraphs:
163 stealGraphs = [i for i,y in enumerate(shares) if y > 1]
164 assert stealGraphs
165 shares[random.choice(stealGraphs)] -= 1
166 shares[nullGraphs[-1]] += 1
167 nullGraphs.pop()
168
169 assert sum(shares) == nrTasks
170
171 subtasksets = []
172 count = 0
173 for i in range(nrGraphs):
174 subts = ts[count:count+shares[i]]
175 assert len(subts) > 0
176 subtasksets.append(subts)
177 count += shares[i]
178
179 graphs = []
180 for subts in subtasksets:
181 graphsz = len(subts)
182 min_depth = max(1, int(dp.depth_factor[0] * graphsz))
183 max_depth = max(1, int(dp.depth_factor[1] * graphsz))
184 gg = graph.GraphGenerator(tasks.uniform_int(graphsz, graphsz),
185 tasks.uniform_int(min_depth, max_depth),
186 dp.node_placement,
187 dp.fan_out, dp.fan_in_cap,
188 dp.edge_distance,
189 dp.nr_source, dp.nr_sink,
190 False, False)
191 g = gg.graph()
192
193 assert len(g.nodes) == graphsz
194
195 # assign working sets to each edge
196 for e in g.edges:
197 e.wss = dp.wss()
198
199 # bind tasks/nodes, and compute node wss
200 for i in range(graphsz):
201 g.nodes[i].task = subts[i]
202 subts[i].node = g.nodes[i]
203 subts[i].graph = g
204 # task working set is the sum of inputs
205 subts[i].wss = 0 if len(g.nodes[i].inEdges) == 0 else sum([w.wss for w in g.nodes[i].inEdges])
206
207 # single root
208 root = [r for r in g.nodes if r.isSrc][0]
209
210 # adjust every task to have the same execution rate as the source. must
211 # adjust both period and execution time. utilization remains unchanged.
212 for t in subts:
213 if root.task != t:
214 tutil = t.utilization()
215 t.period = root.task.period
216 t.deadline = t.period
217 t.cost = int(t.period * tutil + 0.5)
218
219 graphs.append(g)
220
221 # tag each graph with an id
222 for i,g in enumerate(graphs):
223 g.id = i
224
225 return ts, graphs, subtasksets
226
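
create_pgm_task_set partitions the generated task set into graphs by drawing random weights, rounding them to integer node counts, and then repairing both the rounding drift and any zero-sized graph. A compact sketch of just that step; split_into_shares is an illustrative helper, and it assumes nr_tasks >= nr_graphs, which the caller guarantees via min(dp.num_graphs(), nrTasks):

import random

def split_into_shares(nr_tasks, nr_graphs):
    weights = [1.0 - random.random() for _ in range(nr_graphs)]  # in (0, 1]
    total = sum(weights)
    shares = [int((w / total) * nr_tasks + 0.5) for w in weights]
    # repair rounding drift so the counts sum exactly to nr_tasks
    while sum(shares) > nr_tasks:
        donors = [i for i, s in enumerate(shares) if s > 1]
        shares[random.choice(donors)] -= 1
    while sum(shares) < nr_tasks:
        shares[random.randrange(nr_graphs)] += 1
    # no graph may be empty: steal a node from a graph that can spare one
    while 0 in shares:
        donors = [i for i, s in enumerate(shares) if s > 1]
        shares[random.choice(donors)] -= 1
        shares[shares.index(0)] += 1
    assert sum(shares) == nr_tasks
    return shares
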
227def complete(results, n):
228 if n < MIN_SAMPLES:
229 return False
230 elif n > MAX_SAMPLES:
231 return True
232 else:
233 for method, _, _ in TESTS:
234 if proportion_ci(results[method], n, CONFIDENCE) > MAX_CI:
235 return False
236 return True
237
238def update_mean(old_mean, n, new_sample):
239 return (old_mean*n + new_sample)/(n+1)
240
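
The sampling loop below draws task sets until complete() reports a tight enough confidence interval for every method, and update_mean folds each new sample into a running average with O(1) state, using the identity mean(n+1) = (mean(n) * n + x) / (n + 1). A small self-contained sanity check of that identity (illustrative only):

def _check_update_mean():
    def update_mean(old_mean, n, new_sample):
        return (old_mean * n + new_sample) / (n + 1)
    samples = [4.0, 8.0, 5.0, 7.0]
    mean = 0.0
    for n, x in enumerate(samples):
        mean = update_mean(mean, n, x)
    # the running mean equals the batch mean
    assert abs(mean - sum(samples) / len(samples)) < 1e-9
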
241def get_ovh_dir():
242 parent = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
243 ovh_dir = os.path.join(parent, 'overheads')
244 return ovh_dir
245
246def get_consumer_overheads(dp, _system):
247 co_file = '%s/consumer/dco_host=%s_lvl=mem_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % (dp.host, dp.host, str(dp.polluters), dp.walk, str(dp.huge_pages), str(dp.uncached), dp.ovh_type)
248 co_file = os.path.join(get_ovh_dir(), co_file)
249 co = ConsumerOverheads.from_file(co_file, non_decreasing=False, system=_system)
250 return co
251
252def get_producer_overheads(dp):
253 po_file = '%s/producer/dpo_host=%s_type=%s.csv' % (dp.host, dp.host, dp.ovh_type)
254 po_file = os.path.join(get_ovh_dir(), po_file)
255 po = ProducerOverheads.from_file(po_file, non_decreasing=False)
256 return po
257
258def get_cpmds(dp):
259 cpmd_file = '%s/cpmd/dpmo_host=%s_lvl=mem_wcycle=%s_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % (dp.host, dp.host, str(dp.wcycle), str(dp.polluters), dp.walk, str(dp.huge_pages), str(dp.uncached), dp.ovh_type)
260 cpmd_file = os.path.join(get_ovh_dir(), cpmd_file)
261 cpmds = CacheDelay.from_file(cpmd_file, non_decreasing=False)
262 return cpmds
263
264def get_overheads(dp, system = None):
265 cluster_size = dp.processors/dp.nr_clusters
266 max_dist = dp.system.distance(0, cluster_size-1)
267 lvl = dp.system.levels[max_dist]
268 max_wss = dp.system.max_wss()
269 ovh_file = 'ovh_host=%s_sched=%s_lvl=%s_type=%s.csv' % (dp.host, dp.sched, lvl, dp.ovh_type)
270 ovh_file = os.path.join(get_ovh_dir(), ovh_file)
271 ovh = Overheads.from_file(ovh_file)
272 ovh.shared_cache = dp.system.schedcat_distance(0, max_dist)
273 ovh.cache_affinity_loss = get_cpmds(dp)
274 ovh.cache_affinity_loss.set_max_wss(max_wss)
275 ovh.consumer = get_consumer_overheads(dp, system)
276 ovh.producer = get_producer_overheads(dp)
277 return ovh
278
279def process_dp(_dp):
280
281 dp = copy.deepcopy(_dp)
282
283 # kludge in parameters that pickle doesn't like...
284 dp.system = topology.Topology(machines[dp.host])
285
286 # convert names to distributions
287 dp.num_graphs = NAMED_NUM_GRAPHS[dp.num_graphs]
288 dp.depth_factor = NAMED_HEIGHT_FACTORS[dp.depth_factor]
289 dp.node_placement = NAMED_SHAPES[dp.node_placement]
290 dp.fan_out = NAMED_FAN[dp.fan_out]
291 dp.edge_distance = NAMED_EDGE_HOP[dp.edge_distance]
292 dp.wss = NAMED_EDGE_WSS[dp.wss]
293
294 # slam in unchanging values
295 dp.nr_source = graph.uniform(1,1)
296 dp.nr_sink = graph.uniform(1,1)
297 dp.uncached = False
298 dp.huge_pages = False
299 dp.sched = 'edf'
300 dp.walk = 'seq'
301
302 __avg_sched = defaultdict(float)
303
304 __avg_ts_size = defaultdict(float)
305 __avg_nr_graphs = defaultdict(float)
306 __avg_graph_size = defaultdict(float)
307 __avg_k = defaultdict(float)
308 __avg_latencies = defaultdict(float)
309 __avg_tard_ratios = defaultdict(float)
310 __avg_hrt_ratios = defaultdict(float)
311 __avg_split_latencies = defaultdict(float)
312 __avg_split_tard_ratios = defaultdict(float)
313 __avg_split_hrt_ratios = defaultdict(float)
314
315 n_methods = len(TESTS)
316 n = 0
317 n_all_sched = 0
318
319 overheads = get_overheads(dp, dp.system)
320
321 while not complete(__avg_sched, n):
322 ts, graphs, subts = create_pgm_task_set(dp)
323
324 if overheads.consumer is not None:
325 for t in ts:
326 overheads.consumer.place_production(t)
327
328 num_graphs = len(graphs)
329 avg_depth = sum([g.depth for g in graphs])/float(num_graphs)
330 avg_graph_size = sum([len(g.nodes) for g in graphs])/float(num_graphs)
331 hrt_ideal_response_times = None
332
333 this_task_set = {}
334
335 if dp.nr_clusters != 1:
336 for method, _, test in TESTS:
337 dp.job_splitting = False
338 is_sched, processed_ts = test(ts, graphs, subts, dp, overheads)
339 this_method = {}
340 this_method['sched'] = is_sched
341 if is_sched:
342 this_method['latencies'] = map(graph.bound_graph_latency, graphs)
343 this_method['ideal_latencies'] = map(graph.compute_ideal_graph_latency, graphs)
344 this_task_set[method] = this_method
345 else:
346 # global. no partitioning. all methods equivalent
347 dp.job_splitting = False
348 is_sched, processed_ts = TESTS[0][2](ts, graphs, subts, dp, overheads)
349 this_method = {}
350 this_method['sched'] = is_sched
351 if is_sched:
352 this_method['latencies'] = map(graph.bound_graph_latency, graphs)
353 this_method['ideal_latencies'] = map(graph.compute_ideal_graph_latency, graphs)
354 for method, _, _ in TESTS:
355 this_task_set[method] = this_method
356
357 num_method_sched = sum([1 for sched_data in this_task_set.itervalues() if sched_data['sched'] == True])
358 all_sched = True if num_method_sched == n_methods else False
359
360 if all_sched:
361 hrt_ideal_response_times = map(graph.compute_hrt_ideal_graph_latency, graphs)
362
363 # they're all schedulable, so compute graph latencies
364 # redo with job splitting
365 if dp.nr_clusters != 1:
366 for method, _, test in TESTS:
367 # redo test to get the split-based latency
368 dp.job_splitting = True
369 is_sched, processed_ts = test(ts, graphs, subts, dp, overheads)
370 this_task_set[method]['split_latencies'] = map(graph.bound_graph_latency, graphs)
371 else:
372 # global. no partitioning. all methods equivalent
373 dp.job_splitting = True
374 is_sched, processed_ts = TESTS[0][2](ts, graphs, subts, dp, overheads)
375 split_lat = map(graph.bound_graph_latency, graphs)
376 for method, _, _ in TESTS:
377 this_task_set[method]['split_latencies'] = split_lat
378
379 # process the results
380
381 for method, sched_data in this_task_set.iteritems():
382 is_sched = sched_data['sched']
383 __avg_sched[method] = update_mean(__avg_sched[method], n, is_sched)
384
385 # only include latency data for task sets that were schedulable for all methods
386 if all_sched:
387 avg_tard_ratio = 0.0
388 avg_hrt_tard_ratio = 0.0
389 avg_split_tard_ratio = 0.0
390 avg_split_hrt_tard_ratio = 0.0
391 for latency, split_latency, ideal_latency, hrt_latency in zip(sched_data['latencies'], sched_data['split_latencies'], sched_data['ideal_latencies'], hrt_ideal_response_times):
392 if ideal_latency == 0.0:
393 print 'quick.py: bad latency. latency values:',sched_data
394 avg_tard_ratio += (latency / ideal_latency)
395 avg_hrt_tard_ratio += (latency / hrt_latency)
396 avg_split_tard_ratio += (split_latency / ideal_latency)
397 avg_split_hrt_tard_ratio += (split_latency / hrt_latency)
398
399 avg_latency = sum(sched_data['latencies'])/float(num_graphs)
400 avg_tard_ratio /= float(num_graphs)
401 avg_hrt_tard_ratio /= float(num_graphs)
402 avg_split_latency = sum(sched_data['split_latencies'])/float(num_graphs)
403 avg_split_tard_ratio /= float(num_graphs)
404 avg_split_hrt_tard_ratio /= float(num_graphs)
405
406 __avg_latencies[method] = update_mean(__avg_latencies[method], n_all_sched, avg_latency)
407 __avg_tard_ratios[method] = update_mean(__avg_tard_ratios[method], n_all_sched, avg_tard_ratio)
408 __avg_hrt_ratios[method] = update_mean(__avg_hrt_ratios[method], n_all_sched, avg_hrt_tard_ratio)
409 __avg_split_latencies[method] = update_mean(__avg_split_latencies[method], n_all_sched, avg_split_latency)
410 __avg_split_tard_ratios[method] = update_mean(__avg_split_tard_ratios[method], n_all_sched, avg_split_tard_ratio)
411 __avg_split_hrt_ratios[method] = update_mean(__avg_split_hrt_ratios[method], n_all_sched, avg_split_hrt_tard_ratio)
412
413 # we could share these values across all methods
414 __avg_ts_size[method] = update_mean(__avg_ts_size[method], n_all_sched, len(ts))
415 __avg_nr_graphs[method] = update_mean(__avg_nr_graphs[method], n_all_sched, num_graphs)
416 __avg_graph_size[method] = update_mean(__avg_graph_size[method], n_all_sched, avg_graph_size)
417 __avg_k[method] = update_mean(__avg_k[method], n_all_sched, avg_depth)
418
419 if all_sched:
420 n_all_sched += 1
421 n += 1
422
423 if n_all_sched == 0:
424 for method, _, _ in TESTS:
425 __avg_latencies[method] = -1.0
426 __avg_tard_ratios[method] = -1.0
427 __avg_hrt_ratios[method] = -1.0
428 __avg_split_latencies[method] = -1.0
429 __avg_split_tard_ratios[method] = -1.0
430 __avg_split_hrt_ratios[method] = -1.0
431 __avg_ts_size[method] = 0.0
432 __avg_nr_graphs[method] = 0.0
433 __avg_graph_size[method] = 0.0
434 __avg_k[method] = 0.0
435
436 return __avg_sched, __avg_latencies, __avg_tard_ratios, __avg_hrt_ratios, __avg_split_latencies, __avg_split_tard_ratios, __avg_split_hrt_ratios, __avg_ts_size, __avg_nr_graphs, __avg_graph_size, __avg_k
437
438def process_design_points(args):
439 chunk_size = 1
440 try:
441 (worker_id, db_name) = args
442 nr_processed = 0
443 __processed_dps = []
444 __results = []
445 while True:
446 dps = db.get_design_points(db_name, nr_dp = chunk_size)
447 if not dps or not len(dps):
448 break
449 for dp in dps:
450 print '%d starting dp' % worker_id
451 (avg_sched, avg_lat, avg_tard_ratio, avg_hrt_tard_ratio, avg_split_lat, avg_split_tard_ratio, avg_split_hrt_tard_ratio, avg_ts_size, avg_nr_graphs, avg_size, avg_k) = process_dp(dp)
452 print '%d finished dp' % worker_id
453
454 sched_data = {}
455 for m, _, _ in TESTS:
456 results = storage()
457 results.avg_sched = avg_sched[m]
458 results.avg_latency = avg_lat[m]
459 results.avg_tard_ratio = avg_tard_ratio[m]
460 results.avg_hrt_tard_ratio = avg_hrt_tard_ratio[m]
461 results.avg_split_latency = avg_split_lat[m]
462 results.avg_split_tard_ratio = avg_split_tard_ratio[m]
463 results.avg_split_hrt_tard_ratio = avg_split_hrt_tard_ratio[m]
464 results.avg_ts_size = avg_ts_size[m]
465 results.avg_nr_graphs = avg_nr_graphs[m]
466 results.avg_graph_size = avg_size[m]
467 results.avg_k = avg_k[m]
468 sched_data[m] = results
469
470 __processed_dps.append(dp)
471 __results.append(sched_data)
472 nr_processed += 1
473 if len(__processed_dps):
474 assert len(__processed_dps) == len(__results)
475 db.store_sched_results(db_name, __processed_dps, __results)
476 except lite.OperationalError:
477 print "CRAP. Database Error!"
478 print traceback.format_exc()
479 return nr_processed
480
481def valid(dp):
482 return True
483
484# TODO:
485#XXX 1. Track average graph size.
486#XXX 2. Increase minimum number of task sets
487#XXX 3. Remove 'mean' overhead type
488#XXX 4. Explore more branchy graphs
489#XXX 5. Pick one heur_aggress value (0.75)
490#XXX 6. Add wss parameters.
491#XXX 7. Remove polluters (for now).
492# 8. Why are graphs so shallow?
493#XXX 9. Job splitting
494
495def main():
496 random.seed(12345)
497
498 parser = argparse.ArgumentParser()
499 parser.add_argument('-p', "--pretend", action='store_true',
500 help = "Only print design point, do not execute")
501 parser.add_argument('-m', "--processors", default=1, type = int,
502 help="Number of processors to execute on")
503 parser.add_argument('-d', "--database", type = str,
504 default = "",
505 help = "Database for holding experiment data")
506 parser.add_argument('--initonly', action='store_true',
507 help = "Only store design points to database")
508 parser.add_argument('--worker', action='store_true',
509 help = "Only process design points from database")
510 parser.add_argument('--resume', action='store_true',
511 help = "Preserve existing database entries")
512 args = parser.parse_args()
513
514 if args.database == "":
515 print "Database name required."
516 exit(-1)
517
518 if not args.worker:
519 cpus = 24.0
520 exp = storage()
521
522 # system parameters
523 exp.processors = [int(cpus)]
524# exp.nr_clusters = [1, 4, 12, 24]
525 exp.nr_clusters = [24]
526 exp.host = ['ludwig']
527 exp.polluters = [False]
528 exp.ovh_type = ['max']
529
530 # task parameters
531 step_size = 0.1
532 exp.sys_util = [float(v) for v in arange(step_size, cpus+step_size, step_size)]
533# exp.sys_util = [float(v) for v in arange(10.0, cpus+step_size, step_size)]
534 exp.task_util = ['uni-light']
535 exp.period = ['uni-short']
536 exp.wcycle = [0]
537
538 # graph parameters
539 exp.num_graphs = ['uni-few']
540# exp.depth_factor = ['uni-medium']
541 exp.depth_factor = ['uni-short']
542 exp.node_placement = ['binomial']
543 exp.fan_out = ['uniform_3']
544 exp.edge_distance = ['geometric_3']
545 exp.wss = ['bimo-medium']
546 exp.fan_in_cap = [3]
547 exp.heur_aggressiveness = [0.75]
548
549 design_points = [dp for dp in DesignPointGenerator(exp, is_valid = valid)]
550
551 nr_dp = len(design_points)
552 if not args.pretend:
553 random.shuffle(design_points)
554 db.create_tables(args.database, dummy_dp = design_points[0], clean = not args.resume)
555 num_stored = db.store_design_points(args.database, design_points, clean = not args.resume)
556 print "Loaded %d of %d design points. (%d already completed)" % (num_stored, nr_dp, nr_dp - num_stored)
557
558 if args.pretend or args.initonly:
559 exit(0)
560
561 if args.worker:
562 print "Running as worker process."
563
564 total_nr_processed = 0
565 if args.processors > 1:
566 pool = Pool(processes = args.processors)
567 args = zip(range(args.processors), [args.database]*args.processors)
568 try:
569 for i,nr_processed in enumerate(pool.map(process_design_points, args)):
570 print 'worker %d: processed %d design points.' % (i,nr_processed)
571 total_nr_processed += nr_processed
572 pool.close()
573 except Exception as e:
574 pool.terminate()
575 print e
576 raise
577 else:
578 total_nr_processed = process_design_points((0, args.database))
579
580 print 'Processed %d design points!' % total_nr_processed
581
582if __name__ == '__main__':
583 main()
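
main() fans work out with multiprocessing.Pool, handing each worker an (id, database) pair and summing the processed counts. The core of that pattern as a hedged standalone sketch (fan_out and work are illustrative names; work must be a picklable top-level callable such as process_design_points):

from multiprocessing import Pool

def fan_out(nr_workers, db_name, work):
    pool = Pool(processes=nr_workers)
    try:
        counts = pool.map(work, zip(range(nr_workers), [db_name] * nr_workers))
        pool.close()
        return sum(counts)
    except Exception:
        # kill remaining workers rather than hang on a failed map
        pool.terminate()
        raise
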
diff --git a/schedcat/overheads/model.py b/schedcat/overheads/model.py
index 42001a6..a3941d8 100644
--- a/schedcat/overheads/model.py
+++ b/schedcat/overheads/model.py
@@ -148,7 +148,9 @@ class ConsumerOverheads(object):
148 working_set[ConsumerOverheads.levels[i+1]] += delta 148 working_set[ConsumerOverheads.levels[i+1]] += delta
149 149
150 def worst_case_placement(self, working_set): 150 def worst_case_placement(self, working_set):
151 ws = copy.deepcopy(working_set) 151 if not self.limits:
152 self.compute_limits()
153 ws = working_set
152 self.coalesce(ws) 154 self.coalesce(ws)
153 placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0} 155 placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
154 dirty = True 156 dirty = True
@@ -183,8 +185,40 @@ class ConsumerOverheads(object):
183 return placement 185 return placement
184 186
185 def best_case_placement(self, working_set): 187 def best_case_placement(self, working_set):
186 placement = copy.deepcopy(working_set) 188 if not self.limits:
189 self.compute_limits()
190 placement = working_set
191 self.coalesce(placement)
192 return placement
193
194 def place_production(self, ti):
195 if not self.limits:
196 self.compute_limits()
197 produced = sum([e.wss for e in ti.node.outEdges])
198 placement = {'L1':produced, 'L2':0, 'L3':0, 'MEM':0}
187 self.coalesce(placement) 199 self.coalesce(placement)
200 ti.placed_production = placement
201
202 def worst_case_place_consumption(self, tp, wss, dist):
203 placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
204 remaining = wss
205 for l in ConsumerOverheads.rlevels[0:(4-dist)]:
206 consumed = min(remaining, tp.placed_production[l])
207 placement[l] = consumed
208 remaining -= consumed
209 if remaining == 0:
210 break
211 if remaining > 0:
212 # Place anything left over at the 'dist' level.
213 # Coalescing takes place at a later stage.
214 placement[ConsumerOverheads.levels[dist]] += remaining
215# assert sum(placement.itervalues()) == wss
216 return placement
217
218 def best_case_place_consumption(self, tp, wss, dist):
219 placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
220 placement[ConsumerOverheads.levels[dist]] = wss
221 # Coalescing takes place at a later stage
188 return placement 222 return placement
189 223
190 def consume_cost(self, shared_mem_level, working_set_size): 224 def consume_cost(self, shared_mem_level, working_set_size):
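
worst_case_place_consumption above drains the producer's placed output starting from the most distant reachable memory level (the pessimistic choice) and parks any remainder at the producer/consumer sharing-distance level; coalescing to per-level capacity limits happens later. A standalone sketch, assuming levels are ordered ['L1', 'L2', 'L3', 'MEM'] and rlevels is that list reversed (an illustrative reconstruction, not the class's code):

LEVELS = ['L1', 'L2', 'L3', 'MEM']

def worst_case_place_consumption(placed_production, wss, dist):
    # placed_production: level -> bytes the producer left at that level
    # dist: sharing distance; only the 4-dist most distant levels are shared
    placement = {'L1': 0, 'L2': 0, 'L3': 0, 'MEM': 0}
    remaining = wss
    for lvl in LEVELS[::-1][:4 - dist]:
        consumed = min(remaining, placed_production.get(lvl, 0))
        placement[lvl] = consumed
        remaining -= consumed
        if remaining == 0:
            break
    if remaining > 0:
        # bytes the consumer cannot locate are charged at the distance level
        placement[LEVELS[dist]] += remaining
    return placement
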
@@ -211,7 +245,7 @@ class ConsumerOverheads(object):
211 245
212 def consume_cost_spilled(self, ti, num_cpus): 246 def consume_cost_spilled(self, ti, num_cpus):
213 if self.system and self.system.machine: 247 if self.system and self.system.machine:
214 consume_amount = [0, 0, 0, 0] 248 init_placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
215 ti_hi_cpu = (ti.partition+1)*num_cpus - 1 249 ti_hi_cpu = (ti.partition+1)*num_cpus - 1
216 # sum up wss from different sources 250 # sum up wss from different sources
217 for e in ti.node.inEdges: 251 for e in ti.node.inEdges:
@@ -221,13 +255,9 @@ class ConsumerOverheads(object):
221 # ti and producer share a multi-cpu partition. 255 # ti and producer share a multi-cpu partition.
222 producer_lo_cpu = e.p.task.partition*num_cpus 256 producer_lo_cpu = e.p.task.partition*num_cpus
223 dist = self.system.distance(producer_lo_cpu, ti_hi_cpu) 257 dist = self.system.distance(producer_lo_cpu, ti_hi_cpu)
224 consume_amount[dist] += e.wss 258 sources = self.worst_case_place_consumption(e.p.task, e.wss, dist)
225 if not self.limits: 259 for l,wss in sources.iteritems():
226 self.compute_limits() 260 init_placement[l] += sources[l]
227 init_placement = {'L1':consume_amount[0],
228 'L2':consume_amount[1],
229 'L3':consume_amount[2],
230 'MEM':consume_amount[3]}
231 placement = self.worst_case_placement(init_placement) 261 placement = self.worst_case_placement(init_placement)
232 # convert to schedcat's format... 262 # convert to schedcat's format...
233 consumer_cost = self.consume_multilevel_cost({0:placement['MEM'], 263 consumer_cost = self.consume_multilevel_cost({0:placement['MEM'],
@@ -240,7 +270,7 @@ class ConsumerOverheads(object):
240 270
241 def consume_cost_spilled_estimate(self, ti, partition, num_cpus): 271 def consume_cost_spilled_estimate(self, ti, partition, num_cpus):
242 if self.system and self.system.machine: 272 if self.system and self.system.machine:
243 consume_amount = [0, 0, 0, 0] 273 init_placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
244 ti_hi_cpu = (partition+1)*num_cpus - 1 274 ti_hi_cpu = (partition+1)*num_cpus - 1
245 # sum up wss from different sources 275 # sum up wss from different sources
246 for e in ti.node.inEdges: 276 for e in ti.node.inEdges:
@@ -252,13 +282,9 @@ class ConsumerOverheads(object):
252 continue 282 continue
253 producer_lo_cpu = e.p.task.partition*num_cpus 283 producer_lo_cpu = e.p.task.partition*num_cpus
254 dist = self.system.distance(producer_lo_cpu, ti_hi_cpu) 284 dist = self.system.distance(producer_lo_cpu, ti_hi_cpu)
255 consume_amount[dist] += e.wss 285 sources = self.worst_case_place_consumption(e.p.task, e.wss, dist)
256 if not self.limits: 286 for l,wss in sources.iteritems():
257 self.compute_limits() 287 init_placement[l] += sources[l]
258 init_placement = {'L1':consume_amount[0],
259 'L2':consume_amount[1],
260 'L3':consume_amount[2],
261 'MEM':consume_amount[3]}
262 placement = self.worst_case_placement(init_placement) 288 placement = self.worst_case_placement(init_placement)
263 # convert to schedcat's format... 289 # convert to schedcat's format...
264 consumer_cost = self.consume_multilevel_cost({0:placement['MEM'], 290 consumer_cost = self.consume_multilevel_cost({0:placement['MEM'],
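
Both consume_cost_spilled variants now follow the same shape: attribute each input edge's bytes to memory levels via worst_case_place_consumption, sum those per-edge placements into one initial placement, and only then apply worst_case_placement and the cost lookup. That accumulation step as an illustrative standalone helper (spilled_consumption, distance_of, and place are assumed names):

def spilled_consumption(in_edges, distance_of, place):
    # in_edges: iterable of (producer_task, wss) pairs
    # distance_of: producer_task -> sharing distance to this consumer
    # place: a worst_case_place_consumption-style callable
    init_placement = {'L1': 0, 'L2': 0, 'L3': 0, 'MEM': 0}
    for producer, wss in in_edges:
        for lvl, amount in place(producer, wss, distance_of(producer)).items():
            init_placement[lvl] += amount
    return init_placement
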