#!/usr/bin/env python3
#
# Distill producer/consumer overhead measurements (pco_*.csv) into summary
# files (dpco_*.csv): for each experimental configuration and working-set
# size, report per-cache-distance mean/max consume overheads in microseconds.

import os
import re
import fnmatch
import shutil as sh
import sys
import csv
import numpy as np
from scipy.stats import scoreatpercentile
import bisect
from optparse import OptionParser

from utils.machines import machines

import utils.iqr


class Topology:
    """Tree model of a host's CPU/cache/memory topology.

    Built bottom-up from the per-machine cluster sizes ('nL1', 'nL2', ...)
    recorded in the machines table. Provides the cache distance (number of
    shared levels climbed) between any two CPUs.
    """

    # Hierarchy levels, bottom (closest to a CPU) to top.
    levels = ['L1', 'L2', 'L3', 'Mem', 'System']

    class Node:
        """One component (CPU, cache, or memory) in the topology tree."""

        def __init__(self, idx, name, parent=None):
            self.idx = idx
            self.name = name
            self.parent = parent
            self.children = []

        def __repr__(self):
            return '%s_%d' % (self.name, self.idx)

    def __init__(self, machine):
        self.ncpus = machine['sockets'] * machine['cores_per_socket']

        # Build the topology bottom-up: CPUs are the leaves.
        self.leaves = [self.Node(idx, 'CPU') for idx in range(self.ncpus)]
        nodes = self.leaves
        for lvl in self.levels:
            nodes = self.__build_level_above(machine, lvl, nodes)
        self.root = nodes

        self.dist_mat = self.__build_dist_matrix()

    def __build_level_above(self, machine, lvl, child_nodes):
        # 'n<level>' in the machine description gives how many children
        # share one node at this level; a missing key means a pass-through
        # level (one node per child).
        cluster_sz = machine.get('n' + lvl, 1)
        nodes = [self.Node(idx, lvl)
                 for idx in range(len(child_nodes) // cluster_sz)]
        for i, child in enumerate(child_nodes):
            parent = nodes[i // cluster_sz]
            child.parent = parent
            parent.children.append(child)
        return nodes

    def __find_dist(self, a, b):
        # Distance = number of non-pass-through levels climbed until the
        # two nodes merge; 0 when a and b are the same node.
        if a == b:
            return 0
        if len(a.parent.children) == 1:
            # pass-through level (ex. as CPU is to private L1): free hop
            return self.__find_dist(a.parent, b.parent)
        return 1 + self.__find_dist(a.parent, b.parent)

    def __build_dist_matrix(self):
        # Precompute the symmetric CPU-to-CPU distance matrix once.
        dist_mat = np.empty([self.ncpus, self.ncpus], int)
        for i in range(self.ncpus):
            for j in range(i, self.ncpus):
                dist_mat[i, j] = dist_mat[j, i] = \
                    self.__find_dist(self.leaves[i], self.leaves[j])
        return dist_mat

    def __repr_level(self, node, stem, buf):
        spacing = 3
        buf += stem + repr(node) + '\n'
        for c in node.children:
            buf = self.__repr_level(c, stem + ' ' * spacing, buf)
        return buf

    def __repr__(self):
        return self.__repr_level(self.root[0], '', '')

    def distance(self, a, b):
        """Return the cache distance between CPUs a and b."""
        return self.dist_mat[a, b]


# Cache of Topology objects, keyed by host name.
topologies = {}


def get_topo(host):
    """Return (and memoize) the Topology for the named host."""
    if host not in topologies:
        topologies[host] = Topology(machines[host])
    return topologies[host]


def process_cpmd(csv_file, params):
    """Summarize one pco_*.csv file of producer/consumer overhead samples.

    Returns {distance: {'max', 'median', 'mean', 'std'}} of consume costs
    (cold minus hot, clamped at zero), IQR-filtered and converted from
    cycles to microseconds.

    Raises Exception when params does not describe a pco file.
    """
    if 'pco' not in params:
        # NOTE: fixed a stray ')' that used to appear in this message.
        raise Exception('not producer/consumer overhead file: %s' % csv_file)

    topo = get_topo(params['host'])

    print('processing ' + csv_file)

    # Column layout of the input CSV.
    SAMPLE = 0
    WSS = 1
    DELAY = 2
    LAST_CPU = 3
    NEXT_CPU = 4
    DIST = 5
    PRODUCE_COLD = 6
    PRODUCE_HOT = 7
    CONSUME_COLD = 8
    CONSUME_HOT = 9

    costs = {}
    # 'with' ensures the input file is closed (the original leaked it).
    with open(csv_file, 'r') as ifile:
        for row in csv.reader(ifile):
            hot = int(row[CONSUME_HOT])
            after = int(row[CONSUME_COLD])
            # Overhead is the extra cost of a cold consume; never negative.
            cost = max(after - hot, 0)
            distance = topo.distance(int(row[NEXT_CPU]), int(row[LAST_CPU]))
            # Sanity-check our topology model against the recorded distance.
            assert distance == int(row[DIST])
            costs.setdefault(distance, []).append(cost)

    for d, c in costs.items():
        arr = np.sort(np.array(c, float))
        # Discard outliers beyond 1.5*IQR, then convert cycles -> us.
        (arr, mincut, maxcut) = utils.iqr.apply_iqr(arr, 1.5)
        for x in np.nditer(arr, op_flags=['readwrite']):
            x[...] = utils.machines.cycles_to_us(params['host'], x)
        costs[d] = arr

    stats = {}
    for d, arr in costs.items():
        stats[d] = {'max': arr.max(), 'median': np.median(arr),
                    'mean': arr.mean(), 'std': arr.std()}
    return stats


def parse_args():
    """Parse command-line options; positional args are the input files."""
    parser = OptionParser("usage: %prog [files...]")
    return parser.parse_args()


def safe_split(t, delim):
    """Split a 'k=v' token into (k, v); a token with no delimiter yields
    (token, None)."""
    parts = t.split(delim)
    if len(parts) == 1:
        return (parts[0], None)
    return tuple(parts)


def get_level(machine, ncpus):
    """Name the topology level spanned by CPUs [0, ncpus-1] on machine.

    Raises Exception when the distance exceeds the known level names.
    """
    dist = get_topo(machine).distance(0, int(ncpus) - 1)
    names = ['L1', 'L2', 'L3', 'mem', 'sys']
    # bug fix: was 'dist <= len(names)', which permitted an out-of-range
    # index (names[len(names)]) instead of the intended exception.
    if dist < len(names):
        return names[dist]
    raise Exception("Unable to determine level.")


def main():
    opts, args = parse_args()

    # Keep only existing files named like pco_*.csv.
    files = [f for f in args if os.path.exists(f)]
    csvs = re.compile(fnmatch.translate("pco_*.csv"))
    files = [f for f in files if csvs.search(f)]

    # Group per-WSS stats under the shared experimental parameters.
    results = {}
    for f in files:
        temp = os.path.basename(f).split(".csv")[0]
        tokens = temp.split("_")

        params = {k: v for (k, v) in map(lambda x: safe_split(x, "="), tokens)}
        common = (params['host'], params['ncpu'], params['polluters'],
                  params['walk'], params['hpages'], params['upages'])
        if common not in results:
            results[common] = {}
        results[common][int(params['wss'])] = process_cpmd(f, params)

    for common in results:
        trends = results[common]
        # bug fix: the original '%' formatting was split across two lines
        # without parentheses or a continuation -- a SyntaxError.
        name = ('dpco_host=%s_lvl=%s_polluters=%s_walk=%s_hpages=%s_upages=%s.csv'
                % (common[0], get_level(common[0], common[1]),
                   common[2], common[3], common[4], common[5]))
        # 'with' ensures each output file is flushed and closed.
        with open(name, 'w') as f:
            for w, stats in sorted(trends.items()):
                f.write('%d' % w)
                # First the per-distance means (and the max of those means),
                # then the per-distance maxima (and the max of those maxima).
                _mean = 0
                _max = 0
                for i, data in sorted(stats.items()):
                    dist_mean = data['mean']
                    _mean = max(_mean, dist_mean)
                    f.write(', %.6f' % dist_mean)
                f.write(', %.6f' % _mean)
                for i, data in sorted(stats.items()):
                    dist_max = data['max']
                    _max = max(_max, dist_max)
                    f.write(', %.6f' % dist_max)
                f.write(', %.6f\n' % _max)


if __name__ == '__main__':
    main()