From 21a605fb8fe90f3b2659cb9d93039232bb2bddc4 Mon Sep 17 00:00:00 2001 From: Glenn Elliott Date: Fri, 31 Jan 2014 21:55:03 -0500 Subject: Compute costs for reading and writing data. --- distill_read_hot.py | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++ distill_write_cold.py | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++ gen/edf_generators.py | 11 +++ gen/generator.py | 9 +++ 4 files changed, 430 insertions(+) create mode 100755 distill_read_hot.py create mode 100755 distill_write_cold.py diff --git a/distill_read_hot.py b/distill_read_hot.py new file mode 100755 index 0000000..7b994ff --- /dev/null +++ b/distill_read_hot.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python + +import os +import re +import fnmatch +import shutil as sh +import sys +import csv +import numpy as np +from scipy.stats import scoreatpercentile +import bisect +from optparse import OptionParser + +from utils.machines import machines + +import utils.iqr + +class Topology: + ncpus, root, leaves, dist_mat = 0, None, None, None + levels = ['L1', 'L2', 'L3', 'Mem', 'System'] + + class Node: + idx, name, parent, children = 0, 'Unk', None, None + def __init__(self, idx, name, parent = None): + self.idx = idx + self.name = name + self.parent = parent + self.children = [] + def __repr__(self): + return self.name + '_' + str(self.idx) + + def __build_level_above(self, machine, l, child_nodes): + key = 'n' + l + if key in machine: + cluster_sz = machine[key] + else: + cluster_sz = 1 + nchildren = len(child_nodes) + nodes = [self.Node(idx, l) for idx in range(nchildren/cluster_sz)] + for i in range(len(child_nodes)): + child_nodes[i].parent = nodes[i/cluster_sz] + nodes[i/cluster_sz].children.append(child_nodes[i]) + return nodes + + def __find_dist(self, a, b): + if a != b: + # pass-through (ex. as CPU is to private L1) + if len(a.parent.children) == 1: + return self.__find_dist(a.parent, b.parent) + else: + return 1 + self.__find_dist(a.parent, b.parent) + return 0 + + def __build_dist_matrix(self): + dist_mat = np.empty([self.ncpus, self.ncpus], int) + for i in range(self.ncpus): + for j in range(i, self.ncpus): + dist_mat[i,j] = dist_mat[j,i] = self.__find_dist(self.leaves[i], self.leaves[j]) + return dist_mat + + def __init__(self, machine): + self.ncpus = machine['sockets']*machine['cores_per_socket'] + + # build the Topology bottom up + self.leaves = [self.Node(idx, 'CPU') for idx in range(self.ncpus)] + nodes = self.leaves + for l in self.levels: + nodes = self.__build_level_above(machine, l, nodes) + self.root = nodes + + self.dist_mat = self.__build_dist_matrix() + + + def __repr_level(self, node, stem, buf): + spacing = 3 + buf += stem + node.name + '_' + str(node.idx) + '\n' + for c in node.children: + buf = self.__repr_level(c, stem + ' '*spacing, buf) + return buf + + def __repr__(self): + buf = self.__repr_level(self.root[0], '', '') + return buf + + def distance(self, a, b): + return self.dist_mat[a,b] + + +topologies = {} +def get_topo(host): + if host in topologies: + return topologies[host] + else: + topo = Topology(machines[host]) + topologies[host] = topo + return topo + +def non_polluter_filename(csv_file): + return re.sub(r"polluters=True", r"polluters=False", csv_file) + +# find the max/avg/std of preemption and migration +def process_cpmd(csv_file, params): + + if 'pco' not in params: + raise Exception(('not producer/consumer overhead file: %s)') % csv_file) + + topo = get_topo(params['host']) + + print 'processing ' + csv_file + + ifile = open(csv_file, "r") + bestcase = open(non_polluter_filename(csv_file), "r") + + reader = csv.reader(ifile) + bc_reader = csv.reader(bestcase) + costs = {} + + SAMPLE = 0 + WSS = 1 + DELAY = 2 + LAST_CPU = 3 + NEXT_CPU = 4 + DIST = 5 + PRODUCE_COLD = 6 + PRODUCE_HOT = 7 + CONSUME_COLD = 8 + CONSUME_HOT = 9 + + for (row, bc_row) in zip(reader, bc_reader): + hot = int(row[CONSUME_HOT]) + distance = int(row[DIST]) + if distance not in costs: + costs[distance] = [] + costs[distance].append(hot) + + for d,c in costs.iteritems(): + arr = np.array(c, float) + arr = np.sort(arr) + (arr, mincut, maxcut) = utils.iqr.apply_iqr(arr, 1.5) + for x in np.nditer(arr, op_flags=['readwrite']): + x[...] = utils.machines.cycles_to_us(params['host'], x) + costs[d] = arr + + stats = {} +# print costs + for d,arr in costs.iteritems(): + stats[d] = {'max':arr.max(), 'median':np.median(arr), 'mean':arr.mean(), 'std':arr.std()} + + return stats + +def parse_args(): + parser = OptionParser("usage: %prog [files...]") + return parser.parse_args() + +def safe_split(t, delim): + t = t.split(delim) + if len(t) == 1: + t = tuple([t[0], None]) + return t + +def get_level(machine, ncpus): + dist = get_topo(machine).distance(0, int(ncpus)-1) + names = ['L1', 'L2', 'L3', 'mem', 'sys'] + if dist <= len(names): + return names[dist] + else: + raise Exception("Unable to determine level.") + return '' + +def main(): + opts, args = parse_args() + + files = filter(os.path.exists, args) + + regex = fnmatch.translate("pco_*.csv") + csvs = re.compile(regex) + files = filter(csvs.search, files) + + results = {} + for f in files: + temp = os.path.basename(f).split(".csv")[0] + tokens = temp.split("_") + + params = {k:v for (k,v) in map(lambda x: safe_split(x, "="), tokens)} + common = tuple([params['host'], params['ncpu'], params['polluters'], params['walk'], params['hpages'], params['upages']]) + if common not in results: + results[common] = {} + results[common][int(params['wss'])] = process_cpmd(f, params) + +# print results + for common in results: + trends = results[common] + for t in ['max', 'median', 'mean']: + name = 'dro_hot_host=%s_lvl=%s_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % (common[0], get_level(common[0], common[1]), common[2], common[3], common[4], common[5], t) + f = open(name, 'w') + f.write('WSS,L1,L2,L3,MEM\n') + for w,stats in iter(sorted(trends.iteritems())): + f.write('%d' % w) + for i,data in iter(sorted(stats.iteritems())): + val = data[t] + f.write(',%.6f' % val) + f.write('\n') + +if __name__ == '__main__': + main() diff --git a/distill_write_cold.py b/distill_write_cold.py new file mode 100755 index 0000000..28e9eb0 --- /dev/null +++ b/distill_write_cold.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python + +import os +import re +import fnmatch +import shutil as sh +import sys +import csv +import numpy as np +from scipy.stats import scoreatpercentile +import bisect +from optparse import OptionParser + +from utils.machines import machines + +import utils.iqr + +class Topology: + ncpus, root, leaves, dist_mat = 0, None, None, None + levels = ['L1', 'L2', 'L3', 'Mem', 'System'] + + class Node: + idx, name, parent, children = 0, 'Unk', None, None + def __init__(self, idx, name, parent = None): + self.idx = idx + self.name = name + self.parent = parent + self.children = [] + def __repr__(self): + return self.name + '_' + str(self.idx) + + def __build_level_above(self, machine, l, child_nodes): + key = 'n' + l + if key in machine: + cluster_sz = machine[key] + else: + cluster_sz = 1 + nchildren = len(child_nodes) + nodes = [self.Node(idx, l) for idx in range(nchildren/cluster_sz)] + for i in range(len(child_nodes)): + child_nodes[i].parent = nodes[i/cluster_sz] + nodes[i/cluster_sz].children.append(child_nodes[i]) + return nodes + + def __find_dist(self, a, b): + if a != b: + # pass-through (ex. as CPU is to private L1) + if len(a.parent.children) == 1: + return self.__find_dist(a.parent, b.parent) + else: + return 1 + self.__find_dist(a.parent, b.parent) + return 0 + + def __build_dist_matrix(self): + dist_mat = np.empty([self.ncpus, self.ncpus], int) + for i in range(self.ncpus): + for j in range(i, self.ncpus): + dist_mat[i,j] = dist_mat[j,i] = self.__find_dist(self.leaves[i], self.leaves[j]) + return dist_mat + + def __init__(self, machine): + self.ncpus = machine['sockets']*machine['cores_per_socket'] + + # build the Topology bottom up + self.leaves = [self.Node(idx, 'CPU') for idx in range(self.ncpus)] + nodes = self.leaves + for l in self.levels: + nodes = self.__build_level_above(machine, l, nodes) + self.root = nodes + + self.dist_mat = self.__build_dist_matrix() + + + def __repr_level(self, node, stem, buf): + spacing = 3 + buf += stem + node.name + '_' + str(node.idx) + '\n' + for c in node.children: + buf = self.__repr_level(c, stem + ' '*spacing, buf) + return buf + + def __repr__(self): + buf = self.__repr_level(self.root[0], '', '') + return buf + + def distance(self, a, b): + return self.dist_mat[a,b] + + +topologies = {} +def get_topo(host): + if host in topologies: + return topologies[host] + else: + topo = Topology(machines[host]) + topologies[host] = topo + return topo + +def non_polluter_filename(csv_file): + return re.sub(r"polluters=True", r"polluters=False", csv_file) + +# find the max/avg/std of preemption and migration +def process_cpmd(csv_file, params): + + if 'pco' not in params: + raise Exception(('not producer/consumer overhead file: %s)') % csv_file) + + topo = get_topo(params['host']) + + print 'processing ' + csv_file + + ifile = open(csv_file, "r") + bestcase = open(non_polluter_filename(csv_file), "r") + + reader = csv.reader(ifile) + bc_reader = csv.reader(bestcase) + costs = {} + + SAMPLE = 0 + WSS = 1 + DELAY = 2 + LAST_CPU = 3 + NEXT_CPU = 4 + DIST = 5 + PRODUCE_COLD = 6 + PRODUCE_HOT = 7 + CONSUME_COLD = 8 + CONSUME_HOT = 9 + + for (row, bc_row) in zip(reader, bc_reader): + cold = int(row[PRODUCE_COLD]) + distance = int(row[DIST]) + if distance not in costs: + costs[distance] = [] + costs[distance].append(cold) + + for d,c in costs.iteritems(): + arr = np.array(c, float) + arr = np.sort(arr) + (arr, mincut, maxcut) = utils.iqr.apply_iqr(arr, 1.5) + for x in np.nditer(arr, op_flags=['readwrite']): + x[...] = utils.machines.cycles_to_us(params['host'], x) + costs[d] = arr + + stats = {} +# print costs + for d,arr in costs.iteritems(): + stats[d] = {'max':arr.max(), 'median':np.median(arr), 'mean':arr.mean(), 'std':arr.std()} + + return stats + +def parse_args(): + parser = OptionParser("usage: %prog [files...]") + return parser.parse_args() + +def safe_split(t, delim): + t = t.split(delim) + if len(t) == 1: + t = tuple([t[0], None]) + return t + +def get_level(machine, ncpus): + dist = get_topo(machine).distance(0, int(ncpus)-1) + names = ['L1', 'L2', 'L3', 'mem', 'sys'] + if dist <= len(names): + return names[dist] + else: + raise Exception("Unable to determine level.") + return '' + +def main(): + opts, args = parse_args() + + files = filter(os.path.exists, args) + + regex = fnmatch.translate("pco_*.csv") + csvs = re.compile(regex) + files = filter(csvs.search, files) + + results = {} + for f in files: + temp = os.path.basename(f).split(".csv")[0] + tokens = temp.split("_") + + params = {k:v for (k,v) in map(lambda x: safe_split(x, "="), tokens)} + common = tuple([params['host'], params['ncpu'], params['polluters'], params['walk'], params['hpages'], params['upages']]) + if common not in results: + results[common] = {} + results[common][int(params['wss'])] = process_cpmd(f, params) + +# print results + for common in results: + trends = results[common] + for t in ['max', 'median', 'mean']: + name = 'dwo_cold_host=%s_lvl=%s_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % (common[0], get_level(common[0], common[1]), common[2], common[3], common[4], common[5], t) + f = open(name, 'w') + f.write('WSS,L1,L2,L3,MEM\n') + for w,stats in iter(sorted(trends.iteritems())): + f.write('%d' % w) + for i,data in iter(sorted(stats.iteritems())): + val = data[t] + f.write(',%.6f' % val) + f.write('\n') + +if __name__ == '__main__': + main() diff --git a/gen/edf_generators.py b/gen/edf_generators.py index eda23e4..cca4d44 100644 --- a/gen/edf_generators.py +++ b/gen/edf_generators.py @@ -269,4 +269,15 @@ class CflSplitPgmGenerator(EdfPgmGenerator): if exp_params['level'] == 'ALL': # kludge: assume global task sets are always schedulable is_sched = True + + if is_sched: + # compute the minimum time to produce/consume, so this can be discounted + # from the execution time during runtime + for ti in ts: + consume_amount = ti.wss + produce_amount = sum([e.wss for e in ti.node.outEdges]) + consume_time = overheads.read(consume_amount) + produce_time = overheads.write(produce_amount) + ti.cost_discount = consume_time + produce_time + return is_sched, ts diff --git a/gen/generator.py b/gen/generator.py index 8b3a189..e49606d 100644 --- a/gen/generator.py +++ b/gen/generator.py @@ -152,6 +152,7 @@ class Generator(object): rates_arg = [] etoe_arg = [] exec_arg = [] + discount_arg = [] cluster_arg = [] clustersz_arg = [] wss_arg = [] @@ -164,6 +165,7 @@ class Generator(object): cluster_arg_t = [] graph_desc_arg_t = [] exec_arg_t = [] + discount_arg_t = [] rates_arg_t = [] wss_arg_t = [] split_arg_t = [] @@ -174,6 +176,9 @@ class Generator(object): cluster_arg_t.append('node_' + str(n.id) + ':' + str(n.task.partition)) cost_str = format(n.task.cost/1000.0, '.4f').rstrip('0').rstrip('.') exec_arg_t.append('node_' + str(n.id) + ':' + cost_str) + if n.task.cost_discount > 10: + discount_str = format(n.task.cost_discount/1000.0, '.4f').rstrip('0').rstrip('.') + discount_arg_t.append('node_' + str(n.id) + ':' + discount_str) if n.task.split != 1: split_arg_t.append('node_' + str(n.id) + ':' + str(n.task.split)) if n.isSrc == True: @@ -193,12 +198,14 @@ class Generator(object): cluster_arg_t = ','.join(cluster_arg_t) graph_desc_arg_t = ','.join(graph_desc_arg_t) exec_arg_t = ','.join(exec_arg_t) + discount_arg_t = ','.join(discount_arg_t) wss_arg_t = ','.join(wss_arg_t) split_arg_t = ','.join(split_arg_t) rates_arg_t = ','.join(rates_arg_t) cluster_arg.append(cluster_arg_t) exec_arg.append(exec_arg_t) + discount_arg.append(discount_arg_t) graph_desc_arg.append(graph_desc_arg_t) wss_arg.append(wss_arg_t) split_arg.append(split_arg_t) @@ -223,6 +230,8 @@ class Generator(object): pgm_args_t = ''; pgm_args_t += '--wait --cluster ' + cluster_arg[i] + ' --clusterSize ' + clustersz_arg[i] pgm_args_t += ' --graph ' + graph_desc_arg[i] + ' --rates ' + rates_arg[i] + ' --execution ' + exec_arg[i] + if len(discount_arg[i]) != 0: + pgm_args_t += ' --discount ' + discount_arg[i] if len(split_arg[i]) != 0: pgm_args_t += ' --split ' + split_arg[i] if len(wss_arg[i]) != 0: -- cgit v1.2.2