/**
 * Test case to check task tracker reservation for a job which
 * has a job blacklisted tracker.
 * <ol>
 * <li>Run a job which fails on one of the trackers.</li>
 * <li>Check if the job succeeds and has no reservation.</li>
 * </ol>
 *
 * @throws Exception
 */
public void testTrackerReservationWithJobBlackListedTracker() throws Exception {
  FakeJobInProgress job = TestTaskTrackerBlacklisting.runBlackListingJob(
      jobTracker, trackers);
  assertEquals("Job has no blacklisted trackers", 1,
      job.getBlackListedTrackers().size());
  assertTrue("Tracker 1 not blacklisted for the job",
      job.getBlackListedTrackers().contains(
          JobInProgress.convertTrackerNameToHostName(trackers[0])));
  // Fixed: expected value goes first in assertEquals, and the failure
  // message was previously garbled ("didnt complete successfully complete").
  assertEquals("Job did not complete successfully", JobStatus.SUCCEEDED,
      job.getStatus().getRunState());
  // Once the job finishes, all its tasktracker reservations must be gone.
  assertEquals("Reservation for the job not released: Maps", 0,
      job.getNumReservedTaskTrackersForMaps());
  assertEquals("Reservation for the job not released : Reduces", 0,
      job.getNumReservedTaskTrackersForReduces());
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match", 0,
      metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 0,
      metrics.getReservedReduceSlots());
}
/**
 * Builds a {@link TaskStatus} of the requested type for {@code job} on
 * {@code tracker} and appends it to {@code reports}.
 *
 * @param job        job to draw the task from
 * @param taskType   MAP, REDUCE, or TASK_CLEANUP
 * @param useMapSlot for TASK_CLEANUP only: true to re-use the first map
 *                   attempt's status, false for the first reduce attempt's
 * @param tracker    tracker name the status is reported from
 * @param reports    list the new status is appended to
 * @throws IOException if task scheduling fails
 */
void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
    boolean useMapSlot, String tracker, List<TaskStatus> reports)
    throws IOException {
  TaskStatus report;
  if (taskType == TaskType.MAP) {
    TaskAttemptID attempt = job.findMapTask(tracker);
    report = new MapTaskStatus(attempt, 0.01f, 2, TaskStatus.State.RUNNING,
        "", "", tracker, TaskStatus.Phase.MAP, new Counters());
  } else if (taskType == TaskType.TASK_CLEANUP) {
    // Re-use the already-recorded status of attempt 0 of the first
    // map or reduce task instead of fabricating a new one.
    if (useMapSlot) {
      report = job.maps[0].taskStatuses.get(
          new TaskAttemptID(job.maps[0].getTIPId(), 0));
    } else {
      report = job.reduces[0].taskStatuses.get(
          new TaskAttemptID(job.reduces[0].getTIPId(), 0));
    }
  } else {
    TaskAttemptID attempt = job.findReduceTask(tracker);
    report = new ReduceTaskStatus(attempt, 0.01f, 2, TaskStatus.State.RUNNING,
        "", "", tracker, TaskStatus.Phase.REDUCE, new Counters());
  }
  reports.add(report);
}
/** * Test that a setup task can be run against a map slot * if it is free. * @throws IOException */ public void testSetupTaskReturnedForFreeMapSlots() throws IOException { // create a job with a setup task. FakeJobInProgress job = createJob(TaskType.JOB_SETUP); jobTracker.jobs.put(job.getJobID(), job); // create a status simulating a free tasktracker List<TaskStatus> reports = new ArrayList<TaskStatus>(); TaskTrackerStatus ttStatus = createTaskTrackerStatus(trackers[2], reports); // verify that a setup task can be assigned to a map slot. List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus); assertEquals(1, tasks.size()); assertTrue(tasks.get(0).isJobSetupTask()); assertTrue(tasks.get(0).isMapTask()); jobTracker.jobs.clear(); }
/** * Test to check that map slots are counted when returning * a setup task. * @throws IOException */ public void testMapSlotsCountedForSetup() throws IOException { // create a job with a setup task. FakeJobInProgress job = createJob(TaskType.JOB_SETUP); jobTracker.jobs.put(job.getJobID(), job); // create another job for reservation FakeJobInProgress job1 = createJob(null); jobTracker.jobs.put(job1.getJobID(), job1); // create TT status for testing getSetupAndCleanupTasks List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>(); addNewTaskStatus(job, TaskType.MAP, true, trackers[0], taskStatuses); TaskTrackerStatus ttStatus = createTaskTrackerStatus(trackers[0], taskStatuses); // test that there should be no map setup task returned. List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus); assertEquals(1, tasks.size()); assertTrue(tasks.get(0).isJobSetupTask()); assertFalse(tasks.get(0).isMapTask()); jobTracker.jobs.clear(); }
/**
 * Test to check that reduce slots are also counted when returning
 * a setup task.
 * @throws IOException
 */
public void testReduceSlotsCountedForSetup() throws IOException {
  // create a job with a setup task.
  FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
  jobTracker.jobs.put(job.getJobID(), job);

  // create another job for reservation
  FakeJobInProgress job1 = createJob(null);
  jobTracker.jobs.put(job1.getJobID(), job1);

  // create TT status for testing getSetupAndCleanupTasks
  List<TaskStatus> reports = new ArrayList<TaskStatus>();
  // because free map slots are checked first in code,
  // we fill up map slots also (not just reduce slots).
  addNewTaskStatus(job1, TaskType.MAP, true, trackers[1], reports);
  addNewTaskStatus(job1, TaskType.REDUCE, false, trackers[1], reports);
  TaskTrackerStatus ttStatus =
      createTaskTrackerStatus(trackers[1], reports);

  // test that there should be no setup task returned,
  // as both map and reduce slots are occupied.
  List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
  assertNull(tasks);

  jobTracker.jobs.clear();
}
/**
 * Test to check that map slots are counted when returning
 * a taskCleanup task.
 * @throws IOException
 */
public void testNumSlotsUsedForTaskCleanup() throws IOException {
  // Create a high RAM job with a map task's cleanup task and a reduce task's
  // cleanup task. Make this Fake job a high RAM job by setting the slots
  // required for map/reduce task to 2.
  FakeJobInProgress job = createJob(TaskType.TASK_CLEANUP);
  jobTracker.jobs.put(job.getJobID(), job);

  // create TT status for testing getSetupAndCleanupTasks
  List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
  TaskTrackerStatus ttStatus =
      createTaskTrackerStatus(trackers[0], taskStatuses); // create dummy status

  // validate mapTaskCleanup task (first call hands out the map-side cleanup)
  validateNumSlotsUsedForTaskCleanup(ttStatus);
  // validate reduceTaskCleanup task (second call hands out the reduce-side one)
  validateNumSlotsUsedForTaskCleanup(ttStatus);

  jobTracker.jobs.clear();
}
/**
 * Verifies that, after the speculative lag elapses, the slowest unfinished
 * reduce attempts are chosen for speculation in order (task 2, then task 3).
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWNODE_THRESHOLD, 100f);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (NOTE: previous comment said "maps" — these are reduces)
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  job.refresh(clock.getTime());
  // we should get a speculative task now
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);
  job.refresh(clock.getTime());
  // the next speculative candidate is task 3
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 3);
}
/**
 * Verifies LATE-style scheduling: the speculated task is the one expected to
 * finish last (task 4), not the one with the lowest progress rate (task 3).
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  //by doing the above clock adjustments, we bring the progress rate of
  //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  //now, we should get back taskID 4 (since that is expected to complete
  //later than taskID 3).
  job.refresh(clock.getTime());
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 4);
}
/**
 * Verifies that a task expected to finish within the configured speculative
 * duration (300s here) is NOT speculated even though it is the slowest.
 * @throws IOException
 */
public void testFastTaskScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(2);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setMapSpeculativeDuration(300L);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  // a really fast task #1
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);

  // task #2 is slow
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  clock.advanceBySpeculativeLag();
  clock.advance(5000);
  // 65 secs have elapsed since task scheduling
  // set progress so that it will complete within
  // 300 seconds
  job.progressMade(taskAttemptID[1], 0.7f);

  // no new map task should be found
  job.refresh(clock.getTime());
  assertEquals(job.findMapTask(trackers[2]), null);
}
/**
 * Helper: runs a map-only job and counts how many speculative attempts the
 * scheduler hands out before it stops (returns null), i.e. exercises the
 * speculative-execution cap.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many tasks finish quickly (the rest lag)
 * @param slots            cluster slot count configured on the jobTracker
 * @return number of speculative attempts issued before the cap kicked in
 * @throws IOException if task scheduling fails
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
    throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // schedule all tasks on tracker 0
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  // the first numEarlyComplete tasks finish fast
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }
  clock.advanceBySpeculativeLag();
  // the remaining tasks lag behind at 85% progress
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // keep asking tracker 1 for speculative attempts until none is returned
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    job.refresh(clock.getTime());
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
/**
 * Verifies that when processing-rate based speculation is enabled, the task
 * with the slowest bytes/time rate (task 2) is speculated rather than the one
 * with the slowest progress fraction (task 1).
 * @throws IOException
 */
public void testSlowMapProgressingRate() throws IOException {
  clock.advance(1000);
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(0);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[0]);
  //if consider the progress rate, we should speculate task 1
  //but if consider the processing rate, which is map_input_bytes/time
  //then we should speculate task 2
  job.processingRate(taskAttemptID[1], Task.Counter.MAP_INPUT_BYTES,
      100000000, 0.1f, TaskStatus.Phase.MAP);
  job.processingRate(taskAttemptID[2], Task.Counter.MAP_INPUT_BYTES,
      1000, 0.5f, TaskStatus.Phase.MAP);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[0]);
  assertEquals(taskAttemptID[3].getTaskID().getId(), 2);
}
public void testLostTracker() throws IOException { // Tracker 0 contacts JT FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]); TaskAttemptID[] tid = new TaskAttemptID[2]; JobConf conf = new JobConf(); conf.setNumMapTasks(1); conf.setNumReduceTasks(1); FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker); job.initTasks(); // Tracker 0 gets the map task tid[0] = job.findMapTask(trackers[0]); job.finishTask(tid[0]); // Advance clock. Tracker 0 would have got lost clock.advance(8 * 1000); jobTracker.checkExpiredTrackers(); // Tracker 1 establishes contact with JT FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]); // Tracker1 should get assigned the lost map task tid[1] = job.findMapTask(trackers[1]); assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]); assertEquals("Task ID of reassigned map task does not match", tid[0].getTaskID().toString(), tid[1].getTaskID().toString()); job.finishTask(tid[1]); }
/**
 * Verifies that slow reduce attempts are speculated in order (task 2, then
 * task 3) and that jobtracker instrumentation counts the speculative attempts.
 * @throws IOException
 */
public void testTaskToSpeculate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(5);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  // schedule reduces (NOTE: previous comment said "maps" — these are reduces)
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  taskAttemptID[4] = job.findReduceTask(trackers[3]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advance(20000);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 2);
  clock.advance(5000);
  job.finishTask(taskAttemptID[5]);
  // the next speculative candidate is task 3
  taskAttemptID[5] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 3);

  // Verify total speculative tasks by jobtracker instrumentation
  assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 3,
      fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
/**
 * Verifies LATE-style scheduling (speculate the task expected to finish last,
 * task 4) and checks the instrumentation counters afterwards.
 * @throws IOException
 */
public void testTaskLATEScheduling() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(5);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  clock.advance(2000);
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(250000);
  taskAttemptID[4] = job.findMapTask(trackers[3]);
  clock.advanceBySpeculativeLag();
  //by doing the above clock adjustments, we bring the progress rate of
  //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
  //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
  //now, we should get back taskID 4 (since that is expected to complete
  //later than taskID 3).
  job.progressMade(taskAttemptID[3], 0.85f);
  job.progressMade(taskAttemptID[4], 0.20f);
  taskAttemptID[5] = job.findMapTask(trackers[4]);
  assertEquals(taskAttemptID[5].getTaskID().getId(), 4);

  // Verify total speculative tasks by jobtracker instrumentation
  assertEquals("Total speculative maps", 2, fakeInst.numSpeculativeMaps);
  assertEquals("Total speculative reduces", 3,
      fakeInst.numSpeculativeReduces);
  LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
  LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
/**
 * Helper: runs a map-only job and counts how many speculative attempts the
 * scheduler hands out before it stops (returns null), i.e. exercises the
 * speculative-execution cap.
 *
 * @param totalTasks       number of map tasks in the job
 * @param numEarlyComplete how many tasks finish quickly (the rest lag)
 * @param slots            cluster slot count configured on the jobTracker
 * @return number of speculative attempts issued before the cap kicked in
 * @throws IOException if task scheduling fails
 */
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
    throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(totalTasks);
  conf.setNumReduceTasks(0);
  jobTracker.setNumSlots(slots);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  int i;
  // schedule all tasks on tracker 0
  for (i = 0; i < totalTasks; i++) {
    taskAttemptID[i] = job.findMapTask(trackers[0]);
  }
  clock.advance(5000);
  // the first numEarlyComplete tasks finish fast
  for (i = 0; i < numEarlyComplete; i++) {
    job.finishTask(taskAttemptID[i]);
  }
  clock.advanceBySpeculativeLag();
  // the remaining tasks lag behind at 85% progress
  for (i = numEarlyComplete; i < totalTasks; i++) {
    job.progressMade(taskAttemptID[i], 0.85f);
  }
  clock.advance(50000);
  // keep asking tracker 1 for speculative attempts until none is returned
  for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
    taskAttemptID[i] = job.findMapTask(trackers[1]);
    clock.advance(2000);
    if (taskAttemptID[i] != null) {
      //add some good progress constantly for the different
      //task-attempts so that
      //the tasktracker doesn't get into the slow trackers category
      job.progressMade(taskAttemptID[i], 0.99f);
    } else {
      break;
    }
  }
  return i;
}
/**
 * Checks that runningMaps/runningReduces counters are decremented when an
 * attempt fails while a speculative attempt is outstanding.
 * @throws IOException
 */
public void testRunningTaskCountWithSpeculation() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  conf.setFloat(JobInProgress.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  //Check for runningMap counts first
  //schedule maps
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[0]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[1]);
  clock.advanceBySpeculativeLag();

  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  job.refresh(clock.getTime());
  int oldRunningMap = job.runningMaps();
  LOG.info("No of running maps before fail was " + oldRunningMap);
  // failing the only non-speculated running map should drop the count by one
  job.failTask(taskAttemptID[2]);
  job.refresh(clock.getTime());
  assertEquals(
      "Running maps count should be updated from " + oldRunningMap + " to " +
          (oldRunningMap - 1), job.runningMaps(), oldRunningMap - 1);
  LOG.info(" Job running maps after fail " + job.runningMaps());
  clock.advance(5000);
  job.finishTask(taskAttemptID[3]);

  //check for runningReduce count.
  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);
  clock.advance(5000);
  job.finishTask(taskAttemptID[4]);
  clock.advance(1000);
  job.finishTask(taskAttemptID[5]);
  job.refresh(clock.getTime());
  clock.advanceBySpeculativeLag();
  taskAttemptID[7] = job.findReduceTask(trackers[4]);
  job.refresh(clock.getTime());
  int oldRunningReduces = job.runningReduces();
  // failing the running reduce should likewise drop the count by one
  job.failTask(taskAttemptID[6]);
  job.refresh(clock.getTime());
  LOG.info(" No of running Reduces before fail " + oldRunningReduces);
  LOG.info(" No of runing reduces after fail " + job.runningReduces());
  assertEquals(
      "Running reduces count should be updated from " + oldRunningReduces +
          " to " + (oldRunningReduces - 1), job.runningReduces(),
      oldRunningReduces - 1);
  job.finishTask(taskAttemptID[7]);
}
/**
 * Verifies isSlowTracker(): trackers that finish their tasks quickly
 * (trackers[0], trackers[1]) are not flagged, while a tracker whose tasks
 * take far longer (trackers[2]) is flagged as slow.
 * @throws IOException
 */
public void testIsSlowTracker() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(10);
  conf.setNumReduceTasks(0);
  conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0);
  conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule some tasks
  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[0]);
  taskAttemptID[2] = job.findMapTask(trackers[0]);
  taskAttemptID[3] = job.findMapTask(trackers[1]);
  taskAttemptID[4] = job.findMapTask(trackers[1]);
  taskAttemptID[5] = job.findMapTask(trackers[1]);
  taskAttemptID[6] = job.findMapTask(trackers[2]);
  taskAttemptID[7] = job.findMapTask(trackers[2]);
  taskAttemptID[8] = job.findMapTask(trackers[2]);
  clock.advance(1000);
  //Some tasks finish in 1 second (on trackers[0])
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);
  job.finishTask(taskAttemptID[2]);
  clock.advance(1000);
  //Some tasks finish in 2 second (on trackers[1])
  job.finishTask(taskAttemptID[3]);
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);
  assertEquals("Tracker " + trackers[0] + " expected to be not slow ",
      job.isSlowTracker(trackers[0]), false);
  clock.advance(100000);
  //After a long time, some tasks finished on trackers[2]
  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
  job.finishTask(taskAttemptID[8]);
  job.refresh(clock.getTime());
  assertEquals("Tracker " + trackers[2] + " expected to be slow ",
      job.isSlowTracker(trackers[2]), true);
}
/** * tests that a task that has a remaining time less than duration * time */ public void testTaskSpeculationStddevCap() throws IOException { TaskAttemptID[] taskAttemptID = new TaskAttemptID[8]; JobConf conf = new JobConf(); conf.setSpeculativeExecution(true); conf.setFloat(JobInProgress.SPECULATIVE_STDDEVMEANRATIO_MAX, 0.33f); conf.setNumMapTasks(7); conf.setNumReduceTasks(0); conf.setFloat(JobInProgress.SPECULATIVE_MAP_UNFINISHED_THRESHOLD_KEY, 0); conf.setFloat(JobInProgress.SPECULATIVE_REDUCE_UNFINISHED_THRESHOLD_KEY, 0); FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker); job.initTasks(); // all but one tasks start off taskAttemptID[0] = job.findMapTask(trackers[0]); taskAttemptID[1] = job.findMapTask(trackers[1]); taskAttemptID[2] = job.findMapTask(trackers[2]); taskAttemptID[3] = job.findMapTask(trackers[0]); taskAttemptID[4] = job.findMapTask(trackers[1]); taskAttemptID[5] = job.findMapTask(trackers[2]); // 3 tasks finish really fast in 15s clock.advance (15 * 1000); job.finishTask(taskAttemptID[0]); job.finishTask(taskAttemptID[1]); job.finishTask(taskAttemptID[2]); // advance to 600s and schedule last mapper clock.advance (585 * 1000); taskAttemptID[6] = job.findMapTask(trackers[0]); // advance to 700s and report progress clock.advance (10 * 60 * 1000); // set progress rates job.progressMade(taskAttemptID[3], 0.2f); job.progressMade(taskAttemptID[4], 0.5f); job.progressMade(taskAttemptID[5], 0.6f); job.progressMade(taskAttemptID[6], 0.02f); // the progress has been set in such a way that // stddev > mean. now we depend on stddev capping // for speculation. job.refresh(clock.getTime()); taskAttemptID[7] = job.findMapTask(trackers[1]); // no new map task should be found if(taskAttemptID[7] == null) Assert.fail(); }
/**
 * Verifies that the last remaining unfinished map and reduce tasks are
 * eligible for speculation once the speculative lag has elapsed.
 * @throws Exception
 */
public void testSpeculateLastTask() throws Exception {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(3);
  conf.setNumReduceTasks(3);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();

  taskAttemptID[0] = job.findMapTask(trackers[0]);
  taskAttemptID[1] = job.findMapTask(trackers[1]);
  taskAttemptID[2] = job.findMapTask(trackers[2]);
  clock.advanceBySpeculativeLag();
  job.finishTask(taskAttemptID[0]);
  job.finishTask(taskAttemptID[1]);

  // Speculate last unfinished map task
  job.refresh(clock.getTime());
  taskAttemptID[3] = job.findMapTask(trackers[3]);
  Assert.assertNotNull(taskAttemptID[3]);

  job.finishTask(taskAttemptID[2]);
  job.finishTask(taskAttemptID[3]);

  taskAttemptID[4] = job.findReduceTask(trackers[0]);
  taskAttemptID[5] = job.findReduceTask(trackers[1]);
  taskAttemptID[6] = job.findReduceTask(trackers[2]);
  clock.advanceBySpeculativeLag();
  job.finishTask(taskAttemptID[4]);
  job.finishTask(taskAttemptID[5]);

  // Speculate last unfinished reduce task
  job.refresh(clock.getTime());
  taskAttemptID[7] = job.findReduceTask(trackers[3]);
  Assert.assertNotNull(taskAttemptID[7]);

  job.finishTask(taskAttemptID[6]);
  job.finishTask(taskAttemptID[7]);
}
/**
 * Verifies processing-rate based speculation for reduces: with per-phase
 * (shuffle/sort/reduce) rates taken into account, task 1 — slowest in the
 * reduce phase — is speculated, not task 0 which merely started late.
 * @throws IOException
 */
public void testSlowReduceProgressingRate() throws IOException {
  TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
  JobConf conf = new JobConf();
  conf.setSpeculativeExecution(true);
  conf.setNumMapTasks(4);
  conf.setNumReduceTasks(4);
  //use processing rate for speculation
  conf.setBoolean("mapreduce.job.speculative.using.processing.rate", true);
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  //schedule reduces
  taskAttemptID[0] = job.findReduceTask(trackers[0]);
  taskAttemptID[1] = job.findReduceTask(trackers[1]);
  taskAttemptID[2] = job.findReduceTask(trackers[2]);
  taskAttemptID[3] = job.findReduceTask(trackers[3]);
  clock.advance(1000);
  //task 0 just starts copying, while task 1, 2, 3 are already in the reducing
  //phase. If we compared the progress rate, then we should speculate 0.
  //However, by comparing the processing rate in the copy phase, among all 4
  //tasks, task 0 is fast, and we should not speculate it.
  //for task 1, 2, 3, they are all in the reducing phase, with same progress,
  //however, task 1 has smaller processing rate(the statistics of the reduce
  //phase for all the tasks will also include statistics for task 0, whose
  //processing rate is 0)
  job.finishCopy(taskAttemptID[1], clock.getTime(), 10000);
  job.finishCopy(taskAttemptID[2], clock.getTime(), 10000);
  job.finishCopy(taskAttemptID[3], clock.getTime(), 10000);
  clock.advance(1000);
  job.finishSort(taskAttemptID[1], clock.getTime());
  job.finishSort(taskAttemptID[2], clock.getTime());
  job.finishSort(taskAttemptID[3], clock.getTime());
  job.processingRate(taskAttemptID[0], Task.Counter.REDUCE_SHUFFLE_BYTES,
      100000000, 0.1f, TaskStatus.Phase.SHUFFLE);
  job.processingRate(taskAttemptID[1], Task.Counter.REDUCE_INPUT_BYTES,
      1000, 0.8f, TaskStatus.Phase.REDUCE);
  job.processingRate(taskAttemptID[2], Task.Counter.REDUCE_INPUT_BYTES,
      100000000, 0.8f, TaskStatus.Phase.REDUCE);
  job.processingRate(taskAttemptID[3], Task.Counter.REDUCE_INPUT_BYTES,
      100000000, 0.8f, TaskStatus.Phase.REDUCE);
  clock.advanceBySpeculativeLag();
  //we should get a speculative task now
  job.refresh(clock.getTime());
  taskAttemptID[4] = job.findReduceTask(trackers[4]);
  assertEquals(taskAttemptID[4].getTaskID().getId(), 1);
}
/**
 * Test case to test task tracker reservation.
 * <ol>
 * <li>Run a cluster with 3 trackers.</li>
 * <li>Submit a job which reserves all the slots in two
 * trackers.</li>
 * <li>Run the job on another tracker which has
 * no reservations</li>
 * <li>Finish the job and observe the reservations are
 * successfully canceled</li>
 * </ol>
 *
 * @throws Exception
 */
public void testTaskTrackerReservation() throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  conf.setSpeculativeExecution(false);
  conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);

  //Set task tracker objects for reservation.
  TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
  TaskTracker tt3 = jobTracker.getTaskTracker(trackers[2]);
  TaskTrackerStatus status1 = new TaskTrackerStatus(
      trackers[0], JobInProgress.convertTrackerNameToHostName(
          trackers[0]), 0, new ArrayList<TaskStatus>(), 0, 2, 2);
  TaskTrackerStatus status2 = new TaskTrackerStatus(
      trackers[1], JobInProgress.convertTrackerNameToHostName(
          trackers[1]), 0, new ArrayList<TaskStatus>(), 0, 2, 2);
  // Fixed copy-paste bug: status3 previously used trackers[1] although it
  // is the status for tt3 / trackers[2].
  TaskTrackerStatus status3 = new TaskTrackerStatus(
      trackers[2], JobInProgress.convertTrackerNameToHostName(
          trackers[2]), 0, new ArrayList<TaskStatus>(), 0, 2, 2);
  tt1.setStatus(status1);
  tt2.setStatus(status2);
  tt3.setStatus(status3);

  FakeJobInProgress fjob = new FakeJobInProgress(conf, jobTracker);
  fjob.setClusterSize(3);
  fjob.initTasks();

  // Reserve all slots on trackers 1 and 3; tracker 2 stays unreserved.
  tt1.reserveSlots(TaskType.MAP, fjob, 2);
  tt1.reserveSlots(TaskType.REDUCE, fjob, 2);
  tt3.reserveSlots(TaskType.MAP, fjob, 2);
  tt3.reserveSlots(TaskType.REDUCE, fjob, 2);
  assertEquals("Trackers not reserved for the job : maps", 2,
      fjob.getNumReservedTaskTrackersForMaps());
  assertEquals("Trackers not reserved for the job : reduces", 2,
      fjob.getNumReservedTaskTrackersForReduces());
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match", 4,
      metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 4,
      metrics.getReservedReduceSlots());

  // Run the job on the unreserved tracker and finish it.
  TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
  TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
  fjob.finishTask(mTid);
  fjob.finishTask(rTid);

  // Fixed: expected value first, and corrected the garbled failure message.
  assertEquals("Job did not complete successfully", JobStatus.SUCCEEDED,
      fjob.getStatus().getRunState());
  // Once the job finishes, every reservation must be released.
  assertEquals("Reservation for the job not released: Maps", 0,
      fjob.getNumReservedTaskTrackersForMaps());
  assertEquals("Reservation for the job not released : Reduces", 0,
      fjob.getNumReservedTaskTrackersForReduces());
  metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match", 0,
      metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 0,
      metrics.getReservedReduceSlots());
}
/**
 * Test whether the tracker gets blacklisted after it is lost.
 */
public void testLostTrackerBeforeBlacklisting() throws Exception {
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
  TaskAttemptID[] tid = new TaskAttemptID[3];
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  conf.set(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, "1");
  conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  job.setClusterSize(4);

  // Tracker 0 gets the map task
  tid[0] = job.findMapTask(trackers[0]);
  job.finishTask(tid[0]);

  // validate the total tracker count
  assertEquals("Active tracker count mismatch", 1,
      jobTracker.getClusterStatus(false).getTaskTrackers());

  // lose the tracker by advancing past its expiry interval
  clock.advance(1100);
  jobTracker.checkExpiredTrackers();
  assertFalse("Tracker 0 not lost",
      jobTracker.getClusterStatus(false).getActiveTrackerNames()
          .contains(trackers[0]));

  // validate the total tracker count
  assertEquals("Active tracker count mismatch", 0,
      jobTracker.getClusterStatus(false).getTaskTrackers());

  // Tracker 1 establishes contact with JT
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);

  // Tracker1 should get assigned the lost map task
  tid[1] = job.findMapTask(trackers[1]);
  assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
  assertEquals("Task ID of reassigned map task does not match",
      tid[0].getTaskID().toString(), tid[1].getTaskID().toString());

  // finish the map task
  job.finishTask(tid[1]);

  // finish the reduce task
  tid[2] = job.findReduceTask(trackers[1]);
  job.finishTask(tid[2]);

  // check if job is successful
  assertEquals("Job not successful", JobStatus.SUCCEEDED,
      job.getStatus().getRunState());

  // check if the tracker is lost
  // validate the total tracker count
  assertEquals("Active tracker count mismatch", 1,
      jobTracker.getClusterStatus(false).getTaskTrackers());
  // validate blacklisted count .. since we lost one blacklisted tracker
  assertEquals("Blacklisted tracker count mismatch", 0,
      jobTracker.getClusterStatus(false).getBlacklistedTrackers());
}
/**
 * Test whether the tracker gets lost after it is blacklisted.
 */
public void testLostTrackerAfterBlacklisting() throws Exception {
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
  clock.advance(600);
  TaskAttemptID[] tid = new TaskAttemptID[2];
  JobConf conf = new JobConf();
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(0);
  conf.set(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, "1");
  conf.set(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
  FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
  job.initTasks();
  job.setClusterSize(4);

  // check if the tracker count is correct
  assertEquals("Active tracker count mismatch", 1,
      jobTracker.taskTrackers().size());

  // Tracker 0 gets the map task
  tid[0] = job.findMapTask(trackers[0]);
  // Fail the task (one failure is enough to blacklist the tracker for
  // this job, since MAX_TASK_FAILURES_PER_TRACKER is 1)
  job.failTask(tid[0]);

  // Tracker 1 establishes contact with JT
  FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
  // check if the tracker count is correct
  assertEquals("Active tracker count mismatch", 2,
      jobTracker.taskTrackers().size());

  // Tracker 1 gets the map task
  tid[1] = job.findMapTask(trackers[1]);
  // Finish the task and also the job
  job.finishTask(tid[1]);

  // check if job is successful
  assertEquals("Job not successful", JobStatus.SUCCEEDED,
      job.getStatus().getRunState());

  // check if tracker 0 got blacklisted
  // (NOTE: previous comment said "trackers 1" — the check is on tracker 0)
  assertTrue("Tracker 0 not blacklisted",
      jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName()
          .equals(trackers[0]));
  // check if the tracker count is correct
  assertEquals("Active tracker count mismatch", 2,
      jobTracker.taskTrackers().size());
  // validate blacklisted count
  assertEquals("Blacklisted tracker count mismatch", 1,
      jobTracker.getClusterStatus(false).getBlacklistedTrackers());

  // Advance clock. Tracker 0 should be lost
  clock.advance(500);
  jobTracker.checkExpiredTrackers();

  // check if the task tracker is lost
  assertFalse("Tracker 0 not lost",
      jobTracker.getClusterStatus(false).getActiveTrackerNames()
          .contains(trackers[0]));

  // check if the lost tracker has been removed from the jobtracker
  assertEquals("Active tracker count mismatch", 1,
      jobTracker.taskTrackers().size());
  // validate blacklisted count (lost tracker no longer counted)
  assertEquals("Blacklisted tracker count mismatch", 0,
      jobTracker.getClusterStatus(false).getBlacklistedTrackers());
}
public void testRunningTaskCountWithSpeculation() throws IOException { TaskAttemptID[] taskAttemptID = new TaskAttemptID[8]; JobConf conf = new JobConf(); conf.setSpeculativeExecution(true); conf.setNumMapTasks(3); conf.setNumReduceTasks(3); conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f); FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker); job.initTasks(); //Check for runningMap counts first //schedule maps taskAttemptID[0] = job.findMapTask(trackers[0]); taskAttemptID[1] = job.findMapTask(trackers[1]); taskAttemptID[2] = job.findMapTask(trackers[2]); clock.advance(5000); job.finishTask(taskAttemptID[0]); clock.advance(1000); job.finishTask(taskAttemptID[1]); clock.advanceBySpeculativeLag(); //we should get a speculative task now taskAttemptID[3] = job.findMapTask(trackers[3]); int oldRunningMap = job.runningMaps(); LOG.info("No of running maps before fail was " + oldRunningMap); job.failTask(taskAttemptID[2]); assertEquals( "Running maps count should be updated from " + oldRunningMap + " to " + (oldRunningMap - 1), job.runningMaps(), oldRunningMap - 1); LOG.info(" Job running maps after fail " + job.runningMaps()); clock.advance(5000); job.finishTask(taskAttemptID[3]); //check for runningReduce count. 
taskAttemptID[4] = job.findReduceTask(trackers[0]); taskAttemptID[5] = job.findReduceTask(trackers[1]); taskAttemptID[6] = job.findReduceTask(trackers[2]); clock.advance(5000); job.finishTask(taskAttemptID[4]); clock.advance(1000); job.finishTask(taskAttemptID[5]); clock.advanceBySpeculativeLag(); taskAttemptID[7] = job.findReduceTask(trackers[4]); int oldRunningReduces = job.runningReduces(); job.failTask(taskAttemptID[6]); LOG.info( " No of running Reduces before fail " + oldRunningReduces); LOG.info( " No of runing reduces after fail " + job.runningReduces()); assertEquals( "Running reduces count should be updated from " + oldRunningReduces + " to " + (oldRunningReduces - 1), job.runningReduces(), oldRunningReduces - 1); // Verify total speculative tasks by jobtracker instrumentation assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps); assertEquals("Total speculative reduces", 1, fakeInst.numSpeculativeReduces); LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps); LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces); job.finishTask(taskAttemptID[7]); }
public void testIsSlowTracker() throws IOException { TaskAttemptID[] taskAttemptID = new TaskAttemptID[20]; JobConf conf = new JobConf(); conf.setSpeculativeExecution(true); conf.setNumMapTasks(10); conf.setNumReduceTasks(0); FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker); job.initTasks(); //schedule some tasks taskAttemptID[0] = job.findMapTask(trackers[0]); taskAttemptID[1] = job.findMapTask(trackers[0]); taskAttemptID[2] = job.findMapTask(trackers[0]); taskAttemptID[3] = job.findMapTask(trackers[1]); taskAttemptID[4] = job.findMapTask(trackers[1]); taskAttemptID[5] = job.findMapTask(trackers[1]); taskAttemptID[6] = job.findMapTask(trackers[2]); taskAttemptID[7] = job.findMapTask(trackers[2]); taskAttemptID[8] = job.findMapTask(trackers[2]); clock.advance(1000); //Some tasks finish in 1 second (on trackers[0]) job.finishTask(taskAttemptID[0]); job.finishTask(taskAttemptID[1]); job.finishTask(taskAttemptID[2]); clock.advance(1000); //Some tasks finish in 2 second (on trackers[1]) job.finishTask(taskAttemptID[3]); job.finishTask(taskAttemptID[4]); job.finishTask(taskAttemptID[5]); assertEquals("Tracker "+ trackers[0] + " expected to be not slow ", job.isSlowTracker(trackers[0]), false); clock.advance(100000); //After a long time, some tasks finished on trackers[2] job.finishTask(taskAttemptID[6]); job.finishTask(taskAttemptID[7]); job.finishTask(taskAttemptID[8]); assertEquals("Tracker "+ trackers[2] + " expected to be slow ", job.isSlowTracker(trackers[2]), true); // Verify total speculative tasks by jobtracker instrumentation assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps); assertEquals("Total speculative reduces", 1, fakeInst.numSpeculativeReduces); LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps); LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces); }
/**
 * Test that reserved map/reduce slot counts reported in
 * {@link ClusterMetrics} track task-tracker reservations:
 * <ol>
 * <li>Two trackers each reserve slots via heartbeats — 2 reserved map and
 * 2 reserved reduce slots.</li>
 * <li>A second round of heartbeats re-reserves, doubling the counts.</li>
 * <li>Finishing the job releases every reservation — counts drop to 0.</li>
 * </ol>
 *
 * Fix: the job-completion assertEquals passed (message, actual, expected)
 * instead of JUnit's (message, expected, actual), and its message was
 * garbled; both are corrected.
 *
 * @throws Exception
 */
public void testReservedSlots() throws Exception {
  Configuration conf = mr.createJobConf();
  conf.setInt(JobContext.NUM_MAPS, 1);
  Job job = Job.getInstance(cluster, conf);
  job.setNumReduceTasks(1);
  job.setSpeculativeExecution(false);
  job.setJobSetupCleanupNeeded(false);

  //Set task tracker objects for reservation.
  TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
  TaskTrackerStatus status1 = new TaskTrackerStatus(
      trackers[0], JobInProgress.convertTrackerNameToHostName(
          trackers[0]), 0, new ArrayList<TaskStatus>(), 0, 2, 2);
  TaskTrackerStatus status2 = new TaskTrackerStatus(
      trackers[1], JobInProgress.convertTrackerNameToHostName(
          trackers[1]), 0, new ArrayList<TaskStatus>(), 0, 2, 2);
  tt1.setStatus(status1);
  tt2.setStatus(status2);

  fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
      jobTracker);
  fakeJob.setClusterSize(3);
  fakeJob.initTasks();

  FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
      true, trackers[0], responseId);
  FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
      true, trackers[1], responseId);
  responseId++;

  ClusterMetrics metrics = cluster.getClusterStatus();
  assertEquals("reserved map slots do not match",
      2, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      2, metrics.getReservedReduceSlots());

  // redo to test re-reservations.
  FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
      true, trackers[0], responseId);
  FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
      true, trackers[1], responseId);
  responseId++;

  metrics = cluster.getClusterStatus();
  assertEquals("reserved map slots do not match",
      4, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      4, metrics.getReservedReduceSlots());

  TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
  TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);

  fakeJob.finishTask(mTid);
  fakeJob.finishTask(rTid);

  assertEquals("Job did not complete successfully",
      JobStatus.SUCCEEDED, fakeJob.getStatus().getRunState());

  // completing the job must release all slot reservations
  metrics = cluster.getClusterStatus();
  assertEquals("reserved map slots do not match",
      0, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      0, metrics.getReservedReduceSlots());
}