/**
 * Start simulated task trackers based on topology.
 * @param clusterStory The cluster topology.
 * @param now
 *        time stamp when the simulator is started, {@link SimulatorTaskTracker}s
 *        are started shortly after this time stamp
 */
void startTaskTrackers(ClusterStory clusterStory, long now) {
  /** port assigned to TTs, incremented by 1 for each TT */
  int port = 10000;
  long ms = now + 100;
  for (MachineNode node : clusterStory.getMachines()) {
    String hostname = node.getName();
    RackNode rackNode = node.getRackNode();
    StaticMapping.addNodeToRack(hostname, rackNode.getName());
    String taskTrackerName = "tracker_" + hostname + ":localhost/127.0.0.1:"
        + port;
    port++;
    SimulatorTaskTracker tt = new SimulatorTaskTracker(jt, taskTrackerName,
        hostname, node.getMapSlots(), node.getReduceSlots());
    queue.addAll(tt.init(ms++));
  }
}
/**
 * Start simulated task trackers based on topology.
 * @param cluster the cluster topology.
 * @param jobConf configuration object.
 * @param now
 *        time stamp when the simulator is started, {@link SimulatorTaskTracker}s
 *        are started uniformly randomly spread in [now,now+startDuration).
 * @return time stamp by which the entire cluster is booted up and all task
 *         trackers are sending heartbeats at their steady rate.
 */
long startTaskTrackers(ClusterStory cluster, JobConf jobConf, long now) {
  /** port assigned to TTs, incremented by 1 for each TT */
  int port = 10000;
  int numTaskTrackers = 0;
  Random random = new Random(RandomSeedGenerator.getSeed(
      "forStartTaskTrackers()", masterRandomSeed));
  final int startDuration = jobConf.getInt("mumak.cluster.startup.duration",
      DEFAULT_CLUSTER_STARTUP_DURATION);

  for (MachineNode node : cluster.getMachines()) {
    jobConf.set("mumak.tasktracker.host.name", node.getName());
    jobConf.set("mumak.tasktracker.tracker.name",
        "tracker_" + node.getName() + ":localhost/127.0.0.1:" + port);
    long subRandomSeed = RandomSeedGenerator.getSeed(
        "forTaskTracker" + numTaskTrackers, masterRandomSeed);
    jobConf.setLong("mumak.tasktracker.random.seed", subRandomSeed);
    numTaskTrackers++;
    port++;
    SimulatorTaskTracker tt = new SimulatorTaskTracker(jt, jobConf);
    long firstHeartbeat = now + random.nextInt(startDuration);
    queue.addAll(tt.init(firstHeartbeat));
  }

  // Within startDuration plus one cluster-wide heartbeat interval, each TT
  // has started up and has been told on its 2nd heartbeat to beat at a rate
  // corresponding to the steady state of the cluster.
  long clusterSteady = now + startDuration + jt.getNextHeartbeatInterval();
  return clusterSteady;
}
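// Illustrative sketch (not part of the original class): how a caller might
// shorten the simulated cluster ramp-up before invoking startTaskTrackers()
// above. The 10-second value and the local variable names are assumptions for
// illustration only; the "mumak.cluster.startup.duration" key and the method
// signature are taken from the code above.
//
//   JobConf conf = new JobConf();
//   conf.setInt("mumak.cluster.startup.duration", 10 * 1000); // 10s ramp-up
//   long now = System.currentTimeMillis();
//   long clusterSteady = startTaskTrackers(cluster, conf, now);
//   // jobs can be injected at or after clusterSteady, once all TTs are
//   // heartbeating at their steady rate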
/**
 * Initiate components in the simulation. The JobConf is
 * created separately and passed to init().
 * @param jobConf the configuration for the JobTracker.
 * @throws InterruptedException
 * @throws IOException if trace or topology files cannot be opened.
 */
@SuppressWarnings("deprecation")
void init(JobConf jobConf) throws InterruptedException, IOException {
  FileSystem lfs = FileSystem.getLocal(getConf());
  Path logPath =
      new Path(System.getProperty("hadoop.log.dir")).makeQualified(lfs);
  jobConf.set("mapred.system.dir", logPath.toString());
  jobConf.set("hadoop.job.history.location",
      (new Path(logPath, "history").toString()));

  // start time for virtual clock
  // possible improvement: set default value to sth more meaningful based on
  // the 1st job
  long now = getTimeProperty(jobConf, "mumak.start.time",
      System.currentTimeMillis());

  jt = SimulatorJobTracker.startTracker(jobConf, now, this);
  jt.offerService();

  masterRandomSeed = jobConf.getLong("mumak.random.seed", System.nanoTime());

  // max Map/Reduce tasks per node
  int maxMaps = getConf().getInt("mapred.tasktracker.map.tasks.maximum",
      SimulatorTaskTracker.DEFAULT_MAP_SLOTS);
  int maxReduces = getConf().getInt("mapred.tasktracker.reduce.tasks.maximum",
      SimulatorTaskTracker.DEFAULT_REDUCE_SLOTS);

  MachineNode defaultNode = new MachineNode.Builder("default", 2)
      .setMapSlots(maxMaps).setReduceSlots(maxReduces).build();

  LoggedNetworkTopology topology = new ClusterTopologyReader(new Path(
      topologyFile), jobConf).get();
  // Setting the static mapping before removing numeric IP hosts.
  setStaticMapping(topology);
  if (getConf().getBoolean("mumak.topology.filter-numeric-ips", true)) {
    removeIpHosts(topology);
  }
  ZombieCluster cluster = new ZombieCluster(topology, defaultNode);

  // create TTs based on topology.json
  long firstJobStartTime = startTaskTrackers(cluster, jobConf, now);

  long subRandomSeed = RandomSeedGenerator.getSeed(
      "forSimulatorJobStoryProducer", masterRandomSeed);
  JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
      new Path(traceFile), cluster, firstJobStartTime, jobConf, subRandomSeed);

  final SimulatorJobSubmissionPolicy submissionPolicy =
      SimulatorJobSubmissionPolicy.getPolicy(jobConf);

  jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
  queue.addAll(jc.init(firstJobStartTime));

  // if the taskScheduler is CapacityTaskScheduler start off the
  // JobInitialization threads too
  if (jobConf.get("mapred.jobtracker.taskScheduler").equals(
      CapacityTaskScheduler.class.getName())) {
    LOG.info("CapacityScheduler used: starting simulatorThreads");
    startSimulatorThreadsCapSched(now);
  }

  terminateTime = getTimeProperty(jobConf, "mumak.terminate.time",
      Long.MAX_VALUE);
}
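// Illustrative sketch (not part of the original class): one way a driver could
// set up and boot the simulator through init() above. The SimulatorEngine
// name, the new JobConf construction and the trailing comment about draining
// the event queue are assumptions for illustration; only init(JobConf) and the
// "mumak.start.time" / "mumak.terminate.time" properties come from the code
// above.
//
//   SimulatorEngine engine = new SimulatorEngine();
//   JobConf jobConf = new JobConf(engine.getConf());
//   jobConf.setLong("mumak.start.time", System.currentTimeMillis());
//   engine.init(jobConf);  // boots the JobTracker, TTs, and the job client
//   // ... the engine then drains its event queue until terminateTime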