/**
 * Supplies a mock container launcher that, for every
 * {@code CONTAINER_REMOTE_LAUNCH} event, logs the launch commands and stores
 * them in {@code myCommandLine}, stores the launch environment in
 * {@code cmdEnvironment}, and then hands the event to the mock's own handler.
 *
 * @param context the application context (not used by this override)
 * @return the recording mock launcher
 */
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
  return new MockContainerLauncher() {
    @Override
    public void handle(ContainerLauncherEvent event) {
      if (EventType.CONTAINER_REMOTE_LAUNCH == event.getType()) {
        final ContainerLaunchContext ctx =
            ((ContainerRemoteLaunchEvent) event).getContainerLaunchContext();
        final String commands = ctx.getCommands().toString();
        LOG.info("launchContext " + commands);
        // Remember what was launched so the enclosing test can inspect it.
        myCommandLine = commands;
        cmdEnvironment = ctx.getEnvironment();
      }
      // Every event, launch or not, still goes through the mock's handling.
      super.handle(event);
    }
  };
}
/**
 * Supplies a mock container launcher that, for every
 * {@code CONTAINER_REMOTE_LAUNCH} event, logs the launch commands, appends
 * them to {@code launchCmdList}, stores the launch environment in
 * {@code cmdEnvironment}, and then delegates to the mock's own handler.
 *
 * @param context the application context (not used by this override)
 * @return the recording mock launcher
 */
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
  return new MockContainerLauncher() {
    @Override
    public void handle(ContainerLauncherEvent event) {
      final boolean isRemoteLaunch =
          event.getType() == EventType.CONTAINER_REMOTE_LAUNCH;
      if (isRemoteLaunch) {
        final ContainerRemoteLaunchEvent remoteLaunch =
            (ContainerRemoteLaunchEvent) event;
        final ContainerLaunchContext ctx =
            remoteLaunch.getContainerLaunchContext();
        final String commands = ctx.getCommands().toString();
        LOG.info("launchContext " + commands);
        // One entry per launch, in the order the events arrive.
        launchCmdList.add(commands);
        cmdEnvironment = ctx.getEnvironment();
      }
      super.handle(event);
    }
  };
}
/**
 * Supplies a mock container launcher that, for every
 * {@code CONTAINER_REMOTE_LAUNCH} event, logs the launch commands and stores
 * them in {@code myCommandLine} before delegating to the mock's own handler.
 *
 * @param context the application context (not used by this override)
 * @return the recording mock launcher
 */
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
  return new MockContainerLauncher() {
    @Override
    public void handle(ContainerLauncherEvent event) {
      // Anything that is not a remote launch is passed straight through.
      if (event.getType() != EventType.CONTAINER_REMOTE_LAUNCH) {
        super.handle(event);
        return;
      }

      final ContainerLaunchContext ctx =
          ((ContainerRemoteLaunchEvent) event).getContainerLaunchContext();
      final String commands = ctx.getCommands().toString();
      LOG.info("launchContext " + commands);
      myCommandLine = commands;

      super.handle(event);
    }
  };
}
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getContainerId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
@Test public void testContainerPassThrough() throws Exception { MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true) { @Override protected ContainerLauncher createContainerLauncher(AppContext context) { return new MockContainerLauncher() { @Override public void handle(ContainerLauncherEvent event) { if (event instanceof ContainerRemoteLaunchEvent) { containerObtainedByContainerLauncher = ((ContainerRemoteLaunchEvent) event).getAllocatedContainer(); } super.handle(event); } }; }; }; Job job = app.submit(new Configuration()); app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); Collection<Task> tasks = job.getTasks().values(); Collection<TaskAttempt> taskAttempts = tasks.iterator().next().getAttempts().values(); TaskAttemptImpl taskAttempt = (TaskAttemptImpl) taskAttempts.iterator().next(); // Container from RM should pass through to the launcher. Container object // should be the same. Assert.assertTrue(taskAttempt.container == containerObtainedByContainerLauncher); }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId() .getId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
@SuppressWarnings("rawtypes") @Test(timeout=10000) public void testKillJob() throws Exception { JobConf conf = new JobConf(); AppContext context = mock(AppContext.class); // a simple event handler solely to detect the container cleaned event final CountDownLatch isDone = new CountDownLatch(1); EventHandler handler = new EventHandler() { @Override public void handle(Event event) { LOG.info("handling event " + event.getClass() + " with type " + event.getType()); if (event instanceof TaskAttemptEvent) { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) { isDone.countDown(); } } } }; when(context.getEventHandler()).thenReturn(handler); // create and start the launcher LocalContainerLauncher launcher = new LocalContainerLauncher(context, mock(TaskUmbilicalProtocol.class)); launcher.init(conf); launcher.start(); // create mocked job, task, and task attempt // a single-mapper job JobId jobId = MRBuilderUtils.newJobId(System.currentTimeMillis(), 1, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptId taId = MRBuilderUtils.newTaskAttemptId(taskId, 0); Job job = mock(Job.class); when(job.getTotalMaps()).thenReturn(1); when(job.getTotalReduces()).thenReturn(0); Map<JobId,Job> jobs = new HashMap<JobId,Job>(); jobs.put(jobId, job); // app context returns the one and only job when(context.getAllJobs()).thenReturn(jobs); org.apache.hadoop.mapreduce.v2.app.job.Task ytask = mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); when(ytask.getType()).thenReturn(TaskType.MAP); when(job.getTask(taskId)).thenReturn(ytask); // create a sleeping mapper that runs beyond the test timeout MapTask mapTask = mock(MapTask.class); when(mapTask.isMapOrReduce()).thenReturn(true); when(mapTask.isMapTask()).thenReturn(true); TaskAttemptID taskID = TypeConverter.fromYarn(taId); when(mapTask.getTaskID()).thenReturn(taskID); when(mapTask.getJobID()).thenReturn(taskID.getJobID()); doAnswer(new Answer<Void>() { @Override public Void 
answer(InvocationOnMock invocation) throws Throwable { // sleep for a long time LOG.info("sleeping for 5 minutes..."); Thread.sleep(5*60*1000); return null; } }).when(mapTask).run(isA(JobConf.class), isA(TaskUmbilicalProtocol.class)); // pump in a task attempt launch event ContainerLauncherEvent launchEvent = new ContainerRemoteLaunchEvent(taId, null, createMockContainer(), mapTask); launcher.handle(launchEvent); Thread.sleep(200); // now pump in a container clean-up event ContainerLauncherEvent cleanupEvent = new ContainerLauncherEvent(taId, null, null, null, ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP); launcher.handle(cleanupEvent); // wait for the event to fire: this should be received promptly isDone.await(); launcher.close(); }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); /* * CDH5.4.0 includes YARN-2312 that bumps up the container-id from 32 * to 64 bits to include the RM epoch so container-ids are unique * across RM restarts. MR JVMId is also updated to use the 64-bit * version of container-id leading to failures on rolling upgrade from * CDH5.3.x to CDH5.4.y (y < 3). * * For 5.4.z (z > 2), let us use the 32-bit version of container-id * for JVMId#jvmId to ensure rolling upgrades from 5.3.x * to 5.4.x work. This shouldn't interfere with 5.5 and beyond. */ taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): if(taskAttempt instanceof MultiMapTaskAttemptImpl){ if(((MultiMapTaskAttemptImpl) taskAttempt).getTaskSplitMetaInfo()==null){ // we do nothing here if we find splitinfo is null LOG.info("quit container from"+taskAttempt.getID().toString()); taskAttempt.eventHandler.handle( new TaskEvent(taskAttempt.getID().getTaskId(), TaskEventType.T_KILL)); return; } } LOG.info("container assigned for attempt"+taskAttempt.getID().toString()); taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getContainerId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }