FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, boolean isMap, FakeJobInProgress job) { super(jId, "", JobSplit.EMPTY_TASK_SPLIT, job.jobtracker, jobConf, job, 0, 1); this.isMap = isMap; this.fakeJob = job; activeTasks = new TreeMap<TaskAttemptID, String>(); activeTasks.put(t.getTaskID(), "tt"); // create a fake status for a task that is running for a bit this.taskStatus = TaskStatus.createTaskStatus(isMap); taskStatus.setProgress(0.5f); taskStatus.setRunState(TaskStatus.State.RUNNING); if (jobConf.getMapSpeculativeExecution()) { //resetting of the hasSpeculativeMap is done //when speculative map is scheduled by the job. hasSpeculativeMap = true; } if (jobConf.getReduceSpeculativeExecution()) { //resetting of the hasSpeculativeReduce is done //when speculative reduce is scheduled by the job. hasSpeculativeReduce = true; } }
/**
 * Launches the first map TIP that is neither running nor complete and whose
 * locality level relative to {@code tts} is strictly below
 * {@code localityLevel}. The launched map is counted as running and added to
 * the non-local running maps.
 *
 * @param tts tracker asking for work
 * @param clusterSize not consulted
 * @param numUniqueHosts not consulted
 * @param localityLevel exclusive upper bound on an acceptable locality level
 * @return the launched task, or {@code null} when no TIP qualifies
 * @throws IOException if starting the task fails
 */
@Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
    int numUniqueHosts, int localityLevel) throws IOException {
  for (int idx = 0; idx < maps.length; idx++) {
    FakeTaskInProgress candidate = (FakeTaskInProgress) maps[idx];
    boolean schedulable = !candidate.isRunning()
        && !candidate.isComplete()
        && getLocalityLevel(candidate, tts) < localityLevel;
    if (!schedulable) {
      continue;
    }

    TaskAttemptID attemptId = getTaskAttemptID(candidate);
    Task launched = new MapTask("", attemptId, 0,
        JobSplit.EMPTY_TASK_SPLIT.getSplitIndex(), 1) {
      @Override
      public String toString() {
        return String.format("%s on %s", getTaskID(), tts.getTrackerName());
      }
    };

    runningMapTasks++;
    candidate.createTaskAttempt(launched, tts.getTrackerName());
    nonLocalRunningMaps.add(candidate);
    taskTrackerManager.startTask(tts.getTrackerName(), launched, candidate);
    return launched;
  }
  return null;
}
/**
 * Serializes old-API ({@code org.apache.hadoop.mapred}) input splits to
 * {@code out} and returns one meta-info record per split.
 *
 * <p>Each split is written as its class name followed by the output of its
 * own {@code write} method. The returned record holds the split's starting
 * offset in {@code out}, its host locations (truncated to at most
 * {@code MAX_SPLIT_LOCATIONS} entries, default 10) and its length.
 *
 * @param splits splits to write; may be empty
 * @param out stream positioned where the first split is to be written
 * @param conf configuration consulted for {@code MAX_SPLIT_LOCATIONS}
 * @return meta info for each split, in input order
 * @throws IOException if writing to {@code out} or querying a split fails
 */
private static SplitMetaInfo[] writeOldSplits(
    org.apache.hadoop.mapred.InputSplit[] splits,
    FSDataOutputStream out, Configuration conf) throws IOException {
  SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
  if (splits.length != 0) {
    // The cap does not depend on the split, so read it once instead of per split.
    final int maxLoc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
    int i = 0;
    long offset = out.getPos();
    for (org.apache.hadoop.mapred.InputSplit split : splits) {
      long prevLen = out.getPos();
      Text.writeString(out, split.getClass().getName());
      split.write(out);
      long currLen = out.getPos();
      String[] locations = split.getLocations();
      if (locations.length > maxLoc) {
        LOG.warn("Max block location exceeded for split: "
            + split + " splitsize: " + locations.length
            + " maxsize: " + maxLoc);
        locations = Arrays.copyOf(locations, maxLoc);
      }
      info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
      // Advance by exactly the number of bytes this split occupied.
      offset += currLen - prevLen;
    }
  }
  return info;
}
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, FsPermission p, int splitMetaInfoVersion, JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException { // write the splits meta-info to a file for the job tracker FSDataOutputStream out = null; try { out = FileSystem.create(fs, filename, p); out.write(META_SPLIT_FILE_HEADER); WritableUtils.writeVInt(out, splitMetaInfoVersion); WritableUtils.writeVInt(out, allSplitMetaInfo.length); for(JobSplit.SplitMetaInfo splitMetaInfo: allSplitMetaInfo) { splitMetaInfo.write(out); } } finally { IOUtils.closeStream(out); } }
/**
 * Launches the first map TIP that is neither running nor complete and whose
 * locality level relative to {@code tts} is strictly below
 * {@code localityLevel}. The launched map is counted as running and added to
 * the non-local running maps.
 *
 * @param tts tracker asking for work
 * @param clusterSize not consulted
 * @param numUniqueHosts not consulted
 * @param localityLevel exclusive upper bound on an acceptable locality level
 * @return the launched task, or {@code null} when no TIP qualifies
 * @throws IOException if starting the task fails
 */
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
    int numUniqueHosts, int localityLevel) throws IOException {
  for (int idx = 0; idx < maps.length; idx++) {
    FakeTaskInProgress candidate = (FakeTaskInProgress) maps[idx];
    boolean schedulable = !candidate.isRunning()
        && !candidate.isComplete()
        && getLocalityLevel(candidate, tts) < localityLevel;
    if (!schedulable) {
      continue;
    }

    TaskAttemptID attemptId = getTaskAttemptID(candidate);
    Task launched = new MapTask("", attemptId, 0,
        JobSplit.EMPTY_TASK_SPLIT.getSplitIndex(), 1) {
      @Override
      public String toString() {
        return String.format("%s on %s", getTaskID(), tts.getTrackerName());
      }
    };

    runningMapTasks++;
    candidate.createTaskAttempt(launched, tts.getTrackerName());
    nonLocalRunningMaps.add(candidate);
    taskTrackerManager.startTask(tts.getTrackerName(), launched, candidate);
    return launched;
  }
  return null;
}
/**
 * Launches the first map TIP that is neither running nor complete and whose
 * locality level relative to {@code tts} is strictly below
 * {@code localityLevel}. The launched map is counted as running and added to
 * the non-local running maps.
 *
 * @param tts tracker asking for work
 * @param clusterSize not consulted
 * @param numUniqueHosts not consulted
 * @param localityLevel exclusive upper bound on an acceptable locality level
 * @return the launched task, or {@code null} when no TIP qualifies
 * @throws IOException if starting the task fails
 */
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
    int numUniqueHosts, int localityLevel) throws IOException {
  for (int idx = 0; idx < maps.length; idx++) {
    HFSPFakeTaskInProgress candidate = (HFSPFakeTaskInProgress) maps[idx];
    boolean schedulable = !candidate.isRunning()
        && !candidate.isComplete()
        && getLocalityLevel(candidate, tts) < localityLevel;
    if (!schedulable) {
      continue;
    }

    TaskAttemptID attemptId = getTaskAttemptID(candidate);
    Task launched = new MapTask("", attemptId, 0,
        JobSplit.EMPTY_TASK_SPLIT.getSplitIndex(), 1) {
      @Override
      public String toString() {
        return String.format("%s on %s", getTaskID(), tts.getTrackerName());
      }
    };

    runningMapTasks++;
    candidate.createTaskAttempt(launched, tts.getTrackerName());
    nonLocalRunningMaps.add(candidate);
    taskTrackerManager.startTask(tts.getTrackerName(), launched, candidate);
    return launched;
  }
  return null;
}
FakeTaskInProgress( JobID jId, JobConf jobConf, Task t, boolean isMap, FakeJobInProgress job, JobSplit.TaskSplitMetaInfo split) { super(jId, "", split, null, jobConf, job, 0, 1); this.isMap = isMap; this.fakeJob = job; activeTasks = new TreeMap<TaskAttemptID, String>(); activeTasks.put(t.getTaskID(), "tt"); // create a fake status for a task that is running for a bit this.taskStatus = TaskStatus.createTaskStatus(isMap); taskStatus.setProgress(0.5f); taskStatus.setRunState(TaskStatus.State.RUNNING); if (jobConf.getMapSpeculativeExecution()) { //resetting of the hasSpeculativeMap is done //when speculative map is scheduled by the job. hasSpeculativeMap = true; } if (jobConf.getReduceSpeculativeExecution()) { //resetting of the hasSpeculativeReduce is done //when speculative reduce is scheduled by the job. hasSpeculativeReduce = true; } }
/**
 * Asserts that {@code actual} equals the list of location hints rebuilt from
 * the split meta-info stored under {@code inputSplitsDir}. The expected list
 * has one hint per split, in file order, built from that split's locations
 * with {@code null} as the second argument (presumably racks).
 *
 * @param inputSplitsDir directory holding the split meta-info
 * @param actual hints produced by the code under test
 * @throws Exception if the meta-info cannot be read
 */
private void verifyLocationHints(Path inputSplitsDir,
    List<TaskLocationHint> actual) throws Exception {
  JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(
      new JobID("dummy", 1), remoteFs, conf, inputSplitsDir);

  List<TaskLocationHint> expected =
      new ArrayList<TaskLocationHint>(metaInfos.length);
  for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
    HashSet<String> hosts = new HashSet<String>(
        Arrays.asList(metaInfo.getLocations()));
    expected.add(TaskLocationHint.createTaskLocationHint(hosts, null));
  }

  Assert.assertEquals(expected, actual);
}
/**
 * Debugging entry point. Reads a serialized task split index from the file
 * named by {@code args[0]}, loads the split it points to, and prints the
 * split's class name, its {@code toString()} and a reflective dump of its
 * fields.
 *
 * @param args {@code args[0]} is the path of the task split index file
 * @throws IOException if the index file or the referenced split cannot be read
 * @throws IllegalArgumentException if no path argument is supplied
 */
public static void main(String... args) throws IOException {
  if (args.length < 1) {
    throw new IllegalArgumentException(
        "Expected the path of a task split index file as the first argument");
  }
  String taskSplitFile = args[0];
  Configuration conf = new Configuration();
  JobSplit.TaskSplitIndex taskSplitIndex = new JobSplit.TaskSplitIndex();
  DataInputStream is = new DataInputStream(new FileInputStream(taskSplitFile));
  try {
    taskSplitIndex.readFields(is);
  } finally {
    // Release the file handle even when the index is truncated or malformed.
    is.close();
  }
  Object split = getSplitDetails(conf,
      new Path(taskSplitIndex.getSplitLocation()),
      taskSplitIndex.getStartOffset());
  System.out.println("InputSplit instance class = " + split.getClass().getName());
  System.out.println("ToString on split = " + split);
  System.out.println("Reflection fields = " + ToStringBuilder
      .reflectionToString(split, ToStringStyle.SHORT_PREFIX_STYLE));
}
public void testResourceEstimator() throws Exception { final int maps = 100; final int reduces = 2; final int singleMapOutputSize = 1000; JobConf jc = new JobConf(); JobID jid = new JobID("testJT", 0); jc.setNumMapTasks(maps); jc.setNumReduceTasks(reduces); JobInProgress jip = new JobInProgress(jid, jc, UtilsForTests.getJobTracker()); //unfortunately, we can't set job input size from here. ResourceEstimator re = new ResourceEstimator(jip); for(int i = 0; i < maps; ++i) { if (i < maps / 10) { // re.thresholdToUse is maps / 10 long estOutSize = re.getEstimatedMapOutputSize(); System.out.println(estOutSize); assertEquals(0, estOutSize); } TaskStatus ts = new MapTaskStatus(); ts.setOutputSize(singleMapOutputSize); JobSplit.TaskSplitMetaInfo split = new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0); TaskInProgress tip = new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1); re.updateWithCompletedTask(ts, tip); } assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize()); assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize()); }
/**
 * Creates a new map task with an empty split index for the requesting
 * tracker, reports it to the task-tracker manager and increments the
 * running-map count.
 *
 * @param tts tracker asking for work
 * @param clusterSize not consulted
 * @param ignored not consulted
 * @return the newly created map task; never {@code null}
 * @throws IOException declared by the overridden method
 */
@Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
    int ignored) throws IOException {
  TaskAttemptID attemptId = getTaskAttemptID(true);
  JobSplit.TaskSplitIndex noSplit = new JobSplit.TaskSplitIndex();
  Task created = new MapTask("", attemptId, 0, noSplit, 1) {
    @Override
    public String toString() {
      return String.format("%s on %s", getTaskID(), tts.getTrackerName());
    }
  };
  taskTrackerManager.update(tts.getTrackerName(), created);
  runningMapTasks++;
  return created;
}
private TaskInProgress createAndAddTIP(JobTracker jobtracker, JobInProgress jip, TaskType type) { JobConf conf = jip.getJobConf(); JobID id = jip.getJobID(); // now create a fake tip for this fake job TaskInProgress tip = null; if (type == TaskType.MAP) { tip = new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, jobtracker, conf, jip, 0, 1); jip.maps = new TaskInProgress[] {tip}; } else if (type == TaskType.REDUCE) { tip = new TaskInProgress(id, "dummy", jip.desiredMaps(), 0, jobtracker, conf, jip, 1); jip.reduces = new TaskInProgress[] {tip}; } else if (type == TaskType.JOB_SETUP) { tip = new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, jobtracker, conf, jip, 0, 1); jip.setup = new TaskInProgress[] {tip}; } else if (type == TaskType.JOB_CLEANUP) { tip = new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, jobtracker, conf, jip, 0, 1); jip.cleanup = new TaskInProgress[] {tip}; } return tip; }
FakeTaskInProgress(JobID jId, int id, JobConf jobConf, FakeJobInProgress job, String[] inputLocations, JobSplit.TaskSplitMetaInfo split) { super(jId, "", split, job.jobtracker, jobConf, job, id, 1); this.isMap = true; this.fakeJob = job; this.inputLocations = inputLocations; activeTasks = new TreeMap<TaskAttemptID, String>(); taskStatus = TaskStatus.createTaskStatus(isMap); taskStatus.setRunState(TaskStatus.State.UNASSIGNED); }
/**
 * Serializes new-API ({@code org.apache.hadoop.mapreduce}) input splits to
 * {@code out} with the serializer that {@link SerializationFactory} provides
 * for each split's class, and returns one meta-info record per split.
 *
 * <p>Each split is written as its class name followed by its serialized form.
 * The returned record holds the split's starting offset in {@code out}, its
 * host locations (truncated to at most {@code MAX_SPLIT_LOCATIONS} entries,
 * default 10) and its length.
 *
 * @param conf configuration for the serialization factory and the location cap
 * @param array splits to write; may be empty
 * @param out stream positioned where the first split is to be written
 * @return meta info for each split, in input order
 * @throws IOException if writing to {@code out} or querying a split fails
 * @throws InterruptedException if querying a split is interrupted
 */
@SuppressWarnings("unchecked")
private static <T extends InputSplit>
SplitMetaInfo[] writeNewSplits(Configuration conf,
    T[] array, FSDataOutputStream out)
    throws IOException, InterruptedException {
  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length != 0) {
    SerializationFactory factory = new SerializationFactory(conf);
    // The cap does not depend on the split, so read it once instead of per split.
    final int maxLoc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
    int i = 0;
    long offset = out.getPos();
    for (T split : array) {
      long prevCount = out.getPos();
      Text.writeString(out, split.getClass().getName());
      // Unchecked: the runtime class of a T element is a Class<? extends T>.
      Serializer<T> serializer =
          factory.getSerializer((Class<T>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);
      long currCount = out.getPos();
      String[] locations = split.getLocations();
      if (locations.length > maxLoc) {
        LOG.warn("Max block location exceeded for split: "
            + split + " splitsize: " + locations.length
            + " maxsize: " + maxLoc);
        locations = Arrays.copyOf(locations, maxLoc);
      }
      info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
      // Advance by exactly the number of bytes this split occupied.
      offset += currCount - prevCount;
    }
  }
  return info;
}
public void testResourceEstimator() throws Exception { final int maps = 100; final int reduces = 2; final int singleMapOutputSize = 1000; JobConf jc = new JobConf(); JobID jid = new JobID("testJT", 0); jc.setNumMapTasks(maps); jc.setNumReduceTasks(reduces); JobInProgress jip = new JobInProgress(jid, jc, UtilsForTests.getJobTracker()); //unfortunately, we can't set job input size from here. ResourceEstimator re = new ResourceEstimator(jip); for(int i = 0; i < maps / 10 ; ++i) { long estOutSize = re.getEstimatedMapOutputSize(); System.out.println(estOutSize); assertEquals(0, estOutSize); TaskStatus ts = new MapTaskStatus(); ts.setOutputSize(singleMapOutputSize); JobSplit.TaskSplitMetaInfo split = new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0); TaskInProgress tip = new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1); re.updateWithCompletedTask(ts, tip); } assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize()); assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize()); }
FakeTaskInProgress(JobID jId, int id, JobConf jobConf, FakeJobInProgress job, String[] inputLocations, JobSplit.TaskSplitMetaInfo split, JobTracker jt) { super(jId, "", split, jt, jobConf, job, id, 1); this.isMap = true; this.fakeJob = job; this.inputLocations = inputLocations; activeTasks = new TreeMap<TaskAttemptID, String>(); taskStatus = TaskStatus.createTaskStatus(isMap); taskStatus.setRunState(TaskStatus.State.UNASSIGNED); }
HFSPFakeTaskInProgress(JobID jId, JobTracker jobTracker, boolean isMap, int id, JobConf jobConf, HFSPFakeJobInProgress job, String[] inputLocations, JobSplit.TaskSplitMetaInfo split, FakeClock clock) { super(jId, "", split, jobTracker, jobConf, job, id, 1); this.clock = clock; this.isMap = isMap; this.fakeJob = job; this.inputLocations = inputLocations; activeTasks = new TreeMap<TaskAttemptID, String>(); taskStatus = TaskStatus.createTaskStatus(isMap); taskStatus.setRunState(TaskStatus.State.UNASSIGNED); }
synchronized void initSetupCleanupTasks(String jobFile) { if (!jobSetupCleanupNeeded) { LOG.info("Setup/Cleanup not needed for job " + jobId); // nothing to initialize return; } // create cleanup two cleanup tips, one map and one reduce. cleanup = new TaskInProgress[2]; // cleanup map tip. This map doesn't use any splits. Just assign an empty // split. TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1); cleanup[0].setJobCleanupTask(); // cleanup reduce tip. cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this, 1); cleanup[1].setJobCleanupTask(); // create two setup tips, one map and one reduce. setup = new TaskInProgress[2]; // setup map tip. This map doesn't use any split. Just assign an empty // split. setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1); setup[0].setJobSetupTask(); // setup reduce tip. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1); setup[1].setJobSetupTask(); }
/**
 * Runs the inherited task initialization and then installs two setup TIPs:
 * a map-side one with an empty split numbered {@code numMapTasks + 1}, and a
 * reduce-side one numbered {@code numReduceTasks + 1}.
 *
 * @throws IOException if the inherited initialization fails
 */
@Override
public synchronized void initTasks() throws IOException {
  super.initTasks();
  final JobSplit.TaskSplitMetaInfo noSplit = new JobSplit.TaskSplitMetaInfo();
  final int setupMapIndex = numMapTasks + 1;
  final int setupReduceIndex = numReduceTasks + 1;

  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(getJobID(), "test", noSplit,
      jobtracker, getJobConf(), this, setupMapIndex, 1);
  setup[1] = new TaskInProgress(getJobID(), "test", numMapTasks,
      setupReduceIndex, jobtracker, getJobConf(), this, 1);
}
/** * Initialize tasks(1 map and 1 reduce task each needs 2 slots, similar to * tasks of a high RAM job). */ @Override public synchronized void initTasks() throws IOException { super.initTasks(); final int numSlotsPerTask = 2; maps = new TaskInProgress[1]; reduces = new TaskInProgress[1]; maps[0] = new FakeTaskInProgress(getJobID(), "test", JobSplit.EMPTY_TASK_SPLIT, jobtracker, getJobConf(), this, 0, numSlotsPerTask); TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0); // make this task a taskCleanup task of a map task mapCleanupTasks.add(attemptId); TaskStatus stat = new MapTaskStatus(attemptId, 0.01f, 2, TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0], TaskStatus.Phase.MAP, new Counters()); maps[0].updateStatus(stat); //similarly for reduce task's taskCleanup task reduces[0] = new FakeTaskInProgress(getJobID(), "test", 1, 0, jobtracker, getJobConf(), this, numSlotsPerTask); attemptId = new TaskAttemptID(reduces[0].getTIPId(), 0); // make this task a taskCleanup task of a reduce task reduceCleanupTasks.add(attemptId); stat = new ReduceTaskStatus(attemptId, 0.01f, 2, TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0], TaskStatus.Phase.REDUCE, new Counters()); reduces[0].updateStatus(stat); }
public void testResourceEstimator() throws Exception { final int maps = 100; final int reduces = 2; final int singleMapOutputSize = 1000; JobConf jc = new JobConf(); JobID jid = new JobID("testJT", 0); jc.setNumMapTasks(maps); jc.setNumReduceTasks(reduces); JobInProgress jip = new JobInProgress(jid, jc, UtilsForTests.getJobTracker()); //unfortunately, we can't set job input size from here. ResourceEstimator re = new ResourceEstimator(jip); for(int i = 0; i < maps / 10 ; ++i) { long estOutSize = re.getEstimatedMapOutputSize(); System.out.println(estOutSize); assertEquals(0, estOutSize); TaskStatus ts = new MapTaskStatus(); ts.setOutputSize(singleMapOutputSize); JobSplit.TaskSplitMetaInfo split = new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0); TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1); re.updateWithCompletedTask(ts, tip); } assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize()); assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize()); }
/**
 * Creates a new map task with an empty split index for the requesting
 * tracker, reports it to the task-tracker manager and increments the
 * running-map count.
 *
 * @param tts tracker asking for work
 * @param clusterSize not consulted
 * @param ignored not consulted
 * @return the newly created map task; never {@code null}
 * @throws IOException declared by the overridden method
 */
@Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
    int ignored) throws IOException {
  TaskAttemptID attemptId = getTaskAttemptID(TaskType.MAP);
  JobSplit.TaskSplitIndex noSplit = new JobSplit.TaskSplitIndex();
  Task created = new MapTask("", attemptId, 0, noSplit, 1) {
    @Override
    public String toString() {
      return String.format("%s on %s", getTaskID(), tts.getTrackerName());
    }
  };
  taskTrackerManager.update(tts.getTrackerName(), created);
  runningMapTasks++;
  return created;
}
/**
 * Returns {@code numMapTasks} split descriptors, every slot referencing the
 * shared {@code JobSplit.EMPTY_TASK_SPLIT}. The job id is not consulted.
 *
 * @param jobId ignored
 * @return an array of length {@code numMapTasks} filled with the empty split
 */
@Override
TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) {
  TaskSplitMetaInfo[] result = new TaskSplitMetaInfo[numMapTasks];
  int next = 0;
  while (next < result.length) {
    result[next++] = JobSplit.EMPTY_TASK_SPLIT;
  }
  return result;
}
FakeTaskInProgress(JobID jId, int id, JobConf jobConf, FakeJobInProgress job, String[] inputLocations, JobSplit.TaskSplitMetaInfo split) { super(jId, "", split, null, jobConf, job, id, 1); this.isMap = true; this.fakeJob = job; this.inputLocations = inputLocations; activeTasks = new TreeMap<TaskAttemptID, String>(); taskStatus = TaskStatus.createTaskStatus(isMap); taskStatus.setRunState(TaskStatus.State.UNASSIGNED); }