/**
 * Answers container-launcher requests immediately instead of talking to a
 * NodeManager.
 *
 * <p>A remote launch is reported back as a
 * {@code TaskAttemptContainerLaunchedEvent} carrying {@code shufflePort}, then
 * handed to {@code attemptLaunched}. A remote cleanup is reported back as
 * {@code TA_CONTAINER_CLEANED}. Every other event type is ignored.
 */
@Override
public void handle(ContainerLauncherEvent event) {
  switch (event.getType()) {
    case CONTAINER_REMOTE_LAUNCH: {
      TaskAttemptContainerLaunchedEvent launched =
          new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(), shufflePort);
      getContext().getEventHandler().handle(launched);
      attemptLaunched(event.getTaskAttemptID());
      break;
    }
    case CONTAINER_REMOTE_CLEANUP: {
      TaskAttemptEvent cleaned = new TaskAttemptEvent(
          event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED);
      getContext().getEventHandler().handle(cleaned);
      break;
    }
    default:
      // Other launcher events need no simulated response.
      break;
  }
}
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent evnt) { TaskAttemptContainerLaunchedEvent event = (TaskAttemptContainerLaunchedEvent) evnt; //set the launch time taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); taskAttempt.httpPort = nodeHttpInetAddr.getPort(); taskAttempt.sendLaunchedEvents(); taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); //make remoteTask reference as null as it is no more needed //and free up the memory taskAttempt.remoteTask = null; //tell the Task that attempt has started taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent evnt) { TaskAttemptContainerLaunchedEvent event = (TaskAttemptContainerLaunchedEvent) evnt; //set the launch time taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); taskAttempt.httpPort = nodeHttpInetAddr.getPort(); taskAttempt.sendLaunchedEvents(); taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); //make remoteTask reference as null as it is no more needed //and free up the memory taskAttempt.remoteTask = null; LOG.info("recieve task launched info from:"+taskAttempt.getID().toString()); //tell the Task that attempt has started taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); }
@SuppressWarnings("unchecked") public synchronized void launch(ContainerRemoteLaunchEvent event) { LOG.info("Launching " + taskAttemptID); if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { state = ContainerState.DONE; sendContainerLaunchFailedMsg(taskAttemptID, "Container was killed before it was launched"); return; } ContainerManagementProtocolProxyData proxy = null; try { proxy = getCMProxy(containerMgrAddress, containerID); // Construct the actual Container ContainerLaunchContext containerLaunchContext = event.getContainerLaunchContext(); // Now launch the actual container StartContainerRequest startRequest = StartContainerRequest.newInstance(containerLaunchContext, event.getContainerToken()); List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); list.add(startRequest); StartContainersRequest requestList = StartContainersRequest.newInstance(list); StartContainersResponse response = proxy.getContainerManagementProtocol().startContainers(requestList); if (response.getFailedRequests() != null && response.getFailedRequests().containsKey(containerID)) { throw response.getFailedRequests().get(containerID).deSerialize(); } ByteBuffer portInfo = response.getAllServicesMetaData().get( ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); int port = -1; if(portInfo != null) { port = ShuffleHandler.deserializeMetaData(portInfo); } LOG.info("Shuffle port returned by ContainerManager for " + taskAttemptID + " : " + port); if(port < 0) { this.state = ContainerState.FAILED; throw new IllegalStateException("Invalid shuffle port number " + port + " returned for " + taskAttemptID); } // after launching, send launched event to task attempt to move // it from ASSIGNED to RUNNING state context.getEventHandler().handle( new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); this.state = ContainerState.RUNNING; } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + StringUtils.stringifyException(t); this.state = 
ContainerState.FAILED; sendContainerLaunchFailedMsg(taskAttemptID, message); } finally { if (proxy != null) { cmProxy.mayBeCloseProxy(proxy); } } }
@Test public void testContainerCleanedWhileRunning() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); Path jobFile = mock(Path.class); MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptListener taListener = mock(TaskAttemptListener.class); when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); JobConf jobConf = new JobConf(); jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); jobConf.setBoolean("fs.file.impl.disable.cache", true); jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); AppContext appCtx = mock(AppContext.class); ClusterInfo clusterInfo = mock(ClusterInfo.class); Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(), new SystemClock(), appCtx); NodeId nid = NodeId.newInstance("127.0.0.2", 0); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE)); taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, mock(Map.class))); taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); assertEquals("Task 
attempt is not in running state", taImpl.getState(), TaskAttemptState.RUNNING); taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_CONTAINER_CLEANED)); assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", eventHandler.internalError); assertEquals("Task attempt is not assigned on the local rack", Locality.RACK_LOCAL, taImpl.getLocality()); }
/**
 * Drives a map attempt to COMMIT_PENDING and then delivers
 * TA_CONTAINER_CLEANED. Checks that no internal error is raised. The split has
 * no locations, and the attempt's locality is expected to be OFF_SWITCH.
 */
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  // No split locations, so the attempt cannot be node- or rack-local.
  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in commit pending state",
      TaskAttemptState.COMMIT_PENDING, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally",
      Locality.OFF_SWITCH, taImpl.getLocality());
}
/**
 * Checks that a succeeded map attempt moves to FAILED on
 * TA_TOO_MANY_FETCH_FAILURE, stays FAILED when a second such event arrives,
 * and that no internal error is raised along the way.
 */
@Test
public void testDoubleTooManyFetchFailure() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in succeeded state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in FAILED state",
      TaskAttemptState.FAILED, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in FAILED state, still",
      TaskAttemptState.FAILED, taImpl.getState());

  // internalError is cumulative over every event handled above.
  assertFalse("InternalError occurred while handling task attempt events",
      eventHandler.internalError);
}
/**
 * Checks that a succeeded map attempt that is then killed moves to KILLED,
 * stays KILLED when a TA_TOO_MANY_FETCH_FAILURE event arrives afterwards, and
 * that no internal error is raised along the way.
 */
@Test
public void testTooManyFetchFailureAfterKill() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, mock(Token.class),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in succeeded state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
  assertEquals("Task attempt is not in KILLED state",
      TaskAttemptState.KILLED, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in KILLED state, still",
      TaskAttemptState.KILLED, taImpl.getState());

  // internalError is cumulative over every event handled above.
  assertFalse("InternalError occurred while handling task attempt events",
      eventHandler.internalError);
}
/**
 * Checks that TA_TOO_MANY_FETCH_FAILURE on a succeeded map attempt moves it to
 * FAILED without changing the (positive) finish time recorded when it
 * succeeded.
 */
@Test
public void testFetchFailureAttemptFinishTime() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, mock(Token.class),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in succeeded state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());
  assertTrue("Task Attempt finish time is not greater than 0",
      taImpl.getFinishTime() > 0);

  long finishTime = taImpl.getFinishTime();
  // Let the clock advance so a re-stamped finish time would be detectable.
  Thread.sleep(5);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));

  assertEquals("Task attempt is not in Too Many Fetch Failure state",
      TaskAttemptState.FAILED, taImpl.getState());
  assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
      + " Task attempt finish time is not the same ",
      finishTime, taImpl.getFinishTime());
}
@Test public void testContainerKillWhileRunning() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); Path jobFile = mock(Path.class); MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptListener taListener = mock(TaskAttemptListener.class); when(taListener.getAddress()).thenReturn( new InetSocketAddress("localhost", 0)); JobConf jobConf = new JobConf(); jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); jobConf.setBoolean("fs.file.impl.disable.cache", true); jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); AppContext appCtx = mock(AppContext.class); ClusterInfo clusterInfo = mock(ClusterInfo.class); Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(), new SystemClock(), appCtx); NodeId nid = NodeId.newInstance("127.0.0.2", 0); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); Container container = mock(Container.class); when(container.getId()).thenReturn(contId); when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE)); taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, mock(Map.class))); taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); assertEquals("Task 
attempt is not in running state", taImpl.getState(), TaskAttemptState.RUNNING); taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL)); assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError); assertEquals("Task should be in KILLED state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); }
/**
 * Checks that TA_KILL on a COMMIT_PENDING map attempt is handled without an
 * internal error and moves the attempt to the KILL_CONTAINER_CLEANUP internal
 * state.
 */
@Test
public void testContainerKillWhileCommitPending() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in running state",
      TaskAttemptState.RUNNING, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));
  assertEquals("Task should be in COMMIT_PENDING state",
      TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));
  assertFalse("InternalError occurred trying to handle TA_KILL",
      eventHandler.internalError);
  assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
      TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
      taImpl.getInternalState());
}
/**
 * Drives a map attempt to RUNNING and then delivers TA_CONTAINER_CLEANED.
 * Checks that no internal error is raised. The split lives on 127.0.0.1 while
 * the container runs on 127.0.0.2, and the attempt's locality is expected to
 * be RACK_LOCAL.
 */
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(),
      new SystemClock(), appCtx);

  // The container runs on a different host than the split location.
  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in running state",
      TaskAttemptState.RUNNING, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
/**
 * Drives a map attempt to COMMIT_PENDING and then delivers
 * TA_CONTAINER_CLEANED. Checks that no internal error is raised. The split has
 * no locations, and the attempt's locality is expected to be OFF_SWITCH.
 */
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  // No split locations, so the attempt cannot be node- or rack-local.
  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in commit pending state",
      TaskAttemptState.COMMIT_PENDING, taImpl.getState());

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally",
      Locality.OFF_SWITCH, taImpl.getLocality());
}
/**
 * Checks that a succeeded map attempt moves to FAILED when a reduce attempt
 * reports too many fetch failures, stays FAILED when a second such report
 * arrives, and that no internal error is raised along the way.
 */
@Test
public void testDoubleTooManyFetchFailure() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  // The reduce attempt that reports the fetch failures.
  TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
  TaskAttemptId reduceTAId = MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, new Token(), new Credentials(),
      new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
      mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_COMPLETED));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in succeeded state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());

  taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
      reduceTAId, "Host"));
  assertEquals("Task attempt is not in FAILED state",
      TaskAttemptState.FAILED, taImpl.getState());

  // Send a second report of the same kind as the first; the attempt must stay FAILED.
  taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
      reduceTAId, "Host"));
  assertEquals("Task attempt is not in FAILED state, still",
      TaskAttemptState.FAILED, taImpl.getState());

  // internalError is cumulative over every event handled above.
  assertFalse("InternalError occurred while handling task attempt events",
      eventHandler.internalError);
}
/**
 * Verifies that a succeeded map attempt which is then killed moves to KILLED,
 * and that a later TA_TOO_MANY_FETCH_FAILURE event leaves it KILLED without
 * making the mock event handler record an internal error.
 */
@Test
public void testTooManyFetchFailureAfterKill() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, mock(Token.class),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  // Drive the attempt from scheduling through launch to SUCCEEDED.
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_COMPLETED));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in succeeded state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());

  // Killing a succeeded attempt moves it to KILLED.
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  assertEquals("Task attempt is not in KILLED state",
      TaskAttemptState.KILLED, taImpl.getState());

  // A late fetch-failure report must be tolerated in KILLED without a state change.
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  assertEquals("Task attempt is not in KILLED state, still",
      TaskAttemptState.KILLED, taImpl.getState());
  assertFalse("InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURE after TA_KILL",
      eventHandler.internalError);
}
/**
 * Verifies that when a succeeded map attempt is failed by a too-many-fetch-failure
 * report, it moves to FAILED but keeps the finish time it recorded when it
 * originally succeeded.
 */
@Test
public void testFetchFailureAttemptFinishTime() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  // Reduce attempt that is reported as the source of the fetch failure.
  TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
  TaskAttemptId reduceTAId = MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(
      new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, mock(Token.class),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  // Drive the attempt from scheduling through launch to SUCCEEDED.
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_DONE));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_COMPLETED));
  // JUnit's assertEquals takes (message, expected, actual).
  assertEquals("Task attempt is not in succeeded state",
      TaskAttemptState.SUCCEEDED, taImpl.getState());
  assertTrue("Task Attempt finish time is not greater than 0",
      taImpl.getFinishTime() > 0);

  long finishTime = taImpl.getFinishTime();
  // Let the (system) clock advance, so that re-stamping the finish time in
  // the fetch-failure transition would very likely yield a different value.
  Thread.sleep(5);

  taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
      reduceTAId, "Host"));
  assertEquals("Task attempt is not in Too Many Fetch Failure state",
      TaskAttemptState.FAILED, taImpl.getState());
  assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
      + " Task attempt finish time is not the same ",
      finishTime, taImpl.getFinishTime());
}
/**
 * Builds a map {@link TaskAttemptImpl} wired to mocks and drives it through
 * TA_SCHEDULE, container assignment and the container-launched event.
 *
 * @param eventHandler handler passed to the attempt, and used to set up the
 *     task-attempt finishing monitor
 * @return the task attempt after it has handled the container-launched event
 */
private TaskAttemptImpl createTaskAttemptImpl(
    MockEventHandler eventHandler) {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
      jobFile, 1, splits, jobConf, taListener, mock(Token.class),
      new Credentials(), new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  // newContainerId replaces the deprecated ContainerId.newInstance and
  // matches how the other tests build container ids.
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  return taImpl;
}
/**
 * Notifies the application context's event handler that the container for
 * the given attempt has been launched.
 *
 * @param attemptID the task attempt whose container was launched
 * @param shufflePort port carried in the emitted
 *     {@code TaskAttemptContainerLaunchedEvent}
 */
protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
  TaskAttemptContainerLaunchedEvent launchedEvent =
      new TaskAttemptContainerLaunchedEvent(attemptID, shufflePort);
  getContext().getEventHandler().handle(launchedEvent);
}