/**
 * Builds a {@link StubbedJob} with a fixed job id and registers it, plus a
 * mock handler for task, job-history and job-finish events, on the dispatcher.
 *
 * @param conf configuration handed to the job
 * @param dispatcher dispatcher whose event handler the job uses and on which
 *          the handlers are registered
 * @param numSplits number of splits passed to the stubbed job
 * @param appContext application context; when {@code null} a mock is created
 *          whose {@code hasSuccessfullyUnregistered()} returns {@code true}
 * @return the newly created job
 */
private static StubbedJob createStubbedJob(Configuration conf,
    Dispatcher dispatcher, int numSplits, AppContext appContext) {
  JobId yarnJobId = TypeConverter.toYarn(JobID.forName("job_1234567890000_0001"));

  AppContext effectiveContext = appContext;
  if (effectiveContext == null) {
    effectiveContext = mock(AppContext.class);
    when(effectiveContext.hasSuccessfullyUnregistered()).thenReturn(true);
  }

  ApplicationAttemptId attemptId =
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
  StubbedJob stubbedJob = new StubbedJob(yarnJobId, attemptId, conf,
      dispatcher.getEventHandler(), true, "somebody", numSplits,
      effectiveContext);
  dispatcher.register(JobEventType.class, stubbedJob);

  // Every other event type used here is absorbed by a single mock handler.
  EventHandler sink = mock(EventHandler.class);
  dispatcher.register(TaskEventType.class, sink);
  dispatcher.register(
      org.apache.hadoop.mapreduce.jobhistory.EventType.class, sink);
  dispatcher.register(JobFinishEvent.Type.class, sink);

  return stubbedJob;
}
/**
 * Answers an allocation request by fabricating a container on the fixed
 * NM host/port, then dispatching two normalized-resource history events
 * (REDUCE, then MAP) followed by the container-assigned event.
 *
 * @param event the allocation request; its attempt id receives the container
 */
@Override
public void handle(ContainerAllocatorEvent event) {
  // Each request consumes the next container sequence number.
  ContainerId containerId = ContainerId.newContainerId(
      getContext().getApplicationAttemptId(), containerCount++);
  NodeId node = NodeId.newInstance(NM_HOST, NM_PORT);
  Resource capability = Resource.newInstance(1234, 2, 2);

  ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
      containerId, node.toString(), "user", capability,
      System.currentTimeMillis() + 10000, 42, 42,
      Priority.newInstance(0), 0);
  Token token =
      newContainerToken(node, "password".getBytes(), tokenIdentifier);
  Container allocated = Container.newInstance(containerId, node,
      NM_HOST + ":" + NM_HTTP_PORT, capability, null, token);

  JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(applicationId));

  getContext().getEventHandler().handle(
      new JobHistoryEvent(jobId, new NormalizedResourceEvent(
          org.apache.hadoop.mapreduce.TaskType.REDUCE, 100)));
  getContext().getEventHandler().handle(
      new JobHistoryEvent(jobId, new NormalizedResourceEvent(
          org.apache.hadoop.mapreduce.TaskType.MAP, 100)));
  getContext().getEventHandler().handle(
      new TaskAttemptContainerAssignedEvent(
          event.getAttemptID(), allocated, null));
}
/**
 * Checks that a job name containing the delimiter shows up in its escaped
 * form in the generated done-file name.
 */
@Test
public void testJobNamePercentEncoding() throws IOException {
  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName(JOB_ID)));
  indexInfo.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  indexInfo.setUser(USER_NAME);
  // The field under test: a job name that needs escaping.
  indexInfo.setJobName(JOB_NAME_WITH_DELIMITER);
  indexInfo.setFinishTime(Long.parseLong(FINISH_TIME));
  indexInfo.setNumMaps(Integer.parseInt(NUM_MAPS));
  indexInfo.setNumReduces(Integer.parseInt(NUM_REDUCES));
  indexInfo.setJobStatus(JOB_STATUS);
  indexInfo.setQueueName(QUEUE_NAME);
  indexInfo.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String doneFileName = FileNameIndexUtils.getDoneFileName(indexInfo);

  Assert.assertTrue("Job name not encoded correctly into job history file",
      doneFileName.contains(JOB_NAME_WITH_DELIMITER_ESCAPE));
}
/**
 * Checks that a queue name containing the delimiter shows up in its escaped
 * form in the generated done-file name.
 */
@Test
public void testQueueNamePercentEncoding() throws IOException {
  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName(JOB_ID)));
  indexInfo.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  indexInfo.setUser(USER_NAME);
  indexInfo.setJobName(JOB_NAME);
  indexInfo.setFinishTime(Long.parseLong(FINISH_TIME));
  indexInfo.setNumMaps(Integer.parseInt(NUM_MAPS));
  indexInfo.setNumReduces(Integer.parseInt(NUM_REDUCES));
  indexInfo.setJobStatus(JOB_STATUS);
  // The field under test: a queue name that needs escaping.
  indexInfo.setQueueName(QUEUE_NAME_WITH_DELIMITER);
  indexInfo.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String doneFileName = FileNameIndexUtils.getDoneFileName(indexInfo);

  Assert.assertTrue("Queue name not encoded correctly into job history file",
      doneFileName.contains(QUEUE_NAME_WITH_DELIMITER_ESCAPE));
}
/**
 * Emits a {@link JobPriorityChangeEvent} when the parsed line carries a
 * {@code JOB_PRIORITY} attribute.
 *
 * @param line parsed history line to inspect
 * @param jobIDName textual job id; {@code null} yields no event
 * @param thatg owning emitter (unused here)
 * @return the priority-change event, or {@code null} when the job id name or
 *         the priority attribute is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  // Guard first, as the sibling emitters do, so that JobID.forName is never
  // handed a null name.
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String priority = line.get("JOB_PRIORITY");
  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
/**
 * Emits a {@link JobStatusChangedEvent} when the parsed line carries a
 * {@code JOB_STATUS} attribute.
 *
 * @param line parsed history line to inspect
 * @param jobIDName textual job id; {@code null} yields no event
 * @param thatg owning emitter (unused here)
 * @return the status-changed event, or {@code null} when the job id name or
 *         the status attribute is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID parsedJobId = JobID.forName(jobIDName);
  String jobStatus = line.get("JOB_STATUS");

  if (jobStatus == null) {
    return null;
  }
  return new JobStatusChangedEvent(parsedJobId, jobStatus);
}
/**
 * Emits a {@link JobInfoChangeEvent} when the parsed line carries a
 * {@code LAUNCH_TIME} attribute, pairing it with the submit time recorded
 * on the owning {@link Job20LineHistoryEventEmitter}.
 *
 * @param line parsed history line to inspect
 * @param jobIDName textual job id; {@code null} yields no event
 * @param thatg owning emitter; cast to {@link Job20LineHistoryEventEmitter}
 *          only when an event is produced
 * @return the info-change event, or {@code null} when the job id name or the
 *         launch time is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID parsedJobId = JobID.forName(jobIDName);
  String launchTimeText = line.get("LAUNCH_TIME");

  if (launchTimeText == null) {
    return null;
  }

  Job20LineHistoryEventEmitter owner = (Job20LineHistoryEventEmitter) thatg;
  return new JobInfoChangeEvent(parsedJobId, owner.originalSubmitTime,
      Long.parseLong(launchTimeText));
}
/**
 * Exercises the getters of {@link CompletedTaskAttempt} against a mocked
 * {@link TaskAttemptInfo}.
 */
@Test (timeout=5000)
public void testCompletedTaskAttempt() {
  TaskAttemptInfo mockedInfo = mock(TaskAttemptInfo.class);
  when(mockedInfo.getRackname()).thenReturn("Rackname");
  when(mockedInfo.getShuffleFinishTime()).thenReturn(11L);
  when(mockedInfo.getSortFinishTime()).thenReturn(12L);
  when(mockedInfo.getShufflePort()).thenReturn(10);

  // Attempt 0 of reduce task 0 of job "12345"/0.
  TaskID reduceTask = new TaskID(new JobID("12345", 0), TaskType.REDUCE, 0);
  TaskAttemptID attemptId = new TaskAttemptID(reduceTask, 0);
  when(mockedInfo.getAttemptId()).thenReturn(attemptId);

  CompletedTaskAttempt completed = new CompletedTaskAttempt(null, mockedInfo);

  assertEquals("Rackname", completed.getNodeRackName());
  assertEquals(Phase.CLEANUP, completed.getPhase());
  assertTrue(completed.isFinished());
  assertEquals(11L, completed.getShuffleFinishTime());
  assertEquals(12L, completed.getSortFinishTime());
  assertEquals(10, completed.getShufflePort());
}
/**
 * Reads the committer timeout and commit window from the configuration and
 * resolves the start/end commit marker files for the current user and job.
 *
 * @param conf service configuration
 * @throws Exception propagated from the superclass; an {@link IOException}
 *           raised while resolving the marker files is rethrown as a
 *           {@link YarnRuntimeException}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);

  commitThreadCancelTimeoutMs = conf.getInt(
      MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
      MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
  commitWindowMs = conf.getLong(
      MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
      MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);

  try {
    fs = FileSystem.get(conf);

    JobId yarnJobId =
        TypeConverter.toYarn(TypeConverter.fromYarn(context.getApplicationID()));
    String shortUserName =
        UserGroupInformation.getCurrentUser().getShortUserName();

    startCommitFile =
        MRApps.getStartJobCommitFile(conf, shortUserName, yarnJobId);
    endCommitSuccessFile =
        MRApps.getEndJobCommitSuccessFile(conf, shortUserName, yarnJobId);
    endCommitFailureFile =
        MRApps.getEndJobCommitFailureFile(conf, shortUserName, yarnJobId);
  } catch (IOException ioe) {
    throw new YarnRuntimeException(ioe);
  }
}
/**
 * Emits a {@link JobFinishedEvent} when the parsed line reports a successful
 * job, i.e. {@code JOB_STATUS} equals "success" (ignoring case) and
 * {@code FINISH_TIME}, {@code FINISHED_MAPS} and {@code FINISHED_REDUCES}
 * are all present.
 *
 * <p>{@code FAILED_MAPS} and {@code FAILED_REDUCES} are optional: a missing
 * value is counted as zero instead of failing the parse.
 *
 * @param line parsed history line to inspect
 * @param jobIDName textual job id; {@code null} yields no event
 * @param thatg owning emitter (unused here)
 * @return the job-finished event, or {@code null} when the line does not
 *         describe a successful completion
 * @throws NumberFormatException if a present numeric attribute is malformed
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("JOB_STATUS");
  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");
  String failedMaps = line.get("FAILED_MAPS");
  String failedReduces = line.get("FAILED_REDUCES");
  String counters = line.get("COUNTERS");

  if (status != null && status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    // The failed-task attributes are not covered by the guard above, and
    // Integer.parseInt(null) throws NumberFormatException, so default a
    // missing value to 0 rather than aborting on an otherwise valid line.
    return new JobFinishedEvent(jobID,
        Long.parseLong(finishTime),
        Integer.parseInt(finishedMaps),
        Integer.parseInt(finishedReduces),
        failedMaps == null ? 0 : Integer.parseInt(failedMaps),
        failedReduces == null ? 0 : Integer.parseInt(failedReduces),
        null, null, maybeParseCounters(counters));
  }

  return null;
}
/**
 * Tests the getters of {@link TaskAttemptFinishedEvent}: every value passed
 * to the constructor must be returned unchanged by its accessor.
 *
 * @throws Exception on any unexpected failure
 */
@Test(timeout = 10000)
public void testTaskAttemptFinishedEvent() throws Exception {
  JobID jid = new JobID("001", 1);
  TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
  TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3);
  Counters counters = new Counters();

  TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
      TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
      counters);

  // assertEquals takes (expected, actual); keeping that order makes the
  // failure message report the right value as "expected".
  assertEquals(taskAttemptId.toString(), test.getAttemptId().toString());
  assertEquals(counters, test.getCounters());
  assertEquals(123L, test.getFinishTime());
  assertEquals("HOSTNAME", test.getHostname());
  assertEquals("RAKNAME", test.getRackName());
  assertEquals("STATUS", test.getState());
  assertEquals(tid, test.getTaskId());
  assertEquals("TEST", test.getTaskStatus());
  assertEquals(TaskType.REDUCE, test.getTaskType());
}
/**
 * Returns the cached {@link ClientServiceDelegate} for a job, creating and
 * caching one on first use. The history-server proxy is instantiated the
 * first time this method runs with no proxy set.
 *
 * @param jobId job whose delegate is requested
 * @return the delegate associated with {@code jobId}
 * @throws YarnRuntimeException if the history-server proxy cannot be created
 */
public synchronized ClientServiceDelegate getClient(JobID jobId) {
  if (hsProxy == null) {
    try {
      hsProxy = instantiateHistoryProxy();
    } catch (IOException ioe) {
      LOG.warn("Could not connect to History server.", ioe);
      throw new YarnRuntimeException(
          "Could not connect to History server.", ioe);
    }
  }

  ClientServiceDelegate cached = cache.get(jobId);
  if (cached != null) {
    return cached;
  }

  ClientServiceDelegate created =
      new ClientServiceDelegate(conf, rm, jobId, hsProxy);
  cache.put(jobId, created);
  return created;
}
/**
 * Fetches the counters of a job through the {@code getCounters} call and
 * converts them to the MapReduce representation.
 *
 * @param arg0 id of the job
 * @return the job's counters as converted by {@link TypeConverter}
 * @throws IOException declared by this method's signature
 * @throws InterruptedException declared by this method's signature
 */
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
      TypeConverter.toYarn(arg0);

  GetCountersRequest countersRequest =
      recordFactory.newRecordInstance(GetCountersRequest.class);
  countersRequest.setJobId(yarnJobId);

  GetCountersResponse countersResponse = (GetCountersResponse) invoke(
      "getCounters", GetCountersRequest.class, countersRequest);
  Counters yarnCounters = countersResponse.getCounters();

  return TypeConverter.fromYarn(yarnCounters);
}
/**
 * Fetches a window of task-attempt completion events for a job and converts
 * them to {@link TaskCompletionEvent}s.
 *
 * @param arg0 id of the job
 * @param arg1 id of the first event to fetch
 * @param arg2 maximum number of events to fetch
 * @return the converted completion events
 * @throws IOException declared by this method's signature
 * @throws InterruptedException declared by this method's signature
 */
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
    int arg2) throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
      TypeConverter.toYarn(arg0);

  GetTaskAttemptCompletionEventsRequest eventsRequest = recordFactory
      .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
  eventsRequest.setJobId(yarnJobId);
  eventsRequest.setFromEventId(arg1);
  eventsRequest.setMaxEvents(arg2);

  GetTaskAttemptCompletionEventsResponse eventsResponse =
      (GetTaskAttemptCompletionEventsResponse) invoke(
          "getTaskAttemptCompletionEvents",
          GetTaskAttemptCompletionEventsRequest.class, eventsRequest);
  List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent>
      yarnEvents = eventsResponse.getCompletionEventList();

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[]
      yarnEventArray = yarnEvents.toArray(
          new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]);
  return TypeConverter.fromYarn(yarnEventArray);
}
/**
 * Retrieves the job report through the {@code getJobReport} call and converts
 * it to a {@link JobStatus}.
 *
 * <p>An empty job file in the report is filled in from the configuration,
 * and the report's tracking URL is used when non-empty, otherwise this
 * object's {@code trackingUrl}.
 *
 * @param oldJobID id of the job
 * @return the job status, or {@code null} when no report is returned
 * @throws IOException declared by this method's signature
 */
public JobStatus getJobStatus(JobID oldJobID) throws IOException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
      TypeConverter.toYarn(oldJobID);

  GetJobReportRequest reportRequest =
      recordFactory.newRecordInstance(GetJobReportRequest.class);
  reportRequest.setJobId(yarnJobId);

  GetJobReportResponse reportResponse = (GetJobReportResponse) invoke(
      "getJobReport", GetJobReportRequest.class, reportRequest);
  JobReport report = reportResponse.getJobReport();
  if (report == null) {
    return null;
  }

  if (StringUtils.isEmpty(report.getJobFile())) {
    report.setJobFile(MRApps.getJobFile(conf, report.getUser(), oldJobID));
  }

  String historyTrackingUrl = report.getTrackingUrl();
  String effectiveUrl;
  if (StringUtils.isNotEmpty(historyTrackingUrl)) {
    effectiveUrl = historyTrackingUrl;
  } else {
    effectiveUrl = trackingUrl;
  }

  return TypeConverter.fromYarn(report, effectiveUrl);
}
/**
 * Fetches the task reports of one task type for a job and converts them to
 * the MapReduce representation.
 *
 * @param oldJobID id of the job
 * @param taskType type of tasks to report on
 * @return the converted task reports
 * @throws IOException declared by this method's signature
 */
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
    JobID oldJobID, TaskType taskType) throws IOException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
      TypeConverter.toYarn(oldJobID);

  GetTaskReportsRequest reportsRequest =
      recordFactory.newRecordInstance(GetTaskReportsRequest.class);
  reportsRequest.setJobId(yarnJobId);
  reportsRequest.setTaskType(TypeConverter.toYarn(taskType));

  GetTaskReportsResponse reportsResponse = (GetTaskReportsResponse) invoke(
      "getTaskReports", GetTaskReportsRequest.class, reportsRequest);
  List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> yarnReports =
      reportsResponse.getTaskReportList();

  return TypeConverter.fromYarn(yarnReports)
      .toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
}
/**
 * Writes a single split that lists five locations while the configured
 * maximum is four, then verifies that the split meta info read back holds
 * one split with four locations.
 */
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);

    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem localFs = FileSystem.getLocal(conf);

    String[] fiveLocations = { "loc1", "loc2", "loc3", "loc4", "loc5" };
    FileSplit oversizedSplit =
        new FileSplit(new Path("/some/path"), 0, 1, fiveLocations);
    JobSplitWriter.createSplitFiles(submitDir, conf, localFs,
        new FileSplit[] { oversizedSplit });

    JobSplit.TaskSplitMetaInfo[] metaInfos =
        SplitMetaInfoReader.readSplitMetaInfo(
            new JobID(), localFs, conf, submitDir);

    assertEquals("unexpected number of splits", 1, metaInfos.length);
    assertEquals("unexpected number of split locations", 4,
        metaInfos[0].getLocations().length);
  } finally {
    // Remove the scratch directory whatever the outcome.
    FileUtil.fullyDelete(TEST_DIR);
  }
}
/**
 * Replaces the job part of a {@link TaskAttemptID} with that of a
 * default-constructed {@link JobID}, keeping the task type, task number and
 * attempt number.
 *
 * @param attemptId
 *          raw {@link TaskAttemptID} read from trace
 * @return a {@link TaskAttemptID} built from the default {@link JobID}'s
 *         identifier and id plus the original task and attempt fields
 */
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
  JobID blankJob = new JobID();
  return new TaskAttemptID(
      blankJob.getJtIdentifier(),
      blankJob.getId(),
      attemptId.getTaskType(),
      attemptId.getTaskID().getId(),
      attemptId.getId());
}
/**
 * Round-trips a {@link JobInfo} through {@code write}/{@code readFields} and
 * checks that job id, submit-dir name and user survive.
 */
@Test (timeout=5000)
public void testJobInfo() throws IOException {
  JobInfo original =
      new JobInfo(new JobID("001", 1), new Text("User"), new Path("/tmp/test"));

  // Serialise into memory ...
  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  original.write(new DataOutputStream(buffer));

  // ... and read it back into a fresh instance.
  JobInfo restored = new JobInfo();
  restored.readFields(
      new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

  assertEquals(original.getJobID().toString(),
      restored.getJobID().toString());
  assertEquals(original.getJobSubmitDir().getName(),
      restored.getJobSubmitDir().getName());
  assertEquals(original.getUser().toString(), restored.getUser().toString());
}
/**
 * With the job permanently reported as RUNNING, {@code killJob} must not
 * return before the configured hard-kill timeout has elapsed.
 */
@Test(timeout=60000)
public void testJobKillTimeout() throws Exception {
  long timeToWaitBeforeHardKill =
      10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS;
  conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
      timeToWaitBeforeHardKill);

  clientDelegate = mock(ClientServiceDelegate.class);

  // The cache hands out the mocked delegate for any job id.
  Answer<ClientServiceDelegate> returnDelegate =
      new Answer<ClientServiceDelegate>() {
        @Override
        public ClientServiceDelegate answer(InvocationOnMock invocation)
            throws Throwable {
          return clientDelegate;
        }
      };
  doAnswer(returnDelegate).when(clientCache).getClient(any(JobID.class));

  org.apache.hadoop.mapreduce.JobStatus runningStatus =
      new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
          State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp");
  when(clientDelegate.getJobStatus(any(JobID.class)))
      .thenReturn(runningStatus);

  long startTimeMillis = System.currentTimeMillis();
  yarnRunner.killJob(jobId);
  long elapsedMillis = System.currentTimeMillis() - startTimeMillis;

  assertTrue("killJob should have waited at least "
      + timeToWaitBeforeHardKill + " ms.",
      elapsedMillis >= timeToWaitBeforeHardKill);
}
/**
 * Emits a {@link JobInitedEvent} when the parsed line carries
 * {@code LAUNCH_TIME}, {@code TOTAL_MAPS} and {@code TOTAL_REDUCES}.
 * {@code JOB_STATUS} and {@code UBERIZED} are passed through as read.
 *
 * @param line parsed history line to inspect
 * @param jobIDName textual job id; {@code null} yields no event
 * @param thatg owning emitter (unused here)
 * @return the job-inited event, or {@code null} when the job id name or any
 *         of the three required attributes is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID parsedJobId = JobID.forName(jobIDName);

  String launchTimeText = line.get("LAUNCH_TIME");
  String jobStatus = line.get("JOB_STATUS");
  String totalMapsText = line.get("TOTAL_MAPS");
  String totalReducesText = line.get("TOTAL_REDUCES");
  String uberizedText = line.get("UBERIZED");

  if (launchTimeText == null || totalMapsText == null
      || totalReducesText == null) {
    return null;
  }

  return new JobInitedEvent(parsedJobId,
      Long.parseLong(launchTimeText),
      Integer.parseInt(totalMapsText),
      Integer.parseInt(totalReducesText),
      jobStatus,
      Boolean.parseBoolean(uberizedText));
}
/**
 * Checks that a user name containing the delimiter shows up in its escaped
 * form in the generated done-file name.
 */
@Test
public void testUserNamePercentEncoding() throws IOException {
  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName(JOB_ID)));
  indexInfo.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  // The field under test: a user name that needs escaping.
  indexInfo.setUser(USER_NAME_WITH_DELIMITER);
  indexInfo.setJobName(JOB_NAME);
  indexInfo.setFinishTime(Long.parseLong(FINISH_TIME));
  indexInfo.setNumMaps(Integer.parseInt(NUM_MAPS));
  indexInfo.setNumReduces(Integer.parseInt(NUM_REDUCES));
  indexInfo.setJobStatus(JOB_STATUS);
  indexInfo.setQueueName(QUEUE_NAME);
  indexInfo.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String doneFileName = FileNameIndexUtils.getDoneFileName(indexInfo);

  Assert.assertTrue("User name not encoded correctly into job history file",
      doneFileName.contains(USER_NAME_WITH_DELIMITER_ESCAPE));
}
/** * Create an event to record unsuccessful completion (killed/failed) of jobs * @param id Job ID * @param finishTime Finish time of the job * @param finishedMaps Number of finished maps * @param finishedReduces Number of finished reduces * @param status Status of the job * @param diagnostics job runtime diagnostics */ public JobUnsuccessfulCompletionEvent(JobID id, long finishTime, int finishedMaps, int finishedReduces, String status, Iterable<String> diagnostics) { datum.setJobid(new Utf8(id.toString())); datum.setFinishTime(finishTime); datum.setFinishedMaps(finishedMaps); datum.setFinishedReduces(finishedReduces); datum.setJobStatus(new Utf8(status)); if (diagnostics == null) { diagnostics = NODIAGS_LIST; } datum.setDiagnostics(new Utf8(Joiner.on('\n').skipNulls() .join(diagnostics))); }
/**
 * Creates an event recording a successful job completion. Each argument is
 * stored as-is in the field of the same name.
 *
 * @param id Job ID
 * @param finishTime Finish time of the job
 * @param finishedMaps The number of finished maps
 * @param finishedReduces The number of finished reduces
 * @param failedMaps The number of failed maps
 * @param failedReduces The number of failed reduces
 * @param mapCounters Map Counters for the job
 * @param reduceCounters Reduce Counters for the job
 * @param totalCounters Total Counters for the job
 */
public JobFinishedEvent(JobID id, long finishTime,
    int finishedMaps, int finishedReduces,
    int failedMaps, int failedReduces,
    Counters mapCounters, Counters reduceCounters,
    Counters totalCounters) {
  // Identity and timing.
  this.jobId = id;
  this.finishTime = finishTime;

  // Task tallies.
  this.finishedMaps = finishedMaps;
  this.failedMaps = failedMaps;
  this.finishedReduces = finishedReduces;
  this.failedReduces = failedReduces;

  // Counters.
  this.mapCounters = mapCounters;
  this.reduceCounters = reduceCounters;
  this.totalCounters = totalCounters;
}
/**
 * Replaces the backing {@link JobFinished} record and refreshes every field
 * of this event from it.
 *
 * @param oDatum the new datum; must be a {@link JobFinished}
 */
public void setDatum(Object oDatum) {
  JobFinished finished = (JobFinished) oDatum;
  this.datum = finished;

  this.jobId = JobID.forName(finished.jobid.toString());
  this.finishTime = finished.finishTime;

  this.finishedMaps = finished.finishedMaps;
  this.finishedReduces = finished.finishedReduces;
  this.failedMaps = finished.failedMaps;
  this.failedReduces = finished.failedReduces;

  // Counters arrive in Avro form and are converted on the way in.
  this.mapCounters = EventReader.fromAvro(finished.mapCounters);
  this.reduceCounters = EventReader.fromAvro(finished.reduceCounters);
  this.totalCounters = EventReader.fromAvro(finished.totalCounters);
}
/** * Extracts job id from the current hadoop version's job history file name. * @param fileName job history file name from which job id is to be extracted * @return job id if the history file name format is same as that of the * current hadoop version. Returns null otherwise. */ private static String extractJobIDFromCurrentHistoryFile(String fileName) { JobID id = null; if (org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils .isValidJobHistoryFileName(fileName)) { try { id = org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils .getJobIDFromHistoryFilePath(fileName); } catch (IOException e) { // Ignore this exception and go ahead with getting of jobID assuming // older hadoop verison's history file } } if (id != null) { return id.toString(); } return null; }
/**
 * With two map tasks configured, each {@code tipFailed} call must advance
 * progress by one half, and the scheduler must report done only after the
 * second one.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  JobConf jobConf = new JobConf();
  jobConf.setNumMapTasks(2);

  // Minimal reduce-side status; fetch-failure notifications are ignored.
  TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };

  Progress shuffleProgress = new Progress();
  TaskAttemptID reduceAttempt =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(jobConf,
      reduceStatus, reduceAttempt, null, shuffleProgress, null, null, null);

  JobID jobId = new JobID();

  // First failure: half way, not yet done.
  TaskID secondTask = new TaskID(jobId, TaskType.REDUCE, 1);
  scheduler.tipFailed(secondTask);
  Assert.assertEquals("Progress should be 0.5", 0.5f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(scheduler.waitUntilDone(1));

  // Second failure: complete.
  TaskID firstTask = new TaskID(jobId, TaskType.REDUCE, 0);
  scheduler.tipFailed(firstTask);
  Assert.assertEquals("Progress should be 1.0", 1.0f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(scheduler.waitUntilDone(1));
}
/**
 * Substitutes a default for the sentinel value {@code -1}, logging a warning
 * when it does so.
 *
 * @param oldVal value to check
 * @param defaultVal value to use when {@code oldVal} is {@code -1}
 * @param name name of the value, used in the warning
 * @param id job the value belongs to, used in the warning
 * @return {@code oldVal} unless it is {@code -1}, in which case
 *         {@code defaultVal}
 */
private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
  if (oldVal != -1) {
    return oldVal;
  }
  LOG.warn(name +" not defined for "+id);
  return defaultVal;
}
/**
 * Builds a copy listing, asks {@link DynamicInputFormat} for its splits and
 * drains a record reader per split, checking that every file read is an
 * expected one, that progress stays within [0, 1] without decreasing and
 * ends at exactly 1, and that the total file count matches.
 */
@Test
public void getSplits() throws Exception {
  S3MapReduceCpOptions options = getOptions();
  Configuration configuration = new Configuration();
  configuration.set("mapred.map.tasks", String.valueOf(options.getMaxMaps()));
  CopyListing.getCopyListing(configuration, CREDENTIALS, options)
      .buildListing(
          new Path(cluster.getFileSystem().getUri().toString()
              + "/tmp/testDynInputFormat/fileList.seq"),
          options);

  JobContext jobContext = new JobContextImpl(configuration, new JobID());
  DynamicInputFormat<Text, CopyListingFileStatus> inputFormat =
      new DynamicInputFormat<>();
  List<InputSplit> splits = inputFormat.getSplits(jobContext);

  int filesSeen = 0;
  int taskIndex = 0;
  for (InputSplit split : splits) {
    RecordReader<Text, CopyListingFileStatus> reader =
        inputFormat.createRecordReader(split, null);
    StubContext stubContext = new StubContext(
        jobContext.getConfiguration(), reader, taskIndex);
    final TaskAttemptContext attemptContext = stubContext.getContext();

    // NOTE(review): the reader is initialised with the first split rather
    // than the loop's current split - confirm this is intentional.
    reader.initialize(splits.get(0), attemptContext);

    float lastProgress = 0f;
    while (reader.nextKeyValue()) {
      CopyListingFileStatus fileStatus = reader.getCurrentValue();
      String source = fileStatus.getPath().toString();
      assertTrue(expectedFilePaths.contains(source));

      final float progress = reader.getProgress();
      assertTrue(progress >= lastProgress);
      assertTrue(progress >= 0.0f);
      assertTrue(progress <= 1.0f);
      lastProgress = progress;

      ++filesSeen;
    }
    assertTrue(reader.getProgress() == 1.0f);

    ++taskIndex;
  }

  Assert.assertEquals(expectedFilePaths.size(), filesSeen);
}
/**
 * Creates the scheduler proxy, looks up the job for this application,
 * registers, starts the allocator thread and finally starts the superclass
 * service, in that order.
 *
 * @throws Exception if any start-up step fails
 */
@Override
protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy();

  // Resolve the job by converting the application id to a job id.
  JobId yarnJobId =
      TypeConverter.toYarn(TypeConverter.fromYarn(this.applicationId));
  job = context.getJob(yarnJobId);

  register();
  startAllocatorThread();
  super.serviceStart();
}
@Test public void testCheckAccess() { // Create two unique users String user1 = System.getProperty("user.name"); String user2 = user1 + "1234"; UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user1); UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser(user2); // Create the job JobID jobID = JobID.forName("job_1234567890000_0001"); JobId jobId = TypeConverter.toYarn(jobID); // Setup configuration access only to user1 (owner) Configuration conf1 = new Configuration(); conf1.setBoolean(MRConfig.MR_ACLS_ENABLED, true); conf1.set(MRJobConfig.JOB_ACL_VIEW_JOB, ""); // Verify access JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null, null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB)); // Setup configuration access to the user1 (owner) and user2 Configuration conf2 = new Configuration(); conf2.setBoolean(MRConfig.MR_ACLS_ENABLED, true); conf2.set(MRJobConfig.JOB_ACL_VIEW_JOB, user2); // Verify access JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null, null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB)); // Setup configuration access with security enabled and access to all Configuration conf3 = new Configuration(); conf3.setBoolean(MRConfig.MR_ACLS_ENABLED, true); conf3.set(MRJobConfig.JOB_ACL_VIEW_JOB, "*"); // Verify access JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null, null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB)); // Setup configuration access without security enabled Configuration conf4 = new Configuration(); conf4.setBoolean(MRConfig.MR_ACLS_ENABLED, false); 
conf4.set(MRJobConfig.JOB_ACL_VIEW_JOB, ""); // Verify access JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null, null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB)); // Setup configuration access without security enabled Configuration conf5 = new Configuration(); conf5.setBoolean(MRConfig.MR_ACLS_ENABLED, true); conf5.set(MRJobConfig.JOB_ACL_VIEW_JOB, ""); // Verify access JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null, null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job5.checkAccess(ugi1, null)); Assert.assertTrue(job5.checkAccess(ugi2, null)); }
/**
 * Tests the getters of {@link JobQueueChangeEvent}: the job id and queue
 * name passed to the constructor must be returned unchanged.
 *
 * @throws Exception on any unexpected failure
 */
@Test(timeout = 10000)
public void testJobQueueChange() throws Exception {
  org.apache.hadoop.mapreduce.JobID jid = new JobID("001", 1);
  JobQueueChangeEvent test = new JobQueueChangeEvent(jid, "newqueue");

  // assertEquals takes (expected, actual); keeping that order makes the
  // failure message report the right value as "expected".
  assertEquals(jid.toString(), test.getJobId().toString());
  assertEquals("newqueue", test.getJobQueueName());
}
@Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { addHistoryToken(ts); // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); } }
/**
 * Exercises {@code killJob} in three situations: job in PREP (the
 * application is killed through the resource manager delegate), job RUNNING
 * (the kill goes to the client delegate), and no job status available while
 * the application report says FINISHED.
 */
@Test(timeout=20000)
public void testJobKill() throws Exception {
  clientDelegate = mock(ClientServiceDelegate.class);

  // Phase 1: job still in PREP.
  org.apache.hadoop.mapreduce.JobStatus prepStatus =
      new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
          State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp");
  when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(prepStatus);
  when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);

  Answer<ClientServiceDelegate> returnDelegate =
      new Answer<ClientServiceDelegate>() {
        @Override
        public ClientServiceDelegate answer(InvocationOnMock invocation)
            throws Throwable {
          return clientDelegate;
        }
      };
  doAnswer(returnDelegate).when(clientCache).getClient(any(JobID.class));

  yarnRunner.killJob(jobId);
  verify(resourceMgrDelegate).killApplication(appId);

  // Phase 2: job RUNNING.
  org.apache.hadoop.mapreduce.JobStatus runningStatus =
      new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
          State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp");
  when(clientDelegate.getJobStatus(any(JobID.class)))
      .thenReturn(runningStatus);

  yarnRunner.killJob(jobId);
  verify(clientDelegate).killJob(jobId);

  // Phase 3: no job status, application already FINISHED.
  when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
  ApplicationReport finishedReport = ApplicationReport.newInstance(appId,
      null, "tmp", "tmp", "tmp", "tmp", 0, null,
      YarnApplicationState.FINISHED, "tmp", "tmp", 0L, 0L,
      FinalApplicationStatus.SUCCEEDED, null, null, 0f, "tmp", null);
  when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
      .thenReturn(finishedReport);

  yarnRunner.killJob(jobId);
  verify(clientDelegate).killJob(jobId);
}
public static String getApplicationWebURLOnJHSWithoutScheme(Configuration conf, ApplicationId appId) throws UnknownHostException { //construct the history url for job String addr = getJHSWebappURLWithoutScheme(conf); Iterator<String> it = ADDR_SPLITTER.split(addr).iterator(); it.next(); // ignore the bind host String port = it.next(); // Use hs address to figure out the host for webapp addr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS); String host = ADDR_SPLITTER.split(addr).iterator().next(); String hsAddress = JOINER.join(host, ":", port); InetSocketAddress address = NetUtils.createSocketAddr( hsAddress, getDefaultJHSWebappPort(), getDefaultJHSWebappURLWithoutScheme()); StringBuffer sb = new StringBuffer(); if (address.getAddress().isAnyLocalAddress() || address.getAddress().isLoopbackAddress()) { sb.append(InetAddress.getLocalHost().getCanonicalHostName()); } else { sb.append(address.getHostName()); } sb.append(":").append(address.getPort()); sb.append("/jobhistory/job/"); JobID jobId = TypeConverter.fromYarn(appId); sb.append(jobId.toString()); return sb.toString(); }
/**
 * Encodes a fully populated {@link JobIndexInfo} into a done-file name,
 * parses it back, and checks that every field survives the round trip.
 */
@Test
public void testEncodingDecodingEquivalence() throws IOException {
  JobIndexInfo original = new JobIndexInfo();
  original.setJobId(TypeConverter.toYarn(JobID.forName(JOB_ID)));
  original.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  original.setUser(USER_NAME);
  original.setJobName(JOB_NAME);
  original.setFinishTime(Long.parseLong(FINISH_TIME));
  original.setNumMaps(Integer.parseInt(NUM_MAPS));
  original.setNumReduces(Integer.parseInt(NUM_REDUCES));
  original.setJobStatus(JOB_STATUS);
  original.setQueueName(QUEUE_NAME);
  original.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String doneFileName = FileNameIndexUtils.getDoneFileName(original);
  JobIndexInfo decoded = FileNameIndexUtils.getIndexInfo(doneFileName);

  Assert.assertEquals("Job id different after encoding and decoding",
      original.getJobId(), decoded.getJobId());
  Assert.assertEquals("Submit time different after encoding and decoding",
      original.getSubmitTime(), decoded.getSubmitTime());
  Assert.assertEquals("User different after encoding and decoding",
      original.getUser(), decoded.getUser());
  Assert.assertEquals("Job name different after encoding and decoding",
      original.getJobName(), decoded.getJobName());
  Assert.assertEquals("Finish time different after encoding and decoding",
      original.getFinishTime(), decoded.getFinishTime());
  Assert.assertEquals("Num maps different after encoding and decoding",
      original.getNumMaps(), decoded.getNumMaps());
  Assert.assertEquals("Num reduces different after encoding and decoding",
      original.getNumReduces(), decoded.getNumReduces());
  Assert.assertEquals("Job status different after encoding and decoding",
      original.getJobStatus(), decoded.getJobStatus());
  Assert.assertEquals("Queue name different after encoding and decoding",
      original.getQueueName(), decoded.getQueueName());
  Assert.assertEquals("Job start time different after encoding and decoding",
      original.getJobStartTime(), decoded.getJobStartTime());
}
/**
 * Parses a file name built with the old history-file format and checks that
 * every field it carries is decoded, with the queue name coming back
 * {@code null}.
 */
@Test
public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
  // Expected values, derived from the same constants used to build the name.
  JobId expectedJobId = TypeConverter.toYarn(JobID.forName(JOB_ID));
  long expectedSubmitTime = Long.parseLong(SUBMIT_TIME);
  long expectedFinishTime = Long.parseLong(FINISH_TIME);
  int expectedNumMaps = Integer.parseInt(NUM_MAPS);
  int expectedNumReduces = Integer.parseInt(NUM_REDUCES);

  String oldStyleName = String.format(OLD_JOB_HISTORY_FILE_FORMATTER,
      JOB_ID, SUBMIT_TIME, USER_NAME, JOB_NAME, FINISH_TIME, NUM_MAPS,
      NUM_REDUCES, JOB_STATUS);

  JobIndexInfo decoded = FileNameIndexUtils.getIndexInfo(oldStyleName);

  Assert.assertEquals("Job id incorrect after decoding old history file",
      expectedJobId, decoded.getJobId());
  Assert.assertEquals("Submit time incorrect after decoding old history file",
      expectedSubmitTime, decoded.getSubmitTime());
  Assert.assertEquals("User incorrect after decoding old history file",
      USER_NAME, decoded.getUser());
  Assert.assertEquals("Job name incorrect after decoding old history file",
      JOB_NAME, decoded.getJobName());
  Assert.assertEquals("Finish time incorrect after decoding old history file",
      expectedFinishTime, decoded.getFinishTime());
  Assert.assertEquals("Num maps incorrect after decoding old history file",
      expectedNumMaps, decoded.getNumMaps());
  Assert.assertEquals("Num reduces incorrect after decoding old history file",
      expectedNumReduces, decoded.getNumReduces());
  Assert.assertEquals("Job status incorrect after decoding old history file",
      JOB_STATUS, decoded.getJobStatus());
  Assert.assertNull("Queue name incorrect after decoding old history file",
      decoded.getQueueName());
}
/**
 * Checks that {@code MRApps.getJobFile} builds the job.xml path from the
 * configured staging directory, the given user and the job id.
 */
@Test (timeout = 120000)
public void testGetJobFileWithUser() {
  Configuration stagingConf = new Configuration();
  stagingConf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");

  JobID dummyJob = new JobID("dummy-job", 12345);
  String actualJobFile = MRApps.getJobFile(stagingConf, "dummy-user", dummyJob);

  assertNotNull("getJobFile results in null.", actualJobFile);
  assertEquals("jobFile with specified user is not as expected.",
      "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml",
      actualJobFile);
}
@Override public Map<JobId, Job> getAllJobs(ApplicationId appID) { if (LOG.isDebugEnabled()) { LOG.debug("Called getAllJobs(AppId): " + appID); } // currently there is 1 to 1 mapping between app and job id org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID); Map<JobId, Job> jobs = new HashMap<JobId, Job>(); JobId jobID = TypeConverter.toYarn(oldJobID); jobs.put(jobID, getJob(jobID)); return jobs; }
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName, HistoryEventEmitter thatg) { JobID jobID = JobID.forName(jobIDName); if (jobIDName == null) { return null; } String submitTime = line.get("SUBMIT_TIME"); String jobConf = line.get("JOBCONF"); String user = line.get("USER"); if (user == null) { user = "nulluser"; } String jobName = line.get("JOBNAME"); String jobQueueName = line.get("JOB_QUEUE");// could be null String workflowId = line.get("WORKFLOW_ID"); if (workflowId == null) { workflowId = ""; } String workflowName = line.get("WORKFLOW_NAME"); if (workflowName == null) { workflowName = ""; } String workflowNodeName = line.get("WORKFLOW_NODE_NAME"); if (workflowNodeName == null) { workflowNodeName = ""; } String workflowAdjacencies = line.get("WORKFLOW_ADJACENCIES"); if (workflowAdjacencies == null) { workflowAdjacencies = ""; } String workflowTags = line.get("WORKFLOW_TAGS"); if (workflowTags == null) { workflowTags = ""; } if (submitTime != null) { Job20LineHistoryEventEmitter that = (Job20LineHistoryEventEmitter) thatg; that.originalSubmitTime = Long.parseLong(submitTime); Map<JobACL, AccessControlList> jobACLs = new HashMap<JobACL, AccessControlList>(); return new JobSubmittedEvent(jobID, jobName, user, that.originalSubmitTime, jobConf, jobACLs, jobQueueName, workflowId, workflowName, workflowNodeName, workflowAdjacencies, workflowTags); } return null; }