@Test(timeout=60000) public void testJobKillTimeout() throws Exception { long timeToWaitBeforeHardKill = 10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS; conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS, timeToWaitBeforeHardKill); clientDelegate = mock(ClientServiceDelegate.class); doAnswer( new Answer<ClientServiceDelegate>() { @Override public ClientServiceDelegate answer(InvocationOnMock invocation) throws Throwable { return clientDelegate; } } ).when(clientCache).getClient(any(JobID.class)); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); long startTimeMillis = System.currentTimeMillis(); yarnRunner.killJob(jobId); assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill + " ms.", System.currentTimeMillis() - startTimeMillis >= timeToWaitBeforeHardKill); }
@Test(timeout=20000) public void testJobKill() throws Exception { clientDelegate = mock(ClientServiceDelegate.class); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); when(clientDelegate.killJob(any(JobID.class))).thenReturn(true); doAnswer( new Answer<ClientServiceDelegate>() { @Override public ClientServiceDelegate answer(InvocationOnMock invocation) throws Throwable { return clientDelegate; } } ).when(clientCache).getClient(any(JobID.class)); yarnRunner.killJob(jobId); verify(resourceMgrDelegate).killApplication(appId); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); yarnRunner.killJob(jobId); verify(clientDelegate).killJob(jobId); }
@Test(timeout=20000) public void testJobKill() throws Exception { clientDelegate = mock(ClientServiceDelegate.class); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); when(clientDelegate.killJob(any(JobID.class))).thenReturn(true); doAnswer( new Answer<ClientServiceDelegate>() { @Override public ClientServiceDelegate answer(InvocationOnMock invocation) throws Throwable { return clientDelegate; } } ).when(clientCache).getClient(any(JobID.class)); yarnRunner.killJob(jobId); verify(resourceMgrDelegate).killApplication(appId); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f, State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); yarnRunner.killJob(jobId); verify(clientDelegate).killJob(jobId); when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null); when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class))) .thenReturn( ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp", "tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp", 0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f, "tmp", null)); yarnRunner.killJob(jobId); verify(clientDelegate).killJob(jobId); }
private String getJobPriorityNames() { StringBuffer sb = new StringBuffer(); for (JobPriority p : JobPriority.values()) { sb.append(p.name()).append(" "); } return sb.substring(0, sb.length()-1); }
private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus) throws IOException, InterruptedException { // Max wait time to get the priority update can be kept as 20sec (100 * // 100ms) int waitCnt = 200; while (waitCnt-- > 0) { if (job.getPriority().equals(expectedStatus)) { // Stop waiting as priority is updated. break; } else { Thread.sleep(100); } } }
private String getJobPriorityNames() { StringBuffer sb = new StringBuffer(); for (JobPriority p : JobPriority.values()) { // UNDEFINED_PRIORITY need not to be displayed in usage if (JobPriority.UNDEFINED_PRIORITY == p) { continue; } sb.append(p.name()).append(" "); } return sb.substring(0, sb.length()-1); }
@Override public JobStatus submitJob( JobID jobId, String jobSubmitDir, Credentials ts) throws IOException { JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", ""); return status; }
@Test public void testShowJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); long startTime = System.currentTimeMillis(); JobID jobID = new JobID(String.valueOf(startTime), 12345); JobStatus mockJobStatus = mock(JobStatus.class); when(mockJobStatus.getJobID()).thenReturn(jobID); when(mockJobStatus.getJobName()).thenReturn(jobID.toString()); when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING); when(mockJobStatus.getStartTime()).thenReturn(startTime); when(mockJobStatus.getUsername()).thenReturn("mockuser"); when(mockJobStatus.getQueue()).thenReturn("mockqueue"); when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL); when(mockJobStatus.getNumUsedSlots()).thenReturn(1); when(mockJobStatus.getNumReservedSlots()).thenReturn(1); when(mockJobStatus.getUsedMem()).thenReturn(1024); when(mockJobStatus.getReservedMem()).thenReturn(512); when(mockJobStatus.getNeededMem()).thenReturn(2048); when(mockJobStatus.getSchedulingInfo()).thenReturn("NA"); Job mockJob = mock(Job.class); when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn( new TaskReport[5]); Cluster mockCluster = mock(Cluster.class); when(mockCluster.getJob(jobID)).thenReturn(mockJob); client.setCluster(mockCluster); ByteArrayOutputStream out = new ByteArrayOutputStream(); client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out)); String commandLineOutput = out.toString(); System.out.println(commandLineOutput); Assert.assertTrue(commandLineOutput.contains("Total jobs:1")); verify(mockJobStatus, atLeastOnce()).getJobID(); verify(mockJobStatus).getState(); verify(mockJobStatus).getStartTime(); verify(mockJobStatus).getUsername(); verify(mockJobStatus).getQueue(); verify(mockJobStatus).getPriority(); verify(mockJobStatus).getNumUsedSlots(); verify(mockJobStatus).getNumReservedSlots(); verify(mockJobStatus).getUsedMem(); verify(mockJobStatus).getReservedMem(); verify(mockJobStatus).getNeededMem(); verify(mockJobStatus).getSchedulingInfo(); // This call should not go to each AM. verify(mockCluster, never()).getJob(jobID); verify(mockJob, never()).getTaskReports(isA(TaskType.class)); }
@Test(timeout = 3000000) public void testJobWithChangePriority() throws Exception { if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test."); return; } Configuration sleepConf = new Configuration(mrCluster.getConfig()); // set master address to local to test that local mode applied if framework // equals local sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); sleepConf .setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5); SleepJob sleepJob = new SleepJob(); sleepJob.setConf(sleepConf); Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1); job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.setJarByClass(SleepJob.class); job.setMaxMapAttempts(1); // speed up failures job.submit(); // Set the priority to HIGH job.setPriority(JobPriority.HIGH); waitForPriorityToUpdate(job, JobPriority.HIGH); // Verify the priority from job itself Assert.assertEquals(job.getPriority(), JobPriority.HIGH); // Change priority to NORMAL (3) with new api job.setPriorityAsInteger(3); // Verify the priority from job itself waitForPriorityToUpdate(job, JobPriority.NORMAL); Assert.assertEquals(job.getPriority(), JobPriority.NORMAL); // Change priority to a high integer value with new api job.setPriorityAsInteger(89); // Verify the priority from job itself waitForPriorityToUpdate(job, JobPriority.UNDEFINED_PRIORITY); Assert.assertEquals(job.getPriority(), JobPriority.UNDEFINED_PRIORITY); boolean succeeded = job.waitForCompletion(true); Assert.assertTrue(succeeded); Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); }
@Test public void testShowJob() throws Exception { TestJobClient client = new TestJobClient(new JobConf()); long startTime = System.currentTimeMillis(); JobID jobID = new JobID(String.valueOf(startTime), 12345); JobStatus mockJobStatus = mock(JobStatus.class); when(mockJobStatus.getJobID()).thenReturn(jobID); when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING); when(mockJobStatus.getStartTime()).thenReturn(startTime); when(mockJobStatus.getUsername()).thenReturn("mockuser"); when(mockJobStatus.getQueue()).thenReturn("mockqueue"); when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL); when(mockJobStatus.getNumUsedSlots()).thenReturn(1); when(mockJobStatus.getNumReservedSlots()).thenReturn(1); when(mockJobStatus.getUsedMem()).thenReturn(1024); when(mockJobStatus.getReservedMem()).thenReturn(512); when(mockJobStatus.getNeededMem()).thenReturn(2048); when(mockJobStatus.getSchedulingInfo()).thenReturn("NA"); Job mockJob = mock(Job.class); when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn( new TaskReport[5]); Cluster mockCluster = mock(Cluster.class); when(mockCluster.getJob(jobID)).thenReturn(mockJob); client.setCluster(mockCluster); ByteArrayOutputStream out = new ByteArrayOutputStream(); client.displayJobList(new JobStatus[] {mockJobStatus}, new PrintWriter(out)); String commandLineOutput = out.toString(); System.out.println(commandLineOutput); Assert.assertTrue(commandLineOutput.contains("Total jobs:1")); verify(mockJobStatus, atLeastOnce()).getJobID(); verify(mockJobStatus).getState(); verify(mockJobStatus).getStartTime(); verify(mockJobStatus).getUsername(); verify(mockJobStatus).getQueue(); verify(mockJobStatus).getPriority(); verify(mockJobStatus).getNumUsedSlots(); verify(mockJobStatus).getNumReservedSlots(); verify(mockJobStatus).getUsedMem(); verify(mockJobStatus).getReservedMem(); verify(mockJobStatus).getNeededMem(); verify(mockJobStatus).getSchedulingInfo(); // This call should not go to each AM. verify(mockCluster, never()).getJob(jobID); verify(mockJob, never()).getTaskReports(isA(TaskType.class)); }
@Override public JobStatus submitJob(JobID jobId) throws IOException { JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", ""); return status; }
SubmittedJob(JobID jobID, String jobSubmitDirectory, Credentials credentials, Configuration configuration) throws IOException, InterruptedException { this.jobID = jobID; this.configuration = configuration; this.jobSubmitDirectoryPath = new Path(jobSubmitDirectory); this.fileSystem = FileSystem.get(configuration); JobSplit.TaskSplitMetaInfo splitInfo[] = SplitMetaInfoReader.readSplitMetaInfo(jobID, fileSystem, configuration, jobSubmitDirectoryPath); Path jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDirectoryPath); FSDataInputStream stream = fileSystem.open(jobSplitFile); for (JobSplit.TaskSplitMetaInfo info : splitInfo) { Object split = getSplitDetails(stream, info.getStartOffset(), configuration); inputSplits.add(split); splitLocations.put(split, info.getLocations()); LOG.info("Adding split for execution. Split = " + split + " Locations: " + Arrays.toString(splitLocations.get(split))); } stream.close(); jobConfPath = JobSubmissionFiles.getJobConfPath(jobSubmitDirectoryPath); if (!fileSystem.exists(jobConfPath)) { throw new IOException("Cannot find job.xml. Path = " + jobConfPath); } //We cannot just use JobConf(Path) constructor, //because it does not work for HDFS locations. //The comment in Configuration#loadResource() states, //for the case when the Path to the resource is provided: //"Can't use FileSystem API or we get an infinite loop //since FileSystem uses Configuration API. Use java.io.File instead." // //Workaround: construct empty Configuration, provide it with //input stream and give it to JobConf constructor. FSDataInputStream jobConfStream = fileSystem.open(jobConfPath); Configuration jobXML = new Configuration(false); jobXML.addResource(jobConfStream); //The configuration does not actually gets read before we attempt to //read some property. Call to #size() will make Configuration to //read the input stream. jobXML.size(); //We are done with input stream, can close it now. jobConfStream.close(); jobConf = new JobConf(jobXML); newApi = jobConf.getUseNewMapper(); jobStatus = new JobStatus(jobID, 0f, 0f, 0f, 0f, JobStatus.State.RUNNING, JobPriority.NORMAL, UserGroupInformation.getCurrentUser().getUserName(), jobID.toString(), jobConfPath.toString(), ""); }