/**
 * Initialize components in the simulation.
 * @throws InterruptedException
 * @throws IOException if trace or topology files cannot be opened
 */
@SuppressWarnings("deprecation")
void init() throws InterruptedException, IOException {
  long now = System.currentTimeMillis();

  // Configure the simulated JobTracker: local filesystem, a dummy tracker
  // address, and history/expiry settings suitable for simulation.
  JobConf jobConf = new JobConf(getConf());
  jobConf.setClass("topology.node.switch.mapping.impl",
      StaticMapping.class, DNSToSwitchMapping.class);
  jobConf.set("fs.default.name", "file:///");
  jobConf.set("mapred.job.tracker", "localhost:8012");
  jobConf.setInt("mapred.jobtracker.job.history.block.size", 512);
  jobConf.setInt("mapred.jobtracker.job.history.buffer.size", 512);
  jobConf.setLong("mapred.tasktracker.expiry.interval", 5000);
  jobConf.setInt("mapred.reduce.copy.backoff", 4);
  jobConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
  jobConf.setUser("mumak");
  jobConf.set("mapred.system.dir",
      jobConf.get("hadoop.log.dir", "/tmp/hadoop-" + jobConf.getUser())
          + "/mapred/system");
  jobConf.set("mapred.jobtracker.taskScheduler",
      JobQueueTaskScheduler.class.getName());

  // Put the simulated system dir and job history under hadoop.log.dir.
  FileSystem lfs = FileSystem.getLocal(getConf());
  Path logPath =
      new Path(System.getProperty("hadoop.log.dir")).makeQualified(lfs);
  jobConf.set("mapred.system.dir", logPath.toString());
  jobConf.set("hadoop.job.history.location",
      new Path(logPath, "history").toString());

  jt = SimulatorJobTracker.startTracker(jobConf, now, this);
  jt.offerService();

  // max Map/Reduce tasks per node
  int maxMaps = getConf().getInt("mapred.tasktracker.map.tasks.maximum",
      DEFAULT_MAP_SLOTS_PER_NODE);
  int maxReduces = getConf().getInt(
      "mapred.tasktracker.reduce.tasks.maximum",
      DEFAULT_REDUCE_SLOTS_PER_NODE);

  MachineNode defaultNode = new MachineNode.Builder("default", 2)
      .setMapSlots(maxMaps).setReduceSlots(maxReduces).build();
  ZombieCluster cluster = new ZombieCluster(new Path(topologyFile),
      defaultNode, jobConf);

  long firstJobStartTime = now + 60000;
  JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
      new Path(traceFile), cluster, firstJobStartTime, jobConf);

  jc = new SimulatorJobClient(jt, jobStoryProducer);
  queue.addAll(jc.init(firstJobStartTime));

  // create TTs based on topology.json
  startTaskTrackers(cluster, now);

  terminateTime = getConf().getLong("mumak.terminate.time", Long.MAX_VALUE);
  if (terminateTime <= 0) {
    throw new IllegalArgumentException("Terminate time must be positive: "
        + terminateTime);
  }
}
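// For orientation: a minimal, hypothetical sketch of how a driver loop
// could consume the event queue populated by jc.init() above and honor
// terminateTime. This is an illustrative assumption about the surrounding
// engine, not its actual run() implementation; the SimulatorEvent and
// SimulatorEventListener usage here is schematic.
//
//   void run() throws IOException, InterruptedException {
//     init();
//     for (SimulatorEvent next = queue.get();
//          next != null && next.getTimeStamp() < terminateTime;
//          next = queue.get()) {
//       // Advance the virtual clock to the event's timestamp and dispatch
//       // the event; any response events go back into the queue.
//       queue.addAll(next.getListener().accept(next));
//     }
//   }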
/**
 * Initialize components in the simulation. The JobConf is
 * created separately and passed in to this init().
 * @param jobConf the configuration for the jobtracker
 * @throws InterruptedException
 * @throws IOException if trace or topology files cannot be opened
 */
@SuppressWarnings("deprecation")
void init(JobConf jobConf) throws InterruptedException, IOException {
  // Put the simulated system dir and job history under hadoop.log.dir.
  FileSystem lfs = FileSystem.getLocal(getConf());
  Path logPath =
      new Path(System.getProperty("hadoop.log.dir")).makeQualified(lfs);
  jobConf.set("mapred.system.dir", logPath.toString());
  jobConf.set("hadoop.job.history.location",
      new Path(logPath, "history").toString());

  // Start time for the virtual clock. Possible improvement: derive a more
  // meaningful default from the first job in the trace.
  long now = getTimeProperty(jobConf, "mumak.start.time",
      System.currentTimeMillis());

  jt = SimulatorJobTracker.startTracker(jobConf, now, this);
  jt.offerService();

  masterRandomSeed = jobConf.getLong("mumak.random.seed", System.nanoTime());

  // max Map/Reduce tasks per node
  int maxMaps = getConf().getInt("mapred.tasktracker.map.tasks.maximum",
      SimulatorTaskTracker.DEFAULT_MAP_SLOTS);
  int maxReduces = getConf().getInt(
      "mapred.tasktracker.reduce.tasks.maximum",
      SimulatorTaskTracker.DEFAULT_REDUCE_SLOTS);

  MachineNode defaultNode = new MachineNode.Builder("default", 2)
      .setMapSlots(maxMaps).setReduceSlots(maxReduces).build();

  LoggedNetworkTopology topology = new ClusterTopologyReader(new Path(
      topologyFile), jobConf).get();
  // Set the static mapping before removing numeric IP hosts.
  setStaticMapping(topology);
  if (getConf().getBoolean("mumak.topology.filter-numeric-ips", true)) {
    removeIpHosts(topology);
  }
  ZombieCluster cluster = new ZombieCluster(topology, defaultNode);

  // create TTs based on topology.json
  long firstJobStartTime = startTaskTrackers(cluster, jobConf, now);

  long subRandomSeed = RandomSeedGenerator.getSeed(
      "forSimulatorJobStoryProducer", masterRandomSeed);
  JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
      new Path(traceFile), cluster, firstJobStartTime, jobConf,
      subRandomSeed);

  final SimulatorJobSubmissionPolicy submissionPolicy =
      SimulatorJobSubmissionPolicy.getPolicy(jobConf);

  jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
  queue.addAll(jc.init(firstJobStartTime));

  // If the task scheduler is the CapacityTaskScheduler, start its
  // JobInitialization threads as well.
  if (jobConf.get("mapred.jobtracker.taskScheduler").equals(
      CapacityTaskScheduler.class.getName())) {
    LOG.info("CapacityScheduler used: starting simulatorThreads");
    startSimulatorThreadsCapSched(now);
  }

  terminateTime = getTimeProperty(jobConf, "mumak.terminate.time",
      Long.MAX_VALUE);
}
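// getTimeProperty is defined elsewhere in this class. A plausible sketch,
// assuming it reads a long-valued property and rejects non-positive values
// (mirroring the explicit terminateTime check in the no-arg init() above);
// sketched in a comment to avoid redefining the real helper:
//
//   long getTimeProperty(Configuration conf, String propertyName,
//                        long defaultValue) {
//     long time = conf.getLong(propertyName, defaultValue);
//     if (time <= 0) {
//       throw new IllegalArgumentException(propertyName
//           + " must be positive: " + time);
//     }
//     return time;
//   }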