private static void completeJobTasks(JobImpl job) { // complete the map tasks and the reduce tasks so we start committing int numMaps = job.getTotalMaps(); for (int i = 0; i < numMaps; ++i) { job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.SUCCEEDED)); Assert.assertEquals(JobState.RUNNING, job.getState()); } int numReduces = job.getTotalReduces(); for (int i = 0; i < numReduces; ++i) { job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.SUCCEEDED)); Assert.assertEquals(JobState.RUNNING, job.getState()); } }
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { TaskId taskId; if (reduce) { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); } else { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); Resource containerNeed = Resource.newInstance(memory, 1); if (earlierFailedAttempt) { return ContainerRequestEvent .createContainerRequestEventForFailedContainer(attemptId, containerNeed); } return new ContainerRequestEvent(attemptId, containerNeed, hosts, new String[] { NetworkTopology.DEFAULT_RACK }); }
private static TaskAttemptCompletionEvent createTce(int eventId, boolean isMap, TaskAttemptCompletionEventStatus status) { JobId jid = MRBuilderUtils.newJobId(12345, 1, 1); TaskId tid = MRBuilderUtils.newTaskId(jid, 0, isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0); RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); TaskAttemptCompletionEvent tce = recordFactory .newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(eventId); tce.setAttemptId(attemptId); tce.setStatus(status); return tce; }
@Override public JobReport getReport() { readLock.lock(); try { JobState state = getState(); // jobFile can be null if the job is not yet inited. String jobFile = remoteJobConfFile == null ? "" : remoteJobConfFile.toString(); StringBuilder diagsb = new StringBuilder(); for (String s : getDiagnostics()) { diagsb.append(s).append("\n"); } if (getInternalState() == JobStateInternal.NEW) { return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); } computeProgress(); JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, this.mapProgress, this.reduceProgress, cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); return report; } finally { readLock.unlock(); } }
@Test public void testRenameMapOutputForReduce() throws Exception { final JobConf conf = new JobConf(); final MROutputFiles mrOutputFiles = new MROutputFiles(); mrOutputFiles.setConf(conf); // make sure both dirs are distinct // conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString()); final Path mapOut = mrOutputFiles.getOutputFileForWrite(1); conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString()); final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1); Assert.assertNotEquals("Paths must be different!", mapOut.getParent(), mapOutIdx.getParent()); // make both dirs part of LOCAL_DIR conf.setStrings(MRConfig.LOCAL_DIR, localDirs); final FileContext lfc = FileContext.getLocalFSFileContext(conf); lfc.create(mapOut, EnumSet.of(CREATE)).close(); lfc.create(mapOutIdx, EnumSet.of(CREATE)).close(); final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2); final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0); LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles); }
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) { ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); int partitions = 2; Path remoteJobConfFile = mock(Path.class); JobConf conf = new JobConf(); TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class); Token<JobTokenIdentifier> jobToken = (Token<JobTokenIdentifier>) mock(Token.class); Credentials credentials = null; Clock clock = new SystemClock(); int appAttemptId = 3; MRAppMetrics metrics = mock(MRAppMetrics.class); Resource minContainerRequirements = mock(Resource.class); when(minContainerRequirements.getMemory()).thenReturn(1000); ClusterInfo clusterInfo = mock(ClusterInfo.class); AppContext appContext = mock(AppContext.class); when(appContext.getClusterInfo()).thenReturn(clusterInfo); TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions, eh, remoteJobConfFile, conf, taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock, appAttemptId, metrics, appContext); return mapTask; }
private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) { ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptListener taListener = mock(TaskAttemptListener.class); Path jobFile = mock(Path.class); JobConf jobConf = new JobConf(); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, taskSplitMetaInfo, jobConf, taListener, null, null, clock, null); return taImpl; }
@Test public void testAbortJobCalledAfterKillingTasks() throws IOException { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(conf); dispatcher.start(); OutputCommitter committer = Mockito.mock(OutputCommitter.class); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); //Fail one task. This should land the JobImpl in the FAIL_WAIT state job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.FAILED)); //Verify abort job hasn't been called Mockito.verify(committer, Mockito.never()) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); assertJobState(job, JobStateInternal.FAIL_WAIT); //Verify abortJob is called once and the job failed Mockito.verify(committer, Mockito.timeout(2000).times(1)) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); assertJobState(job, JobStateInternal.FAILED); dispatcher.stop(); }
private static AMInfo createAMInfo(int attempt) { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(100, 1), attempt); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(), containerId, NM_HOST, NM_PORT, NM_HTTP_PORT); }
public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId, TaskType taskType, int id) { ApplicationId aID = ApplicationId.newInstance(ts, appId); JobId jID = MRBuilderUtils.newJobId(aID, id); TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType); return MRBuilderUtils.newTaskAttemptId(tID, id); }
@SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testTimeout() throws InterruptedException { EventHandler mockHandler = mock(EventHandler.class); Clock clock = new SystemClock(); TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); Configuration conf = new Configuration(); conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); //10 ms conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms hb.init(conf); hb.start(); try { ApplicationId appId = ApplicationId.newInstance(0l, 5); JobId jobId = MRBuilderUtils.newJobId(appId, 4); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); hb.register(taid); Thread.sleep(100); //Events only happen when the task is canceled verify(mockHandler, times(2)).handle(any(Event.class)); } finally { hb.stop(); } }
private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, String host, boolean reduce) { TaskId taskId; if (reduce) { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); } else { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); return new ContainerFailedEvent(attemptId, host); }
private ContainerAllocatorEvent createDeallocateEvent(JobId jobId, int taskAttemptId, boolean reduce) { TaskId taskId; if (reduce) { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); } else { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); return new ContainerAllocatorEvent(attemptId, ContainerAllocator.EventType.CONTAINER_DEALLOCATE); }
@Override public List<AMInfo> getAMInfos() { List<AMInfo> amInfos = new LinkedList<AMInfo>(); for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo .getAMInfos()) { AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(), jhAmInfo.getStartTime(), jhAmInfo.getContainerId(), jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(), jhAmInfo.getNodeManagerHttpPort()); amInfos.add(amInfo); } return amInfos; }
@Test(timeout = 10000) public void testAverageMergeTime() throws IOException { String historyFileName = "job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist"; String confFileName = "job_1329348432655_0001_conf.xml"; Configuration conf = new Configuration(); JobACLsManager jobAclsMgr = new JobACLsManager(conf); Path fulleHistoryPath = new Path(TestJobHistoryEntities.class.getClassLoader() .getResource(historyFileName) .getFile()); Path fullConfPath = new Path(TestJobHistoryEntities.class.getClassLoader() .getResource(confFileName) .getFile()); HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); JobId jobId = MRBuilderUtils.newJobId(1329348432655l, 1, 1); CompletedJob completedJob = new CompletedJob(conf, jobId, fulleHistoryPath, true, "user", info, jobAclsMgr); JobInfo jobInfo = new JobInfo(completedJob); // There are 2 tasks with merge time of 45 and 55 respectively. So average // merge time should be 50. Assert.assertEquals(50L, jobInfo.getAvgMergeTime().longValue()); }
/** * Trivial test case that verifies basic functionality of {@link * JobIdHistoryFileInfoMap} */ @Test(timeout = 2000) public void testWithSingleElement() throws InterruptedException { JobIdHistoryFileInfoMap mapWithSize = new JobIdHistoryFileInfoMap(); JobId jobId = MRBuilderUtils.newJobId(1, 1, 1); HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class); Mockito.when(fileInfo1.getJobId()).thenReturn(jobId); // add it twice assertEquals("Incorrect return on putIfAbsent()", null, mapWithSize.putIfAbsent(jobId, fileInfo1)); assertEquals("Incorrect return on putIfAbsent()", fileInfo1, mapWithSize.putIfAbsent(jobId, fileInfo1)); // check get() assertEquals("Incorrect get()", fileInfo1, mapWithSize.get(jobId)); assertTrue("Incorrect size()", checkSize(mapWithSize, 1)); // check navigableKeySet() NavigableSet<JobId> set = mapWithSize.navigableKeySet(); assertEquals("Incorrect navigableKeySet()", 1, set.size()); assertTrue("Incorrect navigableKeySet()", set.contains(jobId)); // check values() Collection<HistoryFileInfo> values = mapWithSize.values(); assertEquals("Incorrect values()", 1, values.size()); assertTrue("Incorrect values()", values.contains(fileInfo1)); }
@Test (timeout = 1000) public void testAddExisting() { JobListCache cache = new JobListCache(2, 1000); JobId jobId = MRBuilderUtils.newJobId(1, 1, 1); HistoryFileInfo fileInfo = Mockito.mock(HistoryFileInfo.class); Mockito.when(fileInfo.getJobId()).thenReturn(jobId); cache.addIfAbsent(fileInfo); cache.addIfAbsent(fileInfo); assertEquals("Incorrect number of cache entries", 1, cache.values().size()); }
@Test (timeout = 1000) public void testEviction() throws InterruptedException { int maxSize = 2; JobListCache cache = new JobListCache(maxSize, 1000); JobId jobId1 = MRBuilderUtils.newJobId(1, 1, 1); HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class); Mockito.when(fileInfo1.getJobId()).thenReturn(jobId1); JobId jobId2 = MRBuilderUtils.newJobId(2, 2, 2); HistoryFileInfo fileInfo2 = Mockito.mock(HistoryFileInfo.class); Mockito.when(fileInfo2.getJobId()).thenReturn(jobId2); JobId jobId3 = MRBuilderUtils.newJobId(3, 3, 3); HistoryFileInfo fileInfo3 = Mockito.mock(HistoryFileInfo.class); Mockito.when(fileInfo3.getJobId()).thenReturn(jobId3); cache.addIfAbsent(fileInfo1); cache.addIfAbsent(fileInfo2); cache.addIfAbsent(fileInfo3); Collection <HistoryFileInfo> values; for (int i = 0; i < 9; i++) { values = cache.values(); if (values.size() > maxSize) { Thread.sleep(100); } else { assertFalse("fileInfo1 should have been evicted", values.contains(fileInfo1)); return; } } fail("JobListCache didn't delete the extra entry"); }
@Test (timeout=10000) public void testCompletedTask() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); completedJob = new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user", info, jobAclsManager); TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); Map<TaskId, Task> mapTasks = completedJob.getTasks(TaskType.MAP); Map<TaskId, Task> reduceTasks = completedJob.getTasks(TaskType.REDUCE); assertEquals(10, mapTasks.size()); assertEquals(2, reduceTasks.size()); Task mt1 = mapTasks.get(mt1Id); assertEquals(1, mt1.getAttempts().size()); assertEquals(TaskState.SUCCEEDED, mt1.getState()); TaskReport mt1Report = mt1.getReport(); assertEquals(TaskState.SUCCEEDED, mt1Report.getTaskState()); assertEquals(mt1Id, mt1Report.getTaskId()); Task rt1 = reduceTasks.get(rt1Id); assertEquals(1, rt1.getAttempts().size()); assertEquals(TaskState.SUCCEEDED, rt1.getState()); TaskReport rt1Report = rt1.getReport(); assertEquals(TaskState.SUCCEEDED, rt1Report.getTaskState()); assertEquals(rt1Id, rt1Report.getTaskId()); }
@Test (timeout=10000) public void testCompletedTaskAttempt() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); completedJob = new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user", info, jobAclsManager); TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0); TaskAttemptId rta1Id = MRBuilderUtils.newTaskAttemptId(rt1Id, 0); Task mt1 = completedJob.getTask(mt1Id); Task rt1 = completedJob.getTask(rt1Id); TaskAttempt mta1 = mt1.getAttempt(mta1Id); assertEquals(TaskAttemptState.SUCCEEDED, mta1.getState()); assertEquals("localhost:45454", mta1.getAssignedContainerMgrAddress()); assertEquals("localhost:9999", mta1.getNodeHttpAddress()); TaskAttemptReport mta1Report = mta1.getReport(); assertEquals(TaskAttemptState.SUCCEEDED, mta1Report.getTaskAttemptState()); assertEquals("localhost", mta1Report.getNodeManagerHost()); assertEquals(45454, mta1Report.getNodeManagerPort()); assertEquals(9999, mta1Report.getNodeManagerHttpPort()); TaskAttempt rta1 = rt1.getAttempt(rta1Id); assertEquals(TaskAttemptState.SUCCEEDED, rta1.getState()); assertEquals("localhost:45454", rta1.getAssignedContainerMgrAddress()); assertEquals("localhost:9999", rta1.getNodeHttpAddress()); TaskAttemptReport rta1Report = rta1.getReport(); assertEquals(TaskAttemptState.SUCCEEDED, rta1Report.getTaskAttemptState()); assertEquals("localhost", rta1Report.getNodeManagerHost()); assertEquals(45454, rta1Report.getNodeManagerPort()); assertEquals(9999, rta1Report.getNodeManagerHttpPort()); }
@Override public JobReport getReport() { readLock.lock(); try { JobState state = getState(); // jobFile can be null if the job is not yet inited. String jobFile = remoteJobConfFile == null ? "" : remoteJobConfFile.toString(); StringBuilder diagsb = new StringBuilder(); for (String s : getDiagnostics()) { diagsb.append(s).append("\n"); } if (getInternalState() == JobStateInternal.NEW) { return MRBuilderUtils.newJobReport(jobId, jobName, reporterUserName, state, appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); } computeProgress(); JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, reporterUserName, state, appSubmitTime, startTime, finishTime, setupProgress, this.mapProgress, this.reduceProgress, cleanupProgress, jobFile, amInfos, isUber, diagsb.toString(), jobPriority); return report; } finally { readLock.unlock(); } }
@Before @SuppressWarnings("rawtypes") // mocked generics public void setup() { ApplicationId appId = ApplicationId.newInstance(200, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); jid = MRBuilderUtils.newJobId(appId, 1); mActxt = mock(RunningAppContext.class); EventHandler ea = mock(EventHandler.class); when(mActxt.getEventHandler()).thenReturn(ea); for (int i = 0; i < 40; ++i) { ContainerId cId = ContainerId.newContainerId(appAttemptId, i); if (0 == i % 7) { preemptedContainers.add(cId); } TaskId tId = 0 == i % 2 ? MRBuilderUtils.newTaskId(jid, i / 2, TaskType.MAP) : MRBuilderUtils.newTaskId(jid, i / 2 + 1, TaskType.REDUCE); assignedContainers.put(cId, MRBuilderUtils.newTaskAttemptId(tId, 0)); contToResourceMap.put(cId, Resource.newInstance(2 * minAlloc, 2)); } for (Map.Entry<ContainerId,TaskAttemptId> ent : assignedContainers.entrySet()) { System.out.println("cont:" + ent.getKey().getContainerId() + " type:" + ent.getValue().getTaskId().getTaskType() + " res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" ); } }
@Test(expected = RMContainerAllocationException.class) public void testAttemptNotFoundCausesRMCommunicatorException() throws Exception { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); dispatcher.await(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); dispatcher.await(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); dispatcher.await(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); // Now kill the application rm.killApp(app.getApplicationId()); rm.waitForState(app.getApplicationId(), RMAppState.KILLED); allocator.schedule(); }