From 57d87fed3fafd55ae34ddd35cfbeea8fb9c0d26e Mon Sep 17 00:00:00 2001 From: Glenn Elliott Date: Thu, 2 Jan 2014 10:50:27 -0500 Subject: Script updates. --- cache_cost.py | 36 +++++---- distill_co.py | 207 +++++++++++++++++++++++++++++++++++++++++++++++ distill_pco.py | 209 ------------------------------------------------ distill_pmo.py | 41 +++++----- gen/__init__.py | 1 + gen/edf_generators.py | 40 +++++++++ produce_consume_cost.py | 31 ++++--- 7 files changed, 309 insertions(+), 256 deletions(-) create mode 100755 distill_co.py delete mode 100755 distill_pco.py diff --git a/cache_cost.py b/cache_cost.py index 9f4e54a..9c8f2ac 100755 --- a/cache_cost.py +++ b/cache_cost.py @@ -4,6 +4,7 @@ import os import copy import sys import string +import pwd import smtplib import socket import time @@ -114,6 +115,15 @@ def run_exp(nsamples, ncpu, numa_args, wss, wcycle, sleep_range_ms, walk, do_pol print str(p) print str(probe) +def send_email(_to, _msg): + user = pwd.getpwuid(os.getuid())[0] + host = socket.gethostname() + _from = "%s@%s" % (user, host) + _body = "\r\n".join(["From: %s" % _from, "To: %s" % _to, "Subject: Test Completed!", "", "{}"]) + mail = smtplib.SMTP("localhost") + mail.sendmail(_from, [_to], _body.format(_msg)) + mail.quit() + def main(argv): nsamples = 5000 @@ -128,12 +138,12 @@ def main(argv): # # todo: configure numa args for system automatically ncpu_and_numa_args = { - 6: ['--cpunodebind=0', '--interleave=0'] -# 12: ['--cpunodebind=0,1', '--interleave=0,1'] + 24: [] } wss_kb = [4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 3072, 4096, 8192, 12288, 16384] - write_cycle = [0, 64, 16, 4, 2, 1] + write_cycle = [16, 4, 2] +# write_cycle = [0] sleep_range_ms = [0,50] @@ -142,8 +152,11 @@ def main(argv): huge_pages = [False] pollute = [False, True] - walk = ['seq', 'rand'] -# walk = ['seq'] +# walk = ['seq', 'rand'] + walk = ['seq'] + + # disable rt throttling in linux scheduler + os.system("echo -1 > /proc/sys/kernel/sched_rt_runtime_us") for ncpu, numa_args in ncpu_and_numa_args.iteritems(): for u in uncache: @@ -158,16 +171,9 @@ def main(argv): run_exp(nsamples, ncpu, numa_args, wss, wcycle, sleep_range_ms, w, p, h, u) if email_notification: - _subject = "Cache Ovh Collection Complete!" - _to = "gelliott@cs.unc.edu" - _from = "gelliott@bonham.cs.unc.edu" - _text = "Cache Ovh Collection Complete!" - _body = string.join(("From: %s" % _from, "To: %s" % _to, "Subject: %s" % _subject, "", _text), "\r\n") - s = smtplib.SMTP("localhost") - s.sendmail(_from, [_to], _body) - s.quit() - - + to = "gelliott@cs.unc.edu" + msg = "Cache Ovh Collection Complete!" + send_email(to, msg) if __name__ == "__main__": diff --git a/distill_co.py b/distill_co.py new file mode 100755 index 0000000..d507f47 --- /dev/null +++ b/distill_co.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python + +import os +import re +import fnmatch +import shutil as sh +import sys +import csv +import numpy as np +from scipy.stats import scoreatpercentile +import bisect +from optparse import OptionParser + +from utils.machines import machines + +import utils.iqr + +class Topology: + ncpus, root, leaves, dist_mat = 0, None, None, None + levels = ['L1', 'L2', 'L3', 'Mem', 'System'] + + class Node: + idx, name, parent, children = 0, 'Unk', None, None + def __init__(self, idx, name, parent = None): + self.idx = idx + self.name = name + self.parent = parent + self.children = [] + def __repr__(self): + return self.name + '_' + str(self.idx) + + def __build_level_above(self, machine, l, child_nodes): + key = 'n' + l + if key in machine: + cluster_sz = machine[key] + else: + cluster_sz = 1 + nchildren = len(child_nodes) + nodes = [self.Node(idx, l) for idx in range(nchildren/cluster_sz)] + for i in range(len(child_nodes)): + child_nodes[i].parent = nodes[i/cluster_sz] + nodes[i/cluster_sz].children.append(child_nodes[i]) + return nodes + + def __find_dist(self, a, b): + if a != b: + # pass-through (ex. as CPU is to private L1) + if len(a.parent.children) == 1: + return self.__find_dist(a.parent, b.parent) + else: + return 1 + self.__find_dist(a.parent, b.parent) + return 0 + + def __build_dist_matrix(self): + dist_mat = np.empty([self.ncpus, self.ncpus], int) + for i in range(self.ncpus): + for j in range(i, self.ncpus): + dist_mat[i,j] = dist_mat[j,i] = self.__find_dist(self.leaves[i], self.leaves[j]) + return dist_mat + + def __init__(self, machine): + self.ncpus = machine['sockets']*machine['cores_per_socket'] + + # build the Topology bottom up + self.leaves = [self.Node(idx, 'CPU') for idx in range(self.ncpus)] + nodes = self.leaves + for l in self.levels: + nodes = self.__build_level_above(machine, l, nodes) + self.root = nodes + + self.dist_mat = self.__build_dist_matrix() + + + def __repr_level(self, node, stem, buf): + spacing = 3 + buf += stem + node.name + '_' + str(node.idx) + '\n' + for c in node.children: + buf = self.__repr_level(c, stem + ' '*spacing, buf) + return buf + + def __repr__(self): + buf = self.__repr_level(self.root[0], '', '') + return buf + + def distance(self, a, b): + return self.dist_mat[a,b] + + +topologies = {} +def get_topo(host): + if host in topologies: + return topologies[host] + else: + topo = Topology(machines[host]) + topologies[host] = topo + return topo + +def non_polluter_filename(csv_file): + return re.sub(r"polluters=True", r"polluters=False", csv_file) + +# find the max/avg/std of preemption and migration +def process_cpmd(csv_file, params): + + if 'pco' not in params: + raise Exception(('not producer/consumer overhead file: %s)') % csv_file) + + topo = get_topo(params['host']) + + print 'processing ' + csv_file + + ifile = open(csv_file, "r") + bestcase = open(non_polluter_filename(csv_file), "r") + + reader = csv.reader(ifile) + bc_reader = csv.reader(bestcase) + costs = {} + + SAMPLE = 0 + WSS = 1 + DELAY = 2 + LAST_CPU = 3 + NEXT_CPU = 4 + DIST = 5 + PRODUCE_COLD = 6 + PRODUCE_HOT = 7 + CONSUME_COLD = 8 + CONSUME_HOT = 9 + + for (row, bc_row) in zip(reader, bc_reader): + hot = min(int(row[CONSUME_HOT]), int(bc_row[CONSUME_HOT])) + after = int(row[CONSUME_COLD]) + cost = max(after - hot, 0) + distance = int(row[DIST]) + if distance not in costs: + costs[distance] = [] + costs[distance].append(cost) + + for d,c in costs.iteritems(): + arr = np.array(c, float) + arr = np.sort(arr) +# (arr, mincut, maxcut) = utils.iqr.apply_iqr(arr, 1.5) + for x in np.nditer(arr, op_flags=['readwrite']): + x[...] = utils.machines.cycles_to_us(params['host'], x) + costs[d] = arr + + stats = {} +# print costs + for d,arr in costs.iteritems(): + stats[d] = {'max':arr.max(), 'median':np.median(arr), 'mean':arr.mean(), 'std':arr.std()} + + return stats + +def parse_args(): + parser = OptionParser("usage: %prog [files...]") + return parser.parse_args() + +def safe_split(t, delim): + t = t.split(delim) + if len(t) == 1: + t = tuple([t[0], None]) + return t + +def get_level(machine, ncpus): + dist = get_topo(machine).distance(0, int(ncpus)-1) + names = ['L1', 'L2', 'L3', 'mem', 'sys'] + if dist <= len(names): + return names[dist] + else: + raise Exception("Unable to determine level.") + return '' + +def main(): + opts, args = parse_args() + + files = filter(os.path.exists, args) + + regex = fnmatch.translate("pco_*.csv") + csvs = re.compile(regex) + files = filter(csvs.search, files) + + results = {} + for f in files: + temp = os.path.basename(f).split(".csv")[0] + tokens = temp.split("_") + + params = {k:v for (k,v) in map(lambda x: safe_split(x, "="), tokens)} + common = tuple([params['host'], params['ncpu'], params['polluters'], params['walk'], params['hpages'], params['upages']]) + if common not in results: + results[common] = {} + results[common][int(params['wss'])] = process_cpmd(f, params) + +# print results + for common in results: + trends = results[common] + for t in ['max', 'median', 'mean']: + name = 'dco_host=%s_lvl=%s_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % (common[0], get_level(common[0], common[1]), common[2], common[3], common[4], common[5], t) + f = open(name, 'w') + f.write('WSS,L1,L2,L3,MEM\n') + for w,stats in iter(sorted(trends.iteritems())): + f.write('%d' % w) + for i,data in iter(sorted(stats.iteritems())): + val = data[t] + f.write(',%.6f' % val) + f.write('\n') + +if __name__ == '__main__': + main() diff --git a/distill_pco.py b/distill_pco.py deleted file mode 100755 index c83ccde..0000000 --- a/distill_pco.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python - -import os -import re -import fnmatch -import shutil as sh -import sys -import csv -import numpy as np -from scipy.stats import scoreatpercentile -import bisect -from optparse import OptionParser - -from utils.machines import machines - -import utils.iqr - -class Topology: - ncpus, root, leaves, dist_mat = 0, None, None, None - levels = ['L1', 'L2', 'L3', 'Mem', 'System'] - - class Node: - idx, name, parent, children = 0, 'Unk', None, None - def __init__(self, idx, name, parent = None): - self.idx = idx - self.name = name - self.parent = parent - self.children = [] - def __repr__(self): - return self.name + '_' + str(self.idx) - - def __build_level_above(self, machine, l, child_nodes): - key = 'n' + l - if key in machine: - cluster_sz = machine[key] - else: - cluster_sz = 1 - nchildren = len(child_nodes) - nodes = [self.Node(idx, l) for idx in range(nchildren/cluster_sz)] - for i in range(len(child_nodes)): - child_nodes[i].parent = nodes[i/cluster_sz] - nodes[i/cluster_sz].children.append(child_nodes[i]) - return nodes - - def __find_dist(self, a, b): - if a != b: - # pass-through (ex. as CPU is to private L1) - if len(a.parent.children) == 1: - return self.__find_dist(a.parent, b.parent) - else: - return 1 + self.__find_dist(a.parent, b.parent) - return 0 - - def __build_dist_matrix(self): - dist_mat = np.empty([self.ncpus, self.ncpus], int) - for i in range(self.ncpus): - for j in range(i, self.ncpus): - dist_mat[i,j] = dist_mat[j,i] = self.__find_dist(self.leaves[i], self.leaves[j]) - return dist_mat - - def __init__(self, machine): - self.ncpus = machine['sockets']*machine['cores_per_socket'] - - # build the Topology bottom up - self.leaves = [self.Node(idx, 'CPU') for idx in range(self.ncpus)] - nodes = self.leaves - for l in self.levels: - nodes = self.__build_level_above(machine, l, nodes) - self.root = nodes - - self.dist_mat = self.__build_dist_matrix() - - - def __repr_level(self, node, stem, buf): - spacing = 3 - buf += stem + node.name + '_' + str(node.idx) + '\n' - for c in node.children: - buf = self.__repr_level(c, stem + ' '*spacing, buf) - return buf - - def __repr__(self): - buf = self.__repr_level(self.root[0], '', '') - return buf - - def distance(self, a, b): - return self.dist_mat[a,b] - - -topologies = {} -def get_topo(host): - if host in topologies: - return topologies[host] - else: - topo = Topology(machines[host]) - topologies[host] = topo - return topo - -# find the max/avg/std of preemption and migration -def process_cpmd(csv_file, params): - - if 'pco' not in params: - raise Exception(('not producer/consumer overhead file: %s)') % csv_file) - - topo = get_topo(params['host']) - - print 'processing ' + csv_file - - ifile = open(csv_file, "r") - reader = csv.reader(ifile) - costs = {} - - SAMPLE = 0 - WSS = 1 - DELAY = 2 - LAST_CPU = 3 - NEXT_CPU = 4 - DIST = 5 - PRODUCE_COLD = 6 - PRODUCE_HOT = 7 - CONSUME_COLD = 8 - CONSUME_HOT = 9 - - for row in reader: - hot = int(row[CONSUME_HOT]) - after = int(row[CONSUME_COLD]) - cost = max(after - hot, 0) - distance = topo.distance(int(row[NEXT_CPU]), int(row[LAST_CPU])) - assert distance == int(row[DIST]) - if distance not in costs: - costs[distance] = [] - costs[distance].append(cost) - - for d,c in costs.iteritems(): - arr = np.array(c, float) - arr = np.sort(arr) - (arr, mincut, maxcut) = utils.iqr.apply_iqr(arr, 1.5) - for x in np.nditer(arr, op_flags=['readwrite']): - x[...] = utils.machines.cycles_to_us(params['host'], x) - costs[d] = arr - - stats = {} -# print costs - for d,arr in costs.iteritems(): - stats[d] = {'max':arr.max(), 'median':np.median(arr), 'mean':arr.mean(), 'std':arr.std()} - - return stats - -def parse_args(): - parser = OptionParser("usage: %prog [files...]") - return parser.parse_args() - -def safe_split(t, delim): - t = t.split(delim) - if len(t) == 1: - t = tuple([t[0], None]) - return t - -def get_level(machine, ncpus): - dist = get_topo(machine).distance(0, int(ncpus)-1) - names = ['L1', 'L2', 'L3', 'mem', 'sys'] - if dist <= len(names): - return names[dist] - else: - raise Exception("Unable to determine level.") - return '' - -def main(): - opts, args = parse_args() - - files = filter(os.path.exists, args) - - regex = fnmatch.translate("pco_*.csv") - csvs = re.compile(regex) - files = filter(csvs.search, files) - - results = {} - for f in files: - temp = os.path.basename(f).split(".csv")[0] - tokens = temp.split("_") - - params = {k:v for (k,v) in map(lambda x: safe_split(x, "="), tokens)} - common = tuple([params['host'], params['ncpu'], params['polluters'], params['walk'], params['hpages'], params['upages']]) - if common not in results: - results[common] = {} - results[common][int(params['wss'])] = process_cpmd(f, params) - -# print results - for common in results: - trends = results[common] - name = 'dpco_host=%s_lvl=%s_polluters=%s_walk=%s_hpages=%s_upages=%s.csv' % - (common[0], get_level(common[0], common[1]), common[2], common[3], common[4], common[5]) - f = open(name, 'w') - for w,stats in iter(sorted(trends.iteritems())): - f.write('%d' % w) - _mean = 0 - _max = 0 - for i,data in iter(sorted(stats.iteritems())): - dist_mean = data['mean'] - _mean = max(_mean, dist_mean) - f.write(', %.6f' % dist_mean) - f.write(', %.6f' % _mean) - for i,data in iter(sorted(stats.iteritems())): - dist_max = data['max'] - _max = max(_max, dist_max) - f.write(', %.6f' % dist_max) - f.write(', %.6f\n' % _max) - -if __name__ == '__main__': - main() diff --git a/distill_pmo.py b/distill_pmo.py index 9821dd5..3897b7f 100755 --- a/distill_pmo.py +++ b/distill_pmo.py @@ -95,6 +95,9 @@ def get_topo(host): topologies[host] = topo return topo +def non_polluter_filename(csv_file): + return re.sub(r"polluters=True", r"polluters=False", csv_file) + # find the max/avg/std of preemption and migration def process_cpmd(csv_file, params): @@ -106,7 +109,10 @@ def process_cpmd(csv_file, params): print 'processing ' + csv_file ifile = open(csv_file, "r") + bestcase = open(non_polluter_filename(csv_file), "r") + reader = csv.reader(ifile) + bc_reader = csv.reader(bestcase) costs = {} SAMPLE = 0 @@ -122,12 +128,14 @@ def process_cpmd(csv_file, params): HOT3 = 10 AFTER_RESUME = 11 - for row in reader: + for (row, bc_row) in zip(reader, bc_reader): + # read the 'hot' value from both cases to iron out anomolies hot = min(int(row[HOT1]), int(row[HOT2]), int(row[HOT3])) + bc_hot = min(int(bc_row[HOT1]), int(bc_row[HOT2]), int(bc_row[HOT3])) + hot = min(hot, bc_hot) after = int(row[AFTER_RESUME]) cost = max(after - hot, 0) - distance = topo.distance(int(row[NEXT_CPU]), int(row[LAST_CPU])) - assert distance == int(row[DIST]) + distance = int(row[DIST]) if distance not in costs: costs[distance] = [] costs[distance].append(cost) @@ -189,23 +197,16 @@ def main(): # print results for common in results: trends = results[common] - name = 'dpmo_host=%s_lvl=%s_wcycle=%s_polluters=%s_walk=%s_hpages=%s_upages=%s.csv' % - (common[0], get_level(common[0], common[1]), common[2], common[3], common[4], common[5], common[6]) - f = open(name, 'w') - for w,stats in iter(sorted(trends.iteritems())): - f.write('%d' % w) - _mean = 0 - _max = 0 - for i,data in iter(sorted(stats.iteritems())): - dist_mean = data['mean'] - _mean = max(_mean, dist_mean) - f.write(', %.6f' % dist_mean) - f.write(', %.6f' % _mean) - for i,data in iter(sorted(stats.iteritems())): - dist_max = data['max'] - _max = max(_max, dist_max) - f.write(', %.6f' % dist_max) - f.write(', %.6f\n' % _max) + for t in ['max', 'median', 'mean']: + name = 'dpmo_host=%s_lvl=%s_wcycle=%s_polluters=%s_walk=%s_hpages=%s_upages=%s_type=%s.csv' % (common[0], get_level(common[0], common[1]), common[2], common[3], common[4], common[5], common[6], t) + f = open(name, 'w') + f.write('WSS,L1,L2,L3,MEM\n') + for w,stats in iter(sorted(trends.iteritems())): + f.write('%d' % w) + for i,data in iter(sorted(stats.iteritems())): + val = data[t] + f.write(',%.6f' % val) + f.write('\n') if __name__ == '__main__': main() diff --git a/gen/__init__.py b/gen/__init__.py index 8c60b46..654ac19 100644 --- a/gen/__init__.py +++ b/gen/__init__.py @@ -4,3 +4,4 @@ import edf_generators as edf gen.register_generator("G-EDF", edf.GedfGenerator) gen.register_generator("P-EDF", edf.PedfGenerator) gen.register_generator("C-EDF", edf.CedfGenerator) +gen.register_generator("C-FL-split", edf.CflSplitGenerator) diff --git a/gen/edf_generators.py b/gen/edf_generators.py index 8e4b8df..ce99d05 100644 --- a/gen/edf_generators.py +++ b/gen/edf_generators.py @@ -6,6 +6,7 @@ TP_TBASE = """#for $t in $task_set #end for""" TP_GLOB_TASK = TP_TBASE.format("") TP_PART_TASK = TP_TBASE.format("-p $t.cpu") +TP_CLST_TASK = TP_TBASE.format("-p $t.cluster -z $t.cluster_sz") class EdfGenerator(gen.Generator): '''Creates sporadic task sets with the most common Litmus options.''' @@ -83,6 +84,45 @@ class CedfGenerator(PartitionedGenerator): [CedfGenerator.CLUSTER_OPTION], params) +class CflSplitGenerator(EdfGenerator): + TP_CLUSTER = "plugins/C-FL-split/cluster{$level}" + CLUSTER_OPTION = gen.GenOption('level', ['L1', 'L2', 'L3', 'ALL'], 'L2', + 'Cache clustering level.',) +# CLUSTER_SZ_OPTION = gen.GenOption('cluster_size', ['1', '2', '6', '24'], '2', +# 'Cluster size.',) + + def __init__(self, params={}): + super(CflSplitGenerator, self).__init__("C-FL-split", + [CflSplitGenerator.TP_CLUSTER, TP_CLST_TASK], + [CflSplitGenerator.CLUSTER_OPTION], + params) + + def _customize(self, taskset, exp_params): + cpus = int(exp_params['cpus']) +# cluster_sz = int(exp_params['cluster_size']) + + if exp_params['level'] == 'L1': + cluster_sz = 1 + elif exp_params['level'] == 'L2': + cluster_sz = 2 + elif exp_params['level'] == 'L3': + cluster_sz = 6 + elif exp_params['level'] == 'ALL': + cluster_sz = 24 + else: + assert False + + num_clusters = cpus / cluster_sz + assert num_clusters * cluster_sz == cpus + + utils = [0]*num_clusters + tasks = [0]*num_clusters + for t in taskset: + t.cluster = utils.index(min(utils)) + t.cluster_sz = cluster_sz + utils[t.cluster] += t.utilization() + tasks[t.cluster] += 1 + class GedfGenerator(EdfGenerator): def __init__(self, params={}): super(GedfGenerator, self).__init__("GSN-EDF", [TP_GLOB_TASK], diff --git a/produce_consume_cost.py b/produce_consume_cost.py index fd08a8d..b24ce76 100755 --- a/produce_consume_cost.py +++ b/produce_consume_cost.py @@ -4,6 +4,7 @@ import os import copy import sys import string +import pwd import smtplib import socket import time @@ -111,8 +112,17 @@ def run_exp(nsamples, ncpu, numa_args, wss, sleep_range_ms, walk, do_pollute, do print str(p) print str(probe) +def send_email(_to, _msg): + user = pwd.getpwuid(os.getuid())[0] + host = socket.gethostname() + _from = "%s@%s" % (user, host) + _body = "\r\n".join(["From: %s" % _from, "To: %s" % _to, "Subject: Test Completed!", "", "{}"]) + mail = smtplib.SMTP("localhost") + mail.sendmail(_from, [_to], _body.format(_msg)) + mail.quit() + def main(argv): - nsamples = 5000 + nsamples = 5000*4 # We may need to test different NUMA node configurations # according to memory interleaving across the NUMA topology. @@ -125,8 +135,9 @@ def main(argv): # # todo: configure numa args for system automatically ncpu_and_numa_args = { - 6: ['--cpunodebind=0', '--interleave=0'] +# 6: ['--cpunodebind=0', '--interleave=0'] # 12: ['--cpunodebind=0,1', '--interleave=0,1'] + 24: [] } wss_kb = [4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 3072, 4096, 8192, 12288, 16384] @@ -140,6 +151,9 @@ def main(argv): # walk = ['seq', 'rand'] walk = ['seq'] + # disable rt throttling in linux scheduler + os.system("echo -1 > /proc/sys/kernel/sched_rt_runtime_us") + for ncpu, numa_args in ncpu_and_numa_args.iteritems(): for u in uncache: for h in huge_pages: @@ -149,16 +163,9 @@ def main(argv): run_exp(nsamples, ncpu, numa_args, wss, sleep_range_ms, w, p, h, u) if email_notification: - _subject = "Producer/Consumer Ovh Collection Complete!" - _to = "gelliott@cs.unc.edu" - _from = "gelliott@bonham.cs.unc.edu" - _text = "Producer/Consumer Ovh Collection Complete!" - _body = string.join(("From: %s" % _from, "To: %s" % _to, "Subject: %s" % _subject, "", _text), "\r\n") - s = smtplib.SMTP("localhost") - s.sendmail(_from, [_to], _body) - s.quit() - - + to = "gelliott@cs.unc.edu" + msg = "Producer/Consumer Ovh Collection Complete!" + send_email(to, msg) if __name__ == "__main__": -- cgit v1.2.2