author    Glenn Elliott <gelliott@cs.unc.edu>  2014-04-29 17:41:31 -0400
committer Glenn Elliott <gelliott@cs.unc.edu>  2014-04-29 17:41:31 -0400
commit    30d7c45411c0db77baef949498e9c21ca9c289e4 (patch)
tree      46777110d8f7200d9f77aa1c4e4467c812c7c3ae
parent    8d785ca286cbc924c3ab7cb8bde6ea6f8e2c8596 (diff)
rtss14
-rw-r--r--  schedcat/generator/tasks.py    6
-rw-r--r--  schedcat/model/tasks.py       80
-rw-r--r--  schedcat/overheads/jlfp.py   102
-rw-r--r--  schedcat/overheads/model.py  359
4 files changed, 216 insertions, 331 deletions
diff --git a/schedcat/generator/tasks.py b/schedcat/generator/tasks.py
index 422d5f0..804ed25 100644
--- a/schedcat/generator/tasks.py
+++ b/schedcat/generator/tasks.py
@@ -12,6 +12,12 @@ def uniform_int(minval, maxval):
         return random.randint(minval, maxval)
     return _draw
 
+def const(val):
+    "Create a function that returns a constant value"
+    def _draw():
+        return val
+    return _draw
+
 def uniform(minval, maxval):
     "Create a function that draws floats uniformly from [minval, maxval]"
     def _draw():
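The new const() helper mirrors uniform() and uniform_int(): it returns a zero-argument draw function, so it can be passed anywhere the generator expects a distribution. A minimal sketch (parameter values are illustrative only):

    # const() pins a parameter to a fixed value; uniform_int() draws from a range.
    from schedcat.generator.tasks import const, uniform_int

    period_draw = uniform_int(10, 100)   # each call returns an int in [10, 100]
    util_draw = const(0.1)               # each call returns exactly 0.1

    print period_draw(), util_draw()     # e.g., 42 0.1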
diff --git a/schedcat/model/tasks.py b/schedcat/model/tasks.py
index 91fccab..9c84090 100644
--- a/schedcat/model/tasks.py
+++ b/schedcat/model/tasks.py
@@ -31,6 +31,57 @@ class SporadicTask(object):
     def utilization(self):
         return self.cost / self.period
 
+    def eff_utilization(self, factor):
+        if not self.uses_gpu:
+            return self.utilization()
+        # cpu cost without gpu management
+        eff_cost = (self.cost - self.ccost)
+        # kernel time scaled to single-thread of cpu time
+        eff_cost += self.gcost * factor
+        return eff_cost/self.period
+
+    def ee_utilization(self):
+        if not self.uses_gpu:
+            return 0.0
+        return self.gcost / self.period
+
+    def ce_utilization(self, with_send = True, with_recv = True, with_state = False):
+        if not self.uses_gpu:
+            return 0.0
+        ce_cost = 0.0
+        if with_send:
+            ce_cost += self.scost
+        if with_recv:
+            ce_cost += self.rcost
+        if with_state:
+            ce_cost += self.stcost
+        return ce_cost / self.period
+
+    def g_utilization(self, with_state = False, with_cpu = True):
+        if not self.uses_gpu:
+            return 0.0
+        totcost = self.gcost + self.scost + self.rcost
+        if with_state:
+            totcost += self.stcost
+        if with_cpu:
+            totcost += self.ccost
+        return float(totcost)/self.period
+
+    def bandwidth(self, send_only = False, recv_only = False, state_only = False, all_data = False):
+        if not self.uses_gpu:
+            return 0.0
+        elif send_only:
+            return float(self.sdata)/self.period
+        elif recv_only:
+            return float(self.rdata)/self.period
+        elif state_only:
+            return float(self.stdata)/self.period
+        elif all_data:
+            return float(self.sdata + self.rdata + self.stdata)/self.period
+        else:
+            # assume just input/output
+            return float(self.sdata + self.rdata)/self.period
+
     def utilization_q(self):
         return Fraction(self.cost, self.period)
 
@@ -40,6 +91,11 @@ class SporadicTask(object):
     def density_q(self):
         return Fraction(self.cost, min(self.period, self.deadline))
 
+    def est_response_time(self):
+        if hasattr(self, 'response_time'):
+            return self.response_time
+        return self.period
+
     def tardiness(self):
         """Return this task's tardiness.
         Note: this function can only be called after some test
@@ -47,6 +103,9 @@ class SporadicTask(object):
47 """ 103 """
48 return max(0, self.response_time - self.deadline) 104 return max(0, self.response_time - self.deadline)
49 105
106 def ptardiness(self):
107 return self.tardiness()/self.deadline
108
50 def maxjobs(self, interval_length): 109 def maxjobs(self, interval_length):
51 """Compute the maximum number of jobs that can execute during 110 """Compute the maximum number of jobs that can execute during
52 some interval. 111 some interval.
@@ -154,6 +213,27 @@ class TaskSystem(list):
154 "Assumes t.wss has been initialized for each task." 213 "Assumes t.wss has been initialized for each task."
155 return max([t.wss for t in self]) 214 return max([t.wss for t in self])
156 215
216 def max_tardiness(self):
217 return max([t.tardiness() for t in self])
218
219 def max_ptardiness(self):
220 return max([t.ptardiness() for t in self])
221
222 def bandwidth(self, send_only = False, recv_only = False, state_only = False, all_data = False):
223 return sum([t.bandwidth(send_only, recv_only, state_only, all_data) for t in self])
224
225 def ee_utilization(self):
226 return sum([t.ee_utilization() for t in self])
227
228 def ce_utilization(self, with_send = True, with_recv = True, with_state = False):
229 return sum([t.ce_utilization(with_send, with_recv, with_state) for t in self])
230
231 def g_utilization(self, with_state = False, with_cpu = True):
232 return sum([t.g_utilization(with_state, with_cpu) for t in self])
233
234 def eff_utilization(self, factor):
235 return sum([t.eff_utilization(factor) for t in self])
236
157 def copy(self): 237 def copy(self):
158 ts = TaskSystem((copy.deepcopy(t) for t in self)) 238 ts = TaskSystem((copy.deepcopy(t) for t in self))
159 return ts 239 return ts
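A small usage sketch of the new GPU accounting on SporadicTask and TaskSystem; the field values below (ccost, gcost, scost, rcost, stcost, sdata, rdata, stdata) are made-up numbers, since the diff only reads these attributes:

    from schedcat.model.tasks import SporadicTask, TaskSystem

    t = SporadicTask(10, 100)   # exec_cost=10, period=100
    t.uses_gpu = True
    t.ccost = 2       # CPU-side GPU management time (included in exec_cost)
    t.gcost = 6       # GPU execution-engine (kernel) time
    t.scost = 1       # send (host-to-device) copy time
    t.rcost = 1       # receive (device-to-host) copy time
    t.stcost = 0      # state-migration copy time
    t.sdata = 4096    # bytes sent per job
    t.rdata = 4096    # bytes received per job
    t.stdata = 0      # state bytes per job

    ts = TaskSystem([t])
    print t.ee_utilization()            # 6/100 = 0.06
    print t.g_utilization()             # (6+1+1+2)/100 = 0.10
    print t.eff_utilization(0.5)        # ((10-2) + 6*0.5)/100 = 0.11
    print ts.bandwidth(all_data=True)   # (4096+4096+0)/100 bytes per time unit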
diff --git a/schedcat/overheads/jlfp.py b/schedcat/overheads/jlfp.py
index 8db39df..7d7a29b 100644
--- a/schedcat/overheads/jlfp.py
+++ b/schedcat/overheads/jlfp.py
@@ -28,12 +28,7 @@ def preemption_centric_irq_costs(oheads, dedicated_irq, taskset):
     urel = 0.0
     if not dedicated_irq:
         rel = oheads.release(n)
-        for ti in taskset:
-            # PGM consumers (early-releasing tasks) don't use release timers,
-            # so skip them.
-            if hasattr(ti, 'node') and len(ti.node.inEdges):
-                continue
-            urel += (rel / ti.period)
+        urel = sum([(rel/ti.period) for ti in taskset])
 
     # cost of preemption
     cpre_numerator = tck + ev_lat * utick
@@ -44,6 +39,14 @@ def preemption_centric_irq_costs(oheads, dedicated_irq, taskset):
 
     return (uscale, cpre_numerator / uscale)
 
+def count_gpu_interrupts(t, ts):
+    def instances(u, interval):
+        return int(ceil((interval + max(u.response_time, u.period))/u.period))
+    # -1 to exclude the token lock from the gpu request count
+    count = sum([(u.nrequests-1)*instances(u, max(t.response_time, t.period))
+                 for u in ts if t is not u and u.uses_gpu and u.nrequests > 2])
+    return count
+
 def charge_scheduling_overheads(oheads, num_cpus, dedicated_irq, taskset):
     if not oheads:
         return taskset
@@ -54,66 +57,73 @@ def charge_scheduling_overheads(oheads, num_cpus, dedicated_irq, taskset):
     uscale, cpre = preemption_centric_irq_costs(oheads, dedicated_irq, taskset)
 
     if uscale <= 0:
+        print 'interrupt overload'
         # interrupt overload
         return False
 
     sched_ovh = oheads.schedule(n)
     ctx_ovh = oheads.ctx_switch(n)
+    ipi_ovh = oheads.ipi_latency(n)
     cache_ovh = oheads.cache_affinity_loss.cpmd_cost(oheads.shared_cache, taskset.max_wss())
     sysin_ovh = oheads.syscall_in(n)
     sysout_ovh = oheads.syscall_out(n)
+    nvth_ovh = oheads.nvtop(n)
+    nvbhrel_ovh = oheads.nvbot_release(n)
+
+    nvtop = nvth_ovh + nvbhrel_ovh
 
     sched = 2 * (sched_ovh + ctx_ovh) + cache_ovh
 
+    # wait, resume, yield: 3 (sched and ctx)
+    # two CPMD: 1 at resume and 1 at preemption of lower-priority task
+    # one ipi for wake-up latency
+    locking = 3 * (sched_ovh + ctx_ovh) + 2 * cache_ovh
+    locking_unscaled = ipi_ovh
+
+    # and extra overheads for gpusync engine locks
+    # sched/ctx for klmirqd, aux task, and task itself
+    # 3 ipi for each waking task
+    # Don't charge CPMDs because these are really small and fast
+    # (It would be like charging a CPMD because of a timer tick.)
+    engine_locking = 6 * (sched_ovh + ctx_ovh)
+    engine_locking_unscaled = 3 * ipi_ovh
+
     irq_latency = oheads.release_latency(n)
 
     if dedicated_irq:
-        unscaled = 2 * cpre + oheads.ipi_latency(n) + oheads.release(n)
+        unscaled = 2 * cpre + ipi_ovh + oheads.release(n)
     elif num_cpus > 1:
-        unscaled = 2 * cpre + oheads.ipi_latency(n)
+        unscaled = 2 * cpre + ipi_ovh
     else:
         unscaled = 2 * cpre
 
-    # Charge PGM costs
-    pgm_costs = defaultdict(float)
-    max_producer_delay = 0.0
-    for ti in taskset:
-        # Charge consumers costs
-        if hasattr(ti, 'node') and len(ti.node.inEdges):
-            # Cost of consuming from remote producer after some delay, rather
-            # than consuming from a local producer with no delay.
-            if oheads.consumer:
-                consumer_cache_cost = oheads.consumer.consume_cost_spilled(ti, num_cpus)
-                pgm_costs[ti] += consumer_cache_cost
-            # We call into the scheduler to release the next job
-            early_release_cost = oheads.schedule(n)
-            pgm_costs[ti] += early_release_cost
-        # Charge the producer costs
-        if hasattr(ti, 'node') and len(ti.node.outEdges):
-            # one syscall in/out for each consumer (wakeup + sched)
-            out_degree = len(ti.node.outEdges)
-            producer_cost = out_degree * (sysin_ovh + sysout_ovh + sched_ovh)
-            # Compute how long it takes to check token constraints of consumers.
-            # This is already in ti's execution time, but we need to determine
-            # how long ti will be boosted.
-            scan_cost = 0.0
-            if oheads.producer:
-                for e in ti.node.outEdges:
-                    consumer_in_degree = len(e.s.task.node.inEdges)
-                    scan_cost += oheads.producer(consumer_in_degree)
-            max_producer_delay = max(max_producer_delay, producer_cost + scan_cost)
-            # charge for sched_yield() to exit boosted state
-            boost_cost = sysin_ovh + sysout_ovh + sched_ovh + ctx_ovh
-            #
-            pgm_costs[ti] += (producer_cost + boost_cost)
-
     for ti in taskset:
-        # PGM: A producer's boosted section may delay our release
-        latency = irq_latency + max_producer_delay
-
+        latency = irq_latency
         ti.period -= latency
         ti.deadline -= latency
-        ti.cost = ((ti.cost + sched + pgm_costs[ti]) / uscale) + unscaled
+
+        ti_sched = sched
+        ti_unscaled = unscaled
+
+        # Charge gpu interrupt overheads
+        ti_sched += nvtop * count_gpu_interrupts(ti, taskset)
+
+        # Charge overheads for GPUSync locking protocol use
+        if ti.nrequests > 0:
+            ti_sched += ti.nrequests * locking
+            ti_unscaled += ti.nrequests * locking_unscaled
+            # There are extra overheads for the engine locks...
+            if ti.nrequests > 2:
+                # -1 to exclude the token lock
+                ti_sched += (ti.nrequests - 1) * engine_locking
+                ti_unscaled += (ti.nrequests - 1) * engine_locking_unscaled
+
+        # Charge overheads for blocking.
+        # Includes donation costs, GPU exec/copy time, lock blocking, etc.
+        if hasattr(ti, 'gpusync_ovh'):
+            ti_sched += ti.gpusync_ovh
+
+        ti.cost = ((ti.cost + ti_sched) / uscale) + ti_unscaled
         if ti.density() > 1:
             return False
     return taskset
@@ -127,6 +137,6 @@ def quantize_params(taskset):
         t.period = int(floor(t.period))
         t.deadline = int(floor(t.deadline))
         if not min(t.period, t.deadline) or t.density() > 1:
             return False
 
     return taskset
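In the rewritten charging loop, each task's cost is inflated as ti.cost = (ti.cost + ti_sched)/uscale + ti_unscaled, where ti_sched collects scheduler, context-switch, CPMD, and GPU interrupt charges and ti_unscaled collects IPI latencies. A hedged numeric sketch (all overhead magnitudes are made up; the structure mirrors charge_scheduling_overheads above):

    # Assumed per-event overheads, in microseconds.
    sched_ovh, ctx_ovh, cache_ovh, ipi_ovh = 5.0, 3.0, 20.0, 2.0
    nvtop = 4.0               # nvtop(n) + nvbot_release(n)
    uscale, cpre = 0.98, 1.0  # assumed output of preemption_centric_irq_costs()

    sched = 2 * (sched_ovh + ctx_ovh) + cache_ovh          # 36.0
    locking = 3 * (sched_ovh + ctx_ovh) + 2 * cache_ovh    # 64.0
    engine_locking = 6 * (sched_ovh + ctx_ovh)             # 48.0
    unscaled = 2 * cpre + ipi_ovh                          # multicore, no dedicated IRQ CPU

    nrequests = 3    # one token lock plus two engine locks
    gpu_irqs = 10    # assumed result of count_gpu_interrupts(ti, taskset)

    ti_sched = (sched + nvtop * gpu_irqs
                + nrequests * locking
                + (nrequests - 1) * engine_locking)        # 36 + 40 + 192 + 96 = 364
    ti_unscaled = (unscaled + nrequests * ipi_ovh
                   + (nrequests - 1) * 3 * ipi_ovh)        # 4 + 6 + 12 = 22

    cost = 1000.0
    print (cost + ti_sched) / uscale + ti_unscaled         # ~1413.8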
diff --git a/schedcat/overheads/model.py b/schedcat/overheads/model.py
index ea82862..f28ff1c 100644
--- a/schedcat/overheads/model.py
+++ b/schedcat/overheads/model.py
@@ -1,6 +1,7 @@
 from __future__ import division
 
 import copy
+import itertools
 
 from schedcat.util.csv import load_columns as load_column_csv
 from schedcat.util.math import monotonic_pwlin, piece_wise_linear, const
@@ -27,6 +28,11 @@ class Overheads(object):
         ('READ-UNLOCK', 'read_unlock'),
         ('SYSCALL-IN', 'syscall_in'),
         ('SYSCALL-OUT', 'syscall_out'),
+
+        # GPU-related overheads
+        ('NV-TOP', 'nvtop'),
+        ('NV-BOTTOM', 'nvbot'),
+        ('NV-BOTTOM-RELEASE', 'nvbot_release'),
     ]
 
     def zero_overheads(self):
@@ -61,44 +67,47 @@ class Overheads(object):
         o.load_approximations(fname, non_decreasing)
         return o
 
+
+
 class CacheDelay(object):
     """Cache-related Preemption and Migration Delay (CPMD)
     Overheads are expressed as a piece-wise linear function of working set size.
     """
 
-    MEM, L1, L2, L3 = 0, 1, 2, 3
-    SCHEDCAT_MAPPING = list(enumerate(["MEM", "L1", "L2", "L3"]))
+    MAPPING = {0:'L1', 1:'L2', 2:'L3', 3:'Mem', 4:'Numa'}
+    RMAPPING = {'L1':0, 'L2':1, 'L3':2, 'Mem':3, 'Numa':4}
 
-    def __init__(self, l1=0, l2=0, l3=0, mem=0):
-        self.mem_hierarchy = [const(mem), const(l1), const(l2), const(l3)]
-        for (i, name) in CacheDelay.SCHEDCAT_MAPPING:
+    def __init__(self, l1=0, l2=0, l3=0, mem=0, numa=0):
+        self.mem_hierarchy = [const(l1), const(l2), const(l3), const(mem), const(numa)]
+        for i, name in CacheDelay.MAPPING.iteritems():
             self.__dict__[name] = self.mem_hierarchy[i]
 
     def cpmd_cost(self, shared_mem_level, working_set_size):
-        wss = min(working_set_size, self.cache_size)
-        return self.mem_hierarchy[shared_mem_level](wss)
+#        wss = min(working_set_size, self.cache_size)
+        if type(shared_mem_level) is str:
+            shared_mem_level = CacheDelay.get_idx_for_name(shared_mem_level)
+        return self.mem_hierarchy[shared_mem_level](working_set_size)
 
     def set_cpmd_cost(self, shared_mem_level, approximation):
+        if type(shared_mem_level) is str:
+            shared_mem_level = CacheDelay.get_idx_for_name(shared_mem_level)
         self.mem_hierarchy[shared_mem_level] = approximation
-        name = CacheDelay.SCHEDCAT_MAPPING[shared_mem_level][1]
+        name = CacheDelay.MAPPING[shared_mem_level][1]
         self.__dict__[name] = self.mem_hierarchy[shared_mem_level]
 
-    def set_max_wss(self, cache_size):
-        self.cache_size = cache_size
+#    def set_max_wss(self, cache_size):
+#        self.cache_size = cache_size
 
     def max_cost(self, working_set_size):
-        wss = min(working_set_size, self.cache_size)
-        return max([f(wss) for f in self.mem_hierarchy])
+#        wss = min(working_set_size, self.cache_size)
+        return max([f(working_set_size) for f in self.mem_hierarchy])
 
     def __call__(self, wss):
         return self.max_cost(wss)
 
     @staticmethod
     def get_idx_for_name(key):
-        for (i, name) in CacheDelay.SCHEDCAT_MAPPING:
-            if name == key:
-                return i
-        assert False # bad key
+        return CacheDelay.RMAPPING[key]
 
     @staticmethod
     def from_file(fname, non_decreasing=True):
@@ -108,7 +117,7 @@ class CacheDelay(object):
 
         o = CacheDelay()
 
-        for idx, name in CacheDelay.SCHEDCAT_MAPPING:
+        for idx, name in CacheDelay.MAPPING.iteritems():
             if name in data.by_name:
                 points = zip(data.by_name['WSS'], data.by_name[name])
                 if non_decreasing:
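With the new MAPPING/RMAPPING tables, CPMD costs can be looked up by level name as well as by index, and the old cache-size clamp is commented out. A small sketch (constant costs, illustrative values only):

    from schedcat.overheads.model import CacheDelay

    cpmd = CacheDelay(l1=1.0, l2=5.0, l3=25.0, mem=100.0, numa=150.0)
    print cpmd.cpmd_cost('L3', 512)   # 25.0; 'L3' resolves via RMAPPING
    print cpmd.cpmd_cost(3, 512)      # 100.0; index 3 now maps to 'Mem'
    print cpmd(512)                   # max over all levels: 150.0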
@@ -118,291 +127,71 @@ class CacheDelay(object):
                 o.__dict__[name] = o.mem_hierarchy[idx]
         return o
 
-class RawOverheads(object):
-    MEM, L1, L2, L3 = 0, 1, 2, 3
-    SCHEDCAT_MAPPING = list(enumerate(["MEM", "L1", "L2", "L3"]))
 
-    def __init__(self, l1=0, l2=0, l3=0, mem=0):
-        self.mem_hierarchy = [const(mem), const(l1), const(l2), const(l3)]
-        for (i, name) in RawOverheads.SCHEDCAT_MAPPING:
-            self.__dict__[name] = self.mem_hierarchy[i]
+def btokb(byts):
+    return byts/1024.0
 
-    def cost(self, shared_mem_level, working_set_size):
-        wss = min(working_set_size, self.cache_size)
-        return self.mem_hierarchy[shared_mem_level](wss)
+class XmitOverheads(object):
 
-    def __call__(self, wss):
-        # presume local: L1
-        return self.mem_hierarchy[1](wss)
+    D2DN = 0
+    D2DF = 1
+    D2H = 2
+    H2D = 3
 
-    @staticmethod
-    def get_idx_for_name(key):
-        for (i, name) in RawOverheads.SCHEDCAT_MAPPING:
-            if name == key:
-                return i
-        assert False # bad key
+    MAPPING = {D2DN:'D2DN', D2DF:'D2DF', D2H:'D2H', H2D:'H2D'}
+    RMAPPING = {'D2DN':D2DN, 'D2DF':D2DF, 'D2H':D2H, 'H2D':H2D}
 
-    @staticmethod
-    def from_file(fname, non_decreasing=True):
-        data = load_column_csv(fname, convert=float)
-        if not 'WSS' in data.by_name:
-            raise IOError, 'WSS column is missing'
-
-        o = RawOverheads()
+    def __init__(self, d2dn=0, d2df=0, d2h=0, h2d=0):
+        self.xmit = [const(d2dn), const(d2df), const(d2h), const(h2d)]
+        for i, name in XmitOverheads.MAPPING.iteritems():
+            self.__dict__[name] = self.xmit[i]
 
-        for idx, name in RawOverheads.SCHEDCAT_MAPPING:
-            if name in data.by_name:
-                points = zip(data.by_name['WSS'], data.by_name[name])
-                if non_decreasing:
-                    o.mem_hierarchy[idx] = monotonic_pwlin(points)
-                else:
-                    o.mem_hierarchy[idx] = piece_wise_linear(points)
-                o.__dict__[name] = o.mem_hierarchy[idx]
-        return o
+    def xmit_cost(self, xmit_type, datasz):
+        if type(xmit_type) is str:
+            xmit_type = XmitOverheads.get_idx_for_name(xmit_type)
+        if datasz == 0:
+            return 0.0
+        return self.xmit[xmit_type](btokb(datasz))
 
-class ConsumerOverheads(object):
-    """Consumption cost overheads
-    Overheads are expressed as a piece-wise linear function of working set size.
-    """
-
-    MEM, L1, L2, L3 = 0, 1, 2, 3
-    SCHEDCAT_MAPPING = list(enumerate(["MEM", "L1", "L2", "L3"]))
-
-    levels = ['L1', 'L2', 'L3', 'MEM']
-    rlevels = list(reversed(levels))
-
-    def __init__(self, l1=0, l2=0, l3=0, mem=0, system=None):
-        self.mem_hierarchy = [const(mem), const(l1), const(l2), const(l3)]
-        for (i, name) in ConsumerOverheads.SCHEDCAT_MAPPING:
-            self.__dict__[name] = self.mem_hierarchy[i]
-        self.system = system
-        if self.system:
-            self.compute_limits()
-        else:
-            self.limits = None
-
-    def coalesce(self, working_set):
-        for i,l in enumerate(ConsumerOverheads.levels):
-            if working_set[l] > self.limits[l]:
-                delta = working_set[l] - self.limits[l]
-                working_set[l] = self.limits[l]
-                working_set[ConsumerOverheads.levels[i+1]] += delta
-
-    def worst_case_placement(self, working_set):
-        if not self.limits:
-            self.compute_limits()
-        ws = working_set
-        self.coalesce(ws)
-        placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
-        dirty = True
-        while dirty:
-            dirty = False
-            for i,l in enumerate(ConsumerOverheads.rlevels):
-                footprint = ws[l]
-                if footprint == 0:
-                    # no data accessed at this level
-                    continue
-                # place the data
-                placement[l] += ws[l]
-                # consume this data
-                ws[l] = 0
-                # evict data from mem hierarchy, top down
-                for j,v in enumerate(ConsumerOverheads.levels[0:-1]):
-                    evicted = min(footprint, ws[v])
-                    if evicted:
-                        # spill the evicted amount...
-                        ws[v] -= evicted
-                        # ...down to the next level
-                        ws[ConsumerOverheads.levels[j+1]] += evicted
-                        # coalesce the spill down the mem hierarchy
-                        self.coalesce(ws)
-                        # recored that we must recurse
-                        dirty = True
-                    # keep going if the footprint was large enough to
-                    # spill into the next cache
-                    footprint -= self.limits[v]
-                    if footprint <= 0:
-                        break
-        return placement
-
-    def best_case_placement(self, working_set):
-        if not self.limits:
-            self.compute_limits()
-        placement = working_set
-        self.coalesce(placement)
-        return placement
-
-    def place_production(self, ti):
-        if not self.limits:
-            self.compute_limits()
-        produced = sum([e.wss for e in ti.node.outEdges])
-        placement = {'L1':produced, 'L2':0, 'L3':0, 'MEM':0}
-        self.coalesce(placement)
-        ti.placed_production = placement
-
-    def worst_case_place_consumption(self, tp, wss, dist):
-        placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
-        remaining = wss
-        for l in ConsumerOverheads.rlevels[0:(4-dist)]:
-            consumed = min(remaining, tp.placed_production[l])
-            placement[l] = consumed
-            remaining -= consumed
-            if remaining == 0:
-                break
-        if remaining > 0:
-            # Place anything left over to the 'dist' level.
-            # Coalescing takes place at a later stage.
-            placement[ConsumerOverheads.levels[dist]] += remaining
-#        assert sum(placement.itervalues()) == wss
-        return placement
-
-    def best_case_place_consumption(self, tp, wss, dist):
-        placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
-        placement[ConsumerOverheads.levels[dist]] = wss
-        # Coalescing takes place at a later stage
-        return placement
-
-    def consume_cost(self, shared_mem_level, working_set_size):
-        return self.mem_hierarchy[shared_mem_level](working_set_size)
-
-    def consume_multilevel_cost(self, working_set):
-        # working_set is a dictionary of bytes to be consumed from
-        # different distances
-        cost = 0.0
-        for shared_mem_level, wss in working_set.iteritems():
-            cost += self.consume_cost(shared_mem_level, wss)
+    def compute_xmit_cost(self, xmit_type, datasz, chunk_size):
+        nchunks = int(datasz/chunk_size)
+        extra = datasz - nchunks*chunk_size
+        cost = self.xmit_cost(xmit_type, chunk_size)*nchunks
+        if extra > 0:
+            cost += self.xmit_cost(xmit_type, extra)
         return cost
 
-    def compute_limits(self):
-        self.limits = {'L1':self.system.machine['L1'],
-                       'L2':self.system.machine['L2'],
-                       'L3':self.system.machine['L3'],
-                       'MEM':10000000000}
-        # shrink the available space if we're on an inclusive cache
-        if self.system.machine['inclusive'] == 1:
-            for i,l in enumerate(ConsumerOverheads.rlevels[1:-1]):
-                above = ConsumerOverheads.rlevels[i+2]
-                self.limits[l] -= self.limits[above]
-
-    def consume_cost_spilled(self, ti, num_cpus):
-        if self.system and self.system.machine:
-            init_placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
-            ti_hi_cpu = (ti.partition+1)*num_cpus - 1
-            # sum up wss from different sources
-            for e in ti.node.inEdges:
-                # assume memory distance
-                dist = 3
-                # -- IMPORTANT --
-                # Take distance between ti's last cpu and producer's first cpu
-                # in partition. Ensures conservative consumer overhead even if
-                # ti and producer share a multi-cpu partition.
-                if e.p.task.partition != -1:
-                    producer_lo_cpu = e.p.task.partition*num_cpus
-                    dist = self.system.distance(producer_lo_cpu, ti_hi_cpu)
-                sources = self.worst_case_place_consumption(e.p.task, e.wss, dist)
-                for l,wss in sources.iteritems():
-                    init_placement[l] += sources[l]
-            placement = self.worst_case_placement(init_placement)
-            # convert to schedcat's format...
-            consumer_cost = self.consume_multilevel_cost({0:placement['MEM'],
-                                                          1:placement['L1'],
-                                                          2:placement['L2'],
-                                                          3:placement['L3']})
-            return consumer_cost
-        else:
-            return self.max_cost(ti.wss)
-
-    def consume_cost_spilled_estimate(self, ti, partition, num_cpus):
-        if partition != -1 and self.system and self.system.machine:
-            init_placement = {'L1':0, 'L2':0, 'L3':0, 'MEM':0}
-            ti_hi_cpu = (partition+1)*num_cpus - 1
-            # sum up wss from different sources
-            for e in ti.node.inEdges:
-                # assume memory distance
-                dist = 3
-                # -- IMPORTANT --
-                # Take distance between ti's last cpu and producer's first cpu
-                # in partition. Ensures conservative consumer overhead even if
-                # ti and producer share a multi-cpu partition.
-                if e.p.task.partition != -1:
-                    producer_lo_cpu = e.p.task.partition*num_cpus
-                    dist = self.system.distance(producer_lo_cpu, ti_hi_cpu)
-                sources = self.worst_case_place_consumption(e.p.task, e.wss, dist)
-                for l,wss in sources.iteritems():
-                    init_placement[l] += sources[l]
-            placement = self.worst_case_placement(init_placement)
-            # convert to schedcat's format...
-            consumer_cost = self.consume_multilevel_cost({0:placement['MEM'],
-                                                          1:placement['L1'],
-                                                          2:placement['L2'],
-                                                          3:placement['L3']})
-            return consumer_cost
-        else:
-            return self.max_cost(ti.wss)
-
-    def set_consume_cost(self, shared_mem_level, approximation):
-        self.mem_hierarchy[shared_mem_level] = approximation
-        name = ConsumeOverhead.SCHEDCAT_MAPPING[shared_mem_level][1]
-        self.__dict__[name] = self.mem_hierarchy[shared_mem_level]
-
-    def max_cost(self, working_set_size):
-        return max([f(working_set_size) for f in self.mem_hierarchy])
-
-    def __call__(self, wss):
-        return self.max_cost(wss)
-
+    def set_xmit_cost(self, xmit_type, approximation):
+        if type(xmit_type) is str:
+            xmit_type = XmitOverheads.get_idx_for_name(xmit_type)
+        self.xmit[xmit_type] = approximation
+        name = XmitOverheads.MAPPING[xmit_type][1]
+        self.__dict__[name] = self.xmit[xmit_type]
+
+    def max_cost(self, datasz):
+        return max([f(btokb(datasz)) for f in self.xmit])
+
+    def __call__(self, datasz):
+        return self.max_cost(btokb(datasz))
+
     @staticmethod
     def get_idx_for_name(key):
-        for (i, name) in ConsumerOverheads.SCHEDCAT_MAPPING:
-            if name == key:
-                return i
-        assert False # bad key
-
+        return XmitOverheads.RMAPPING[key]
+
     @staticmethod
-    def from_file(fname, non_decreasing=True, system=None):
+    def from_file(fname, non_decreasing=True):
         data = load_column_csv(fname, convert=float)
         if not 'WSS' in data.by_name:
             raise IOError, 'WSS column is missing'
 
-        o = ConsumerOverheads()
-        o.system = system
-
-        for idx, name in ConsumerOverheads.SCHEDCAT_MAPPING:
+        o = XmitOverheads()
+
+        for idx, name in XmitOverheads.MAPPING.iteritems():
             if name in data.by_name:
                 points = zip(data.by_name['WSS'], data.by_name[name])
                 if non_decreasing:
-                    o.mem_hierarchy[idx] = monotonic_pwlin(points)
+                    o.xmit[idx] = monotonic_pwlin(points)
                 else:
-                    o.mem_hierarchy[idx] = piece_wise_linear(points)
-                o.__dict__[name] = o.mem_hierarchy[idx]
+                    o.xmit[idx] = piece_wise_linear(points)
+                o.__dict__[name] = o.xmit[idx]
-        return o
-
-class ProducerOverheads(object):
-    """Token production and constraint checking overheads.
-    """
-
-    def __init__(self, cost = 0):
-        self.cost_func = const(cost)
-
-    def production_cost(self, degree):
-        return self.cost_func(degree)
-
-    def set_production_cost(self, approximation):
-        self.cost_func = approximation
-
-    def __call__(self, degree):
-        return self.production_cost(degree)
-
-    @staticmethod
-    def from_file(fname, non_decreasing=True):
-        data = load_column_csv(fname, convert=float)
-        if not 'DEG' in data.by_name:
-            raise IOError, 'DEG (degree) column is missing'
-
-        o = ProducerOverheads()
-        points = zip(data.by_name['DEG'], data.by_name['COST'])
-        if non_decreasing:
-            o.cost_func = monotonic_pwlin(points)
-        else:
-            o.cost_func = piece_wise_linear(points)
         return o
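XmitOverheads models DMA copy cost as a piece-wise linear function of transfer size (converted to KB by btokb()); compute_xmit_cost() charges a large transfer as a sequence of fixed-size chunks plus a remainder. A short sketch using constant per-direction costs (the numbers are illustrative, not measured):

    from schedcat.overheads.model import XmitOverheads

    # Constant approximations: near/far device-to-device, device-to-host, host-to-device.
    x = XmitOverheads(d2dn=10.0, d2df=15.0, d2h=20.0, h2d=20.0)

    # A 256 KiB host-to-device copy charged in 64 KiB chunks:
    # 4 full chunks, no remainder -> 4 * 20.0 = 80.0 with constant costs.
    print x.compute_xmit_cost('H2D', 256*1024, 64*1024)

    # max_cost()/__call__() take the most expensive direction for a given size.
    print x(4096)   # 20.0 with these constants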