@Override public void initializeApplication(ApplicationInitializationContext context) { String user = context.getUser(); ApplicationId appId = context.getApplicationId(); ByteBuffer secret = context.getApplicationDataForService(); // TODO these bytes should be versioned try { Token<JobTokenIdentifier> jt = deserializeServiceData(secret); // TODO: Once SHuffle is out of NM, this can use MR APIs JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); recordJobShuffleInfo(jobId, user, jt); } catch (IOException e) { LOG.error("Error during initApp", e); // TODO add API to AuxiliaryServices to report failures } }
private void recoverJobShuffleInfo(String jobIdStr, byte[] data) throws IOException { JobID jobId; try { jobId = JobID.forName(jobIdStr); } catch (IllegalArgumentException e) { throw new IOException("Bad job ID " + jobIdStr + " in state store", e); } JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data); String user = proto.getUser(); TokenProto tokenProto = proto.getJobToken(); Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>( tokenProto.getIdentifier().toByteArray(), tokenProto.getPassword().toByteArray(), new Text(tokenProto.getKind()), new Text(tokenProto.getService())); addJobToken(jobId, user, jobToken); }
private void recordJobShuffleInfo(JobID jobId, String user, Token<JobTokenIdentifier> jobToken) throws IOException { if (stateDb != null) { TokenProto tokenProto = TokenProto.newBuilder() .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier())) .setPassword(ByteString.copyFrom(jobToken.getPassword())) .setKind(jobToken.getKind().toString()) .setService(jobToken.getService().toString()) .build(); JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder() .setUser(user).setJobToken(tokenProto).build(); try { stateDb.put(bytes(jobId.toString()), proto.toByteArray()); } catch (DBException e) { throw new IOException("Error storing " + jobId, e); } } addJobToken(jobId, user, jobToken); }
private static int getShuffleResponseCode(ShuffleHandler shuffle, Token<JobTokenIdentifier> jt) throws IOException { URL url = new URL("http://127.0.0.1:" + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); String encHash = SecureShuffleUtils.hashFromString( SecureShuffleUtils.buildMsgFrom(url), JobTokenSecretManager.createSecretKey(jt.getPassword())); conn.addRequestProperty( SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); conn.connect(); int rc = conn.getResponseCode(); conn.disconnect(); return rc; }
@Override public void initializeApplication(ApplicationInitializationContext context) { String user = context.getUser(); ApplicationId appId = context.getApplicationId(); ByteBuffer secret = context.getApplicationDataForService(); // TODO these bytes should be versioned try { Token<JobTokenIdentifier> jt = deserializeServiceData(secret); // TODO: Once SHuffle is out of NM, this can use MR APIs JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); userRsrc.put(jobId.toString(), user); LOG.info("Added token for " + jobId.toString()); secretManager.addTokenForJob(jobId.toString(), jt); } catch (IOException e) { LOG.error("Error during initApp", e); // TODO add API to AuxiliaryServices to report failures } }
@Override public void initializeApplication(ApplicationInitializationContext context) { String user = context.getUser(); String userFolder = context.getUserFolder(); ApplicationId appId = context.getApplicationId(); ByteBuffer secret = context.getApplicationDataForService(); // TODO these bytes should be versioned try { Token<JobTokenIdentifier> jt = deserializeServiceData(secret); // TODO: Once SHuffle is out of NM, this can use MR APIs JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()); recordJobShuffleInfo(jobId, user, jt, userFolder); } catch (IOException e) { LOG.error("Error during initApp", e); // TODO add API to AuxiliaryServices to report failures } }
private void recoverJobShuffleInfo(String jobIdStr, byte[] data) throws IOException { JobID jobId; try { jobId = JobID.forName(jobIdStr); } catch (IllegalArgumentException e) { throw new IOException("Bad job ID " + jobIdStr + " in state store", e); } JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data); String user = proto.getUser(); String userFolder = proto.getUserFolder(); TokenProto tokenProto = proto.getJobToken(); Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>( tokenProto.getIdentifier().toByteArray(), tokenProto.getPassword().toByteArray(), new Text(tokenProto.getKind()), new Text(tokenProto.getService())); addJobToken(jobId, user, jobToken, userFolder); }
private void recordJobShuffleInfo(JobID jobId, String user, Token<JobTokenIdentifier> jobToken, String userFolder) throws IOException { if (stateDb != null) { TokenProto tokenProto = TokenProto.newBuilder() .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier())) .setPassword(ByteString.copyFrom(jobToken.getPassword())) .setKind(jobToken.getKind().toString()) .setService(jobToken.getService().toString()) .build(); JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder() .setUser(user).setJobToken(tokenProto).setUserFolder(userFolder).build(); try { stateDb.put(bytes(jobId.toString()), proto.toByteArray()); } catch (DBException e) { throw new IOException("Error storing " + jobId, e); } } addJobToken(jobId, user, jobToken, userFolder); }
public ReduceTaskAttemptImpl(TaskId id, int attempt, EventHandler eventHandler, Path jobFile, int partition, int numMapTasks, JobConf conf, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, AppContext appContext) { super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition, conf, new String[] {}, jobToken, credentials, clock, appContext); this.numMapTasks = numMapTasks; }
public MapTaskAttemptImpl(TaskId taskId, int attempt, EventHandler eventHandler, Path jobFile, int partition, TaskSplitMetaInfo splitInfo, JobConf conf, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, AppContext appContext) { super(taskId, attempt, eventHandler, taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(), jobToken, credentials, clock, appContext); this.splitInfo = splitInfo; }
public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, TaskSplitMetaInfo taskSplitMetaInfo, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, int appAttemptId, MRAppMetrics metrics, AppContext appContext) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, appAttemptId, metrics, appContext); this.taskSplitMetaInfo = taskSplitMetaInfo; }
public ReduceTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path jobFile, JobConf conf, int numMapTasks, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, int appAttemptId, MRAppMetrics metrics, AppContext appContext) { super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf, taskAttemptListener, jobToken, credentials, clock, appAttemptId, metrics, appContext); this.numMapTasks = numMapTasks; }
protected void setup(JobImpl job) throws IOException { String oldJobIDString = job.oldJobId.toString(); String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path path = MRApps.getStagingAreaDir(job.conf, user); if(LOG.isDebugEnabled()) { LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString); } job.remoteJobSubmitDir = FileSystem.get(job.conf).makeQualified( new Path(path, oldJobIDString)); job.remoteJobConfFile = new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); // Prepare the TaskAttemptListener server for authentication of Containers // TaskAttemptListener gets the information via jobTokenSecretManager. JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(oldJobIDString)); job.jobToken = new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager); job.jobToken.setService(identifier.getJobId()); // Add it to the jobTokenSecretManager so that TaskAttemptListener server // can authenticate containers(tasks) job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken); LOG.info("Adding job token for " + oldJobIDString + " to jobTokenSecretManager"); // If the job client did not setup the shuffle secret then reuse // the job token secret for the shuffle. if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) { LOG.warn("Shuffle secret key missing from job credentials." + " Using job token secret as shuffle secret."); TokenCache.setShuffleSecretKey(job.jobToken.getPassword(), job.jobCredentials); } }
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) { ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); int partitions = 2; Path remoteJobConfFile = mock(Path.class); JobConf conf = new JobConf(); TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class); Token<JobTokenIdentifier> jobToken = (Token<JobTokenIdentifier>) mock(Token.class); Credentials credentials = null; Clock clock = new SystemClock(); int appAttemptId = 3; MRAppMetrics metrics = mock(MRAppMetrics.class); Resource minContainerRequirements = mock(Resource.class); when(minContainerRequirements.getMemory()).thenReturn(1000); ClusterInfo clusterInfo = mock(ClusterInfo.class); AppContext appContext = mock(AppContext.class); when(appContext.getClusterInfo()).thenReturn(clusterInfo); TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions, eh, remoteJobConfFile, conf, taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock, appAttemptId, metrics, appContext); return mapTask; }
public MockTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, int startCount, MRAppMetrics metrics, AppContext appContext, TaskType taskType) { super(jobId, taskType , partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, startCount, metrics, appContext); this.taskType = taskType; }
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Path jobFile, int partition, JobConf conf, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, AppContext appContext, TaskType taskType) { super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf, dataLocations, jobToken, credentials, clock, appContext); this.taskType = taskType; }
@Before @SuppressWarnings("unchecked") public void setup() { dispatcher = new InlineDispatcher(); ++startCount; conf = new JobConf(); taskAttemptListener = mock(TaskAttemptListener.class); jobToken = (Token<JobTokenIdentifier>) mock(Token.class); remoteJobConfFile = mock(Path.class); credentials = null; clock = new SystemClock(); metrics = mock(MRAppMetrics.class); dataLocations = new String[1]; appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); jobId = Records.newRecord(JobId.class); jobId.setId(1); jobId.setAppId(appId); appContext = mock(AppContext.class); taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); taskAttempts = new ArrayList<MockTaskAttemptImpl>(); }
static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException { DataInputByteBuffer in = new DataInputByteBuffer(); in.reset(secret); Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(); jt.readFields(in); return jt; }