@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getContainerId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId() .getId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
static ContainerLaunchContext createContainerLaunchContext( Map<ApplicationAccessType, String> applicationACLs, Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask, final org.apache.hadoop.mapred.JobID oldJobId, WrappedJvmID jvmID, TaskAttemptListener taskAttemptListener, Credentials credentials) { synchronized (commonContainerSpecLock) { if (commonContainerSpec == null) { commonContainerSpec = createCommonContainerLaunchContext( applicationACLs, conf, jobToken, oldJobId, credentials); } } // Fill in the fields needed per-container that are missing in the common // spec. boolean userClassesTakesPrecedence = conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false); // Setup environment by cloning from common env. Map<String, String> env = commonContainerSpec.getEnvironment(); Map<String, String> myEnv = new HashMap<String, String>(env.size()); myEnv.putAll(env); if (userClassesTakesPrecedence) { myEnv.put(Environment.CLASSPATH_PREPEND_DISTCACHE.name(), "true"); } MapReduceChildJVM.setVMEnv(myEnv, remoteTask); // Set up the launch command List<String> commands = MapReduceChildJVM.getVMCommand( taskAttemptListener.getAddress(), remoteTask, jvmID); // Duplicate the ByteBuffers for access by multiple containers. Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>(); for (Entry<String, ByteBuffer> entry : commonContainerSpec .getServiceData().entrySet()) { myServiceData.put(entry.getKey(), entry.getValue().duplicate()); } // Construct the actual Container ContainerLaunchContext container = ContainerLaunchContext.newInstance( commonContainerSpec.getLocalResources(), myEnv, commands, myServiceData, commonContainerSpec.getTokens().duplicate(), applicationACLs); return container; }
@Test public void testShuffleProviders() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); Path jobFile = mock(Path.class); EventHandler eventHandler = mock(EventHandler.class); TaskAttemptListener taListener = mock(TaskAttemptListener.class); when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); JobConf jobConf = new JobConf(); jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); jobConf.setBoolean("fs.file.impl.disable.cache", true); jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); jobConf.set(YarnConfiguration.NM_AUX_SERVICES, TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + "," + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID); String serviceName = TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID; String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName); jobConf.set(serviceStr, TestShuffleHandler1.class.getName()); serviceName = TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID; serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName); jobConf.set(serviceStr, TestShuffleHandler2.class.getName()); jobConf.set(MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES, TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + "," + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID); Credentials credentials = new Credentials(); Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>( ("tokenid").getBytes(), ("tokenpw").getBytes(), new Text("tokenkind"), new Text("tokenservice")); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, mock(TaskSplitMetaInfo.class), jobConf, taListener, jobToken, credentials, new SystemClock(), null); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); ContainerLaunchContext launchCtx = TaskAttemptImpl.createContainerLaunchContext(null, jobConf, jobToken, taImpl.createRemoteTask(), TypeConverter.fromYarn(jobId), mock(WrappedJvmID.class), taListener, credentials); Map<String, ByteBuffer> serviceDataMap = launchCtx.getServiceData(); Assert.assertNotNull("TestShuffleHandler1 is missing", serviceDataMap.get(TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID)); Assert.assertNotNull("TestShuffleHandler2 is missing", serviceDataMap.get(TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID)); Assert.assertTrue("mismatch number of services in map", serviceDataMap.size() == 3); // 2 that we entered + 1 for the built-in shuffle-provider }
@Test public void testAttemptContainerRequest() throws Exception { final Text SECRET_KEY_ALIAS = new Text("secretkeyalias"); final byte[] SECRET_KEY = ("secretkey").getBytes(); Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(1); acls.put(ApplicationAccessType.VIEW_APP, "otheruser"); ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); Path jobFile = mock(Path.class); EventHandler eventHandler = mock(EventHandler.class); TaskAttemptListener taListener = mock(TaskAttemptListener.class); when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); JobConf jobConf = new JobConf(); jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); jobConf.setBoolean("fs.file.impl.disable.cache", true); jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); // setup UGI for security so tokens and keys are preserved jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(jobConf); Credentials credentials = new Credentials(); credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY); Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>( ("tokenid").getBytes(), ("tokenpw").getBytes(), new Text("tokenkind"), new Text("tokenservice")); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, mock(TaskSplitMetaInfo.class), jobConf, taListener, jobToken, credentials, new SystemClock(), null); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); ContainerLaunchContext launchCtx = TaskAttemptImpl.createContainerLaunchContext(acls, jobConf, jobToken, taImpl.createRemoteTask(), TypeConverter.fromYarn(jobId), mock(WrappedJvmID.class), taListener, credentials); Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs()); Credentials launchCredentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); dibb.reset(launchCtx.getTokens()); launchCredentials.readTokenStorageStream(dibb); // verify all tokens specified for the task attempt are in the launch context for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) { Token<? extends TokenIdentifier> launchToken = launchCredentials.getToken(token.getService()); Assert.assertNotNull("Token " + token.getService() + " is missing", launchToken); Assert.assertEquals("Token " + token.getService() + " mismatch", token, launchToken); } // verify the secret key is in the launch context Assert.assertNotNull("Secret key missing", launchCredentials.getSecretKey(SECRET_KEY_ALIAS)); Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY, launchCredentials.getSecretKey(SECRET_KEY_ALIAS))); }
static ContainerLaunchContext createContainerLaunchContext( Map<ApplicationAccessType, String> applicationACLs, Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask, final org.apache.hadoop.mapred.JobID oldJobId, WrappedJvmID jvmID, TaskAttemptListener taskAttemptListener, Credentials credentials) { synchronized (commonContainerSpecLock) { if (commonContainerSpec == null) { commonContainerSpec = createCommonContainerLaunchContext( applicationACLs, conf, jobToken, oldJobId, credentials); } } // Fill in the fields needed per-container that are missing in the common // spec. // Setup environment by cloning from common env. Map<String, String> env = commonContainerSpec.getEnvironment(); Map<String, String> myEnv = new HashMap<String, String>(env.size()); myEnv.putAll(env); MapReduceChildJVM.setVMEnv(myEnv, remoteTask); // Set up the launch command List<String> commands = MapReduceChildJVM.getVMCommand( taskAttemptListener.getAddress(), remoteTask, jvmID); // Duplicate the ByteBuffers for access by multiple containers. Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>(); for (Entry<String, ByteBuffer> entry : commonContainerSpec .getServiceData().entrySet()) { myServiceData.put(entry.getKey(), entry.getValue().duplicate()); } // Construct the actual Container ContainerLaunchContext container = ContainerLaunchContext.newInstance( commonContainerSpec.getLocalResources(), myEnv, commands, myServiceData, commonContainerSpec.getTokens().duplicate(), applicationACLs); return container; }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): taskAttempt.remoteTask = taskAttempt.createRemoteTask(); /* * CDH5.4.0 includes YARN-2312 that bumps up the container-id from 32 * to 64 bits to include the RM epoch so container-ids are unique * across RM restarts. MR JVMId is also updated to use the 64-bit * version of container-id leading to failures on rolling upgrade from * CDH5.3.x to CDH5.4.y (y < 3). * * For 5.4.z (z > 2), let us use the 32-bit version of container-id * for JVMId#jvmId to ensure rolling upgrades from 5.3.x * to 5.4.x work. This shouldn't interfere with 5.5 and beyond. */ taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
@SuppressWarnings({ "unchecked" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { final TaskAttemptContainerAssignedEvent cEvent = (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): if(taskAttempt instanceof MultiMapTaskAttemptImpl){ if(((MultiMapTaskAttemptImpl) taskAttempt).getTaskSplitMetaInfo()==null){ // we do nothing here if we find splitinfo is null LOG.info("quit container from"+taskAttempt.getID().toString()); taskAttempt.eventHandler.handle( new TaskEvent(taskAttempt.getID().getTaskId(), TaskEventType.T_KILL)); return; } } LOG.info("container assigned for attempt"+taskAttempt.getID().toString()); taskAttempt.remoteTask = taskAttempt.createRemoteTask(); taskAttempt.jvmID = new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId().getContainerId()); taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt ContainerLaunchContext launchContext = createContainerLaunchContext( cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, taskAttempt.taskAttemptListener, taskAttempt.credentials); taskAttempt.eventHandler .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, launchContext, container, taskAttempt.remoteTask)); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); }
/** * Register a JVM with the listener. This should be called as soon as a * JVM ID is assigned to a task attempt, before it has been launched. * @param task the task itself for this JVM. * @param jvmID The ID of the JVM . */ void registerPendingTask(Task task, WrappedJvmID jvmID);
/** * Register task attempt. This should be called when the JVM has been * launched. * * @param attemptID * the id of the attempt for this JVM. * @param jvmID the ID of the JVM. */ void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID);
/** * Unregister the JVM and the attempt associated with it. This should be * called when the attempt/JVM has finished executing and is being cleaned up. * @param attemptID the ID of the attempt. * @param jvmID the ID of the JVM for that attempt. */ void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);