import gen.rv as rv
import os
import sys
import copy
import math
import itertools
import pprint
import schedcat.generator.tasks as tasks
import shutil as sh
import ecrts14.topology as topology
import ecrts14.graph as graph

from Cheetah.Template import Template
from common import get_config_option, num_cpus, recordtype
from config.config import FILES, PARAMS
from gen.dp import DesignPointGenerator
from ecrts14.generator import DesignPointGenerator as PgmDesignPointGenerator
from parse.col_map import ColMapBuilder
from numpy import arange
from schedcat.util.storage import storage
from ecrts14.machines import machines
from ecrts14.ecrts14 import NAMED_NUM_GRAPHS, NAMED_SHAPES, \
    NAMED_HEIGHT_FACTORS, NAMED_FAN, NAMED_EDGE_HOP, NAMED_EDGE_WSS

NAMED_PERIODS = {
    'harmonic'      : rv.uniform_choice([25, 50, 100, 200]),
    'uni-short'     : rv.uniform_int( 3,  33),
    'uni-moderate'  : rv.uniform_int(10, 100),
    'uni-long'      : rv.uniform_int(50, 250),
}

NAMED_UTILIZATIONS = {
    'uni-very-light': rv.uniform(0.0001, 0.001),
    'uni-light'     : rv.uniform(0.001, 0.1),
    'uni-medium'    : rv.uniform(  0.1, 0.4),
    'uni-heavy'     : rv.uniform(  0.5, 0.9),
    'uni-mixed'     : rv.uniform(0.001, 0.4),

    'exp-light'     : rv.exponential(0, 1, 0.10),
    'exp-medium'    : rv.exponential(0, 1, 0.25),
    'exp-heavy'     : rv.exponential(0, 1, 0.50),

    'bimo-light'    : rv.multimodal([(rv.uniform(0.001, 0.5), 8),
                                     (rv.uniform(  0.5, 0.9), 1)]),
    'bimo-medium'   : rv.multimodal([(rv.uniform(0.001, 0.5), 6),
                                     (rv.uniform(  0.5, 0.9), 3)]),
    'bimo-heavy'    : rv.multimodal([(rv.uniform(0.001, 0.5), 4),
                                     (rv.uniform(  0.5, 0.9), 5)]),
}

'''Components of Cheetah template for schedule file'''
TP_RM = """#if $release_master
release_master{0}
#end if"""

GenOptionT = recordtype('GenOption', ['name', 'types', 'default', 'help',
                                      'hidden'])

def GenOption(name, types, default, help, hidden=False):
    return GenOptionT(name, types, default, help, hidden)

class Generator(object):
    '''Creates all combinations of @options specified by @params.

    This class also performs checks of parameter values and prints out help.
    All subclasses must implement _create_exp.
    '''

    def __init__(self, scheduler, templates, options, params):
        self.options = self.__make_options(params) + options

        self.__setup_params(params)
        self.params = params

        self.template = "\n".join([TP_RM] + templates)
        self.scheduler = scheduler

    def __make_options(self, params):
        '''Return generic Litmus options.'''

        # Guess defaults using the properties of this computer
        if 'cpus' in params:
            cpus = min(map(int, params['cpus']))
        else:
            cpus = num_cpus()
        try:
            rm_config = get_config_option("RELEASE_MASTER") and True
        except:
            rm_config = False
        release_master = list(set([False, bool(rm_config)]))

        return [GenOption('tasks', int, [0],
                          'Number of tasks'),
                GenOption('cpus', int, [cpus],
                          'Number of processors on target system.'),
                GenOption('release_master', [True, False], release_master,
                          'Redirect release interrupts to a single CPU.')]

    @staticmethod
    def _dist_option(name, default, distribution, help):
        return GenOption(name, [str, float, type([])] + distribution.keys(),
                         default, help)

    def _create_dist(self, name, value, named_dists):
        '''Attempt to create a distribution representing the data in @value.
        If @value is a string, use it as a key into @named_dists.'''

        # A list of values: pick uniformly at random among them
        if type(value) == type([]):
            map(lambda x: self._check_value(name, [float, int], x), value)
            return rv.uniform_choice(value)
        # A single number: a constant "distribution"
        elif type(value) in [float, int]:
            return lambda: value
        # A string: look up the named distribution
        elif named_dists and value in named_dists:
            return named_dists[value]
        else:
            raise ValueError("Invalid %s value: %s" % (name, value))
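    # Illustration (not part of the original module): the three @value
    # flavors _create_dist accepts. 'gen' stands for any concrete subclass
    # instance; the calls and results follow directly from the code above.
    #
    #   gen._create_dist('utils', 'uni-medium', NAMED_UTILIZATIONS)
    #       # -> NAMED_UTILIZATIONS['uni-medium'], i.e. rv.uniform(0.1, 0.4)
    #   gen._create_dist('periods', [25, 50, 100], NAMED_PERIODS)
    #       # -> rv.uniform_choice([25, 50, 100]); each call picks one value
    #   gen._create_dist('utils', 0.25, NAMED_UTILIZATIONS)
    #       # -> lambda: 0.25, a constant "distribution"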
    def _create_taskset(self, params, periods, utils, max_util=None):
        tg = tasks.TaskGenerator(period=periods, util=utils)
        ts = []
        tries = 0
        while len(ts) != params['tasks'] and tries < 100:
            ts = tg.make_task_set(max_tasks=params['tasks'],
                                  max_util=max_util)
            tries += 1
        if len(ts) != params['tasks']:
            print(("Only created task set of size %d < %d for params %s. " +
                   "This usually means the utilization distribution is too " +
                   "aggressive. Falling back to light utilizations.") %
                  (len(ts), params['tasks'], params))
            return self._create_taskset(params, periods,
                                        NAMED_UTILIZATIONS['uni-light'],
                                        max_util)
        return ts

    def _write_schedule(self, params):
        '''Write schedule file using current template for @params.'''
        sched_file = self.out_dir + "/" + FILES['sched_file']
        with open(sched_file, 'w') as f:
            f.write(str(Template(self.template, searchList=[params])))

    def _write_graph(self, graph, pgm_params):
        graph_file = self.out_dir + ("/graph_%d.dot" % graph.id)
        with open(graph_file, 'w') as f:
            f.write(graph.dot())
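    # Illustration (assumed values, not defaults): drawing a 20-task set
    # with harmonic periods and medium utilizations via _create_taskset.
    # If the requested size cannot be reached within 100 tries, the method
    # retries itself with NAMED_UTILIZATIONS['uni-light'], as shown above.
    #
    #   ts = self._create_taskset({'tasks': 20},
    #                             NAMED_PERIODS['harmonic'],
    #                             NAMED_UTILIZATIONS['uni-medium'],
    #                             max_util=16.0)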
    def _write_pgm_schedule(self, pgm_params):
        '''Write schedule file using current template for @pgm_params.'''
        # Build the pgmrt arguments from the graphs and tasks.
        sched_file = self.out_dir + "/" + FILES['sched_file']

        graph_desc_arg = []
        rates_arg = []
        etoe_arg = []
        exec_arg = []
        discount_arg = []
        cluster_arg = []
        clustersz_arg = []
        wss_arg = []
        wss_cycle_arg = []
        split_arg = []

        for g in pgm_params['graphs']:
            self._write_graph(g, pgm_params)

            cluster_arg_t = []
            graph_desc_arg_t = []
            exec_arg_t = []
            discount_arg_t = []
            rates_arg_t = []
            wss_arg_t = []
            split_arg_t = []

            for n in g.nodes:
                # Task parameters are in microseconds; pgmrt expects
                # milliseconds, so convert as each value is formatted.
                cluster_arg_t.append('node_' + str(n.id) + ':' +
                                     str(n.task.partition))

                cost_str = format(n.task.cost/1000.0,
                                  '.4f').rstrip('0').rstrip('.')
                exec_arg_t.append('node_' + str(n.id) + ':' + cost_str)

                if n.task.cost_discount > 10:
                    discount_str = format(n.task.cost_discount/1000.0,
                                          '.4f').rstrip('0').rstrip('.')
                    discount_arg_t.append('node_' + str(n.id) + ':' +
                                          discount_str)

                if n.task.split != 1:
                    split_arg_t.append('node_' + str(n.id) + ':' +
                                       str(n.task.split))

                if n.isSrc == True:
                    # assume that x=1
                    period_str = format(n.task.period/1000.0,
                                        '.4f').rstrip('0').rstrip('.')
                    rates_arg_t.append('node_' + str(n.id) + ':1:' +
                                       period_str)

                if len(g.nodes) == 1:
                    graph_desc_arg_t.append('node_' + str(n.id))
                for succ in n.succ:
                    graph_desc_arg_t.append('node_' + str(n.id) + ':node_' +
                                            str(succ.id))

                # wss parameter
                for e in n.outEdges:
                    wss_kb_str = format(e.wss, '.4f').rstrip('0').rstrip('.')
                    wss_arg_t.append('node_' + str(n.id) + ':node_' +
                                     str(e.s.id) + ':' + wss_kb_str)

            # combine the per-graph arguments into comma-separated strings
            cluster_arg.append(','.join(cluster_arg_t))
            graph_desc_arg.append(','.join(graph_desc_arg_t))
            exec_arg.append(','.join(exec_arg_t))
            discount_arg.append(','.join(discount_arg_t))
            wss_arg.append(','.join(wss_arg_t))
            split_arg.append(','.join(split_arg_t))
            rates_arg.append(','.join(rates_arg_t))

            clustersz_arg.append(str(pgm_params['cpus'] /
                                     pgm_params['nr_clusters']))

            # Use a wss cycle of 1.5x the graph depth to avoid cache
            # contention among tasks. This mimics how a real data-flow
            # program would work: buffers get passed down the graph
            # and later reused after processing by the last node.
            wss_cycle_arg.append(str(int(math.ceil(g.depth * 1.5))))

            # get the ideal end-to-end response time
            etoe = graph.compute_hrt_ideal_graph_latency(g)
            etoe_arg.append(format(etoe/1000.0,
                                   '.4f').rstrip('0').rstrip('.'))

        pgm_args = []
        for i in range(len(pgm_params['graphs'])):
            pgm_args_t = '--wait --cluster ' + cluster_arg[i] + \
                         ' --clusterSize ' + clustersz_arg[i]
            pgm_args_t += ' --graph ' + graph_desc_arg[i] + \
                          ' --rates ' + rates_arg[i] + \
                          ' --execution ' + exec_arg[i]
            if len(discount_arg[i]) != 0:
                pgm_args_t += ' --discount ' + discount_arg[i]
            if len(split_arg[i]) != 0:
                pgm_args_t += ' --split ' + split_arg[i]
            if len(wss_arg[i]) != 0:
                pgm_args_t += ' --wss ' + wss_arg[i]
            pgm_args_t += ' --wsCycle ' + wss_cycle_arg[i]
            pgm_args_t += ' --etoe ' + etoe_arg[i]
            # last argument must always be duration; the actual duration is
            # given by run_exps.py
            pgm_args_t += ' --duration'
            pgm_args.append(pgm_args_t)

        with open(sched_file, 'w') as f:
            f.write(str(Template(self.template,
                                 searchList=[pgm_params])) + '\n')
            for s in pgm_args:
                f.write(s + '\n')
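    # Example of one generated pgmrt argument line (values illustrative,
    # wrapped here for readability): a two-node graph node_0 -> node_1, both
    # on cluster 0, 24 CPUs in 4 clusters, a 50 ms source period, costs of
    # 2.5 ms and 1.25 ms, a 64 KB edge working set, and graph depth 2:
    #
    #   --wait --cluster node_0:0,node_1:0 --clusterSize 6
    #   --graph node_0:node_1 --rates node_0:1:50
    #   --execution node_0:2.5,node_1:1.25 --wss node_0:node_1:64
    #   --wsCycle 3 --etoe 53.75 --duration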
    def _write_params(self, params):
        '''Write out file with relevant parameters.'''
        # Don't include the system topology in the parameters; it will be
        # added automatically in run_exps.py.
        if 'system' in params:
            del params['system']

        if 'tasks' in params:
            tasks = params.pop('tasks')
        else:
            tasks = 0

        exp_params_file = self.out_dir + "/" + FILES['params_file']
        with open(exp_params_file, 'w') as f:
            params['scheduler'] = self.scheduler
            pprint.pprint(params, f)

        if tasks:
            params['tasks'] = tasks

    def __setup_params(self, params):
        '''Set default parameter values and check that values are valid.'''
        for option in self.options:
            if option.name not in params:
                val = option.default
                val = val if type(val) == type([]) else [val]
                params[option.name] = val
            else:
                option.hidden = True
            params[option.name] = self._check_value(option.name,
                                                    option.types,
                                                    params[option.name])
        return params

    def _check_value(self, name, types, val):
        '''Raise an exception if the value or type of @val is not specified
        in @types. Returns a copy of @val with strings converted to raw
        Python types, if possible.'''
        if types == float:
            types = [float, int]
        if type(types) != type([]):
            types = [types]
        if type(val) != type([]):
            val = [val]

        retval = []
        for v in val:
            # Has to be a better way to find this
            v = False if v in ['f', 'False', 'false', 'n', 'no'] else v
            v = True if v in ['t', 'True', 'true', 'y', 'yes'] else v

            if type(v) not in types and v not in types:
                # Try to convert v to one of the specified types
                parsed = None
                for t in types:
                    try:
                        parsed = t(v)
                        break
                    except:
                        pass

                if parsed is not None:
                    retval += [parsed]
                else:
                    raise TypeError("Invalid %s value: '%s'" % (name, v))
            else:
                retval += [v]

        return retval

    def _create_exp(self, exp_params, out_dir):
        '''Overridden by subclasses.'''
        raise NotImplementedError

    def create_exps(self, out_dir, force, trials):
        '''Create experiments for all possible combinations of params in
        @out_dir. Overwrite existing files if @force is True.'''
        builder = ColMapBuilder()

        # Track changing values so only relevant parameters are included
        # in directory names
        for dp in DesignPointGenerator(self.params):
            for k, v in dp.iteritems():
                builder.try_add(k, v)
        col_map = builder.build()

        for dp in DesignPointGenerator(self.params):
            for trial in xrange(trials):
                # Create directory name from relevant parameters
                dir_leaf = "sched=%s_%s" % (self.scheduler,
                                            col_map.encode(dp))
                dir_leaf = dir_leaf.strip('_') # If there are none
                dir_leaf += ("_trial=%s" % trial) if trials > 1 else ""

                dir_path = "%s/%s" % (out_dir, dir_leaf.strip('_'))

                if os.path.exists(dir_path):
                    if force:
                        sh.rmtree(dir_path)
                    else:
                        print("Skipping existing experiment: '%s'" % dir_path)
                        continue

                os.mkdir(dir_path)

                if trials > 1:
                    dp[PARAMS['trial']] = trial
                self.out_dir = dir_path

                self._create_exp(dict(dp))

                del(self.out_dir)
                if PARAMS['trial'] in dp:
                    del dp[PARAMS['trial']]

    HELP_INDENT = 17
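    # Example directory leaves produced by create_exps (scheduler and
    # parameter values illustrative): only parameters that vary across
    # design points appear in the name, thanks to ColMapBuilder.
    #
    #   out_dir/sched=GSN-EDF_tasks=20_trial=0
    #   out_dir/sched=GSN-EDF_tasks=40_trial=1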
    def create_pgm_exps(self, opts):
        '''Create experiments for all possible combinations of params in
        @opts.out_dir. Overwrite existing files if @opts.force is True.'''
        builder = ColMapBuilder()

        out_dir = opts.out_dir
        force = opts.force
        trials = opts.trials

        # Hardcoded design points
        exp = storage()
        exp.host = ['ludwig']
        cpus = 24.0
        exp.processors = [cpus]
        exp.wcycle = [0]
        exp.walk = ['seq']
        exp.huge_pages = [False]
        exp.uncached = [False]
        exp.sched = ['edf']
        exp.update(self.params)

        # Extract the parameters we want to test the same task set under
        polluter_method = exp['polluters']
        split_method = exp['job_splitting']
        del exp['polluters']
        del exp['job_splitting']

        # Track changing values so only relevant parameters are included
        # in directory names
        for dp in PgmDesignPointGenerator(exp):
            for k, v in dp.iteritems():
                builder.try_add(k, v)
        col_map = builder.build()

        # Extract the parameters we want to test the same task set under
        partition_method = exp['partitions']
        cluster_method = exp['clustering']
        del exp['partitions']
        del exp['clustering']

        shared_params = []
        for part, clust, pol, splt in itertools.product(partition_method,
                                                        cluster_method,
                                                        polluter_method,
                                                        split_method):
            if clust == 'ALL' and part != 'no_cache':
                # skip over partition methods when there is no
                # clustering/partitioning
                continue
            p = storage()
            p.partitioning = part
            p.clustering = clust
            # convert from string to bool
            p.polluting = True if pol == 'True' else False
            p.splitting = True if splt == 'True' else False
            shared_params.append(p)

        for _dp in PgmDesignPointGenerator(exp):
            # TODO: Find out why fan_in_cap is set to a string. >:(
            # Force it to be an int.
            for i, c in enumerate(_dp.fan_in_cap):
                _dp.fan_in_cap = int(c)

            for trial in xrange(trials):
                dp = copy.deepcopy(_dp)
                dp.num_graphs = NAMED_NUM_GRAPHS[dp.num_graphs]
                dp.depth_factor = NAMED_HEIGHT_FACTORS[dp.depth_factor]
                dp.node_placement = NAMED_SHAPES[dp.node_placement]
                dp.fan_out = NAMED_FAN[dp.fan_out]
                dp.edge_distance = NAMED_EDGE_HOP[dp.edge_distance]
                dp.nr_source = graph.uniform(opts.nr_source, opts.nr_source)
                dp.nr_sink = graph.uniform(opts.nr_sink, opts.nr_sink)
                dp.wss = NAMED_EDGE_WSS[dp.wss]

                last_failed = ''
                tries = 0
                success = False
                max_tries = 100
                while tries < max_tries and not success:
                    created_dirs = []
                    tries += 1
                    if tries > 1:
                        print('Retrying...')

                    # Generate a task set
                    ts, graphs, subts = self._create_tasks(dp)
                    dp.tasks = len(ts)

                    try:
                        for shp in shared_params:
                            dp['level'] = shp.clustering
                            _dp['level'] = shp.clustering

                            # load in the shared parameters
                            dp.partitions = shp.partitioning
                            dp.cluster = shp.clustering
                            dp.polluters = shp.polluting
                            dp.job_splitting = shp.splitting

                            # Create directory name from relevant parameters.
                            # Temporarily swap in the text-based wss name.
                            temp = dp.wss
                            dp.wss = _dp.wss
                            dir_parts = []
                            dir_parts.append("sched=%s" % self.scheduler)
                            dir_parts.append("cluster=%s" % shp.clustering)
                            dir_parts.append("polluterovh=%s" % shp.polluting)
                            dir_parts.append("splitting=%s" % shp.splitting)
                            others = col_map.encode(dp)
                            if others != "":
                                dir_parts.append(others)
                            if trials > 1:
                                dir_parts.append("trial=%d" % trial)
                            dir_leaf = "_".join(dir_parts)
                            dir_path = "%s/%s" % (out_dir, dir_leaf)
                            print("Generating %s" % dir_leaf)
                            dp.wss = temp

                            if os.path.exists(dir_path):
                                if force:
                                    sh.rmtree(dir_path)
                                else:
                                    print("Skipping existing experiment: '%s'"
                                          % dir_path)
                                    continue

                            os.mkdir(dir_path)
                            created_dirs.append(dir_path)

                            if trials > 1:
                                dp[PARAMS['trial']] = trial
                                _dp[PARAMS['trial']] = trial
                            self.out_dir = dir_path

                            _dp.system = topology.Topology(machines[dp.host])
                            _dp.partitions = dp.partitions
                            _dp.polluters = dp.polluters
                            _dp.job_splitting = dp.job_splitting

                            # Write a sched.py and param.py for each
                            # partition method
                            ret = self._create_exp(_dp, ts, graphs, subts)
                            if not ret:
                                print("  Generated unschedulable ts for " +
                                      dir_leaf)
                                last_failed = dir_leaf
                                raise Exception("Unschedulable.")

                            del(self.out_dir)
                            if PARAMS['trial'] in dp:
                                del dp[PARAMS['trial']]
                                del _dp[PARAMS['trial']]

                        success = True
                    except Exception, e:
                        print e
                        for d in created_dirs:
                            sh.rmtree(d)

                if not success:
                    print("Failed to generate experiment (%s). Try count = %d"
                          % (last_failed, tries))
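    # Illustration of the shared-parameter expansion above (values assumed,
    # not defaults): with partitions=['no_cache', 'cache_part'],
    # clustering=['L2', 'ALL'], polluters=['True', 'False'], and
    # job_splitting=['True'], the product yields eight tuples; the two with
    # partitioning other than 'no_cache' under clustering 'ALL' are skipped,
    # so each generated task set is evaluated under the six surviving
    # (partitioning, clustering, polluting, splitting) combinations.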
    def print_help(self):
        display_options = [o for o in self.options if not o.hidden]
        s = str(Template("""scheduler $scheduler:
#for $o in $options
  $o.name -- $o.help
\tDefault: $o.default
\tAllowed: $o.types
#end for""", searchList={'scheduler': self.scheduler,
                         'options': display_options}))

        # Has to be an easier way to print this out...
        for line in s.split("\n"):
            res = []
            i = 0
            for word in line.split(", "):
                i += len(word)
                res += [word]
                if i > 80 and len(word) < 80:
                    print(", ".join(res[:-1]))
                    res = [" " * Generator.HELP_INDENT + res[-1]]
                    i = Generator.HELP_INDENT + len(word)
            print(", ".join(res))


generators = {}

def register_generator(name, clazz):
    generators[name] = clazz

def get_generators():
    return generators
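# A minimal subclass sketch (illustrative only; real generators live in
# their own modules and register themselves here). 'DemoGenerator' and its
# template string are assumptions, not part of this module:
#
#   TP_DEMO = "-p $cpu $period $cost"
#
#   class DemoGenerator(Generator):
#       def __init__(self, params={}):
#           super(DemoGenerator, self).__init__('DEMO', [TP_DEMO],
#                                               [], params)
#
#       def _create_exp(self, exp_params):
#           ts = self._create_taskset(exp_params,
#                                     NAMED_PERIODS['harmonic'],
#                                     NAMED_UTILIZATIONS['uni-medium'])
#           exp_params['task_set'] = ts
#           self._write_schedule(exp_params)
#           self._write_params(exp_params)
#
#   register_generator('DEMO', DemoGenerator)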