/**
 * Returns the jobs selected by the given paging and filter arguments.
 *
 * <p>The values of the map returned by {@code getAllPartialJobs()} are
 * handed, together with every argument unchanged, to the overload that
 * accepts a job collection as its first parameter.
 */
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
    String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
    JobState jobState) {
  return getPartialJobs(
      this.getAllPartialJobs().values(),
      offset, count,
      user, queue,
      sBegin, sEnd,
      fBegin, fEnd,
      jobState);
}
/**
 * Returns the jobs selected by the given paging and filter arguments.
 *
 * <p>The values of this object's {@code partialJobs} map are passed,
 * together with every argument unchanged, to the static
 * {@code CachedHistoryStorage.getPartialJobs} helper.
 */
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
    String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
    JobState jobState) {
  return CachedHistoryStorage.getPartialJobs(
      partialJobs.values(),
      offset, count,
      user, queue,
      sBegin, sEnd,
      fBegin, fEnd,
      jobState);
}
/**
 * Looks up a set of jobs selected by paging and filter arguments.
 *
 * <p>NOTE(review): this is a bodiless declaration, so the parameter
 * descriptions below are inferred from the parameter names only and
 * should be confirmed against the implementing classes.
 *
 * @param offset presumably the number of matching jobs to skip
 * @param count presumably the maximum number of jobs to return
 * @param user presumably restricts the result to jobs of this user
 * @param queue presumably restricts the result to jobs of this queue
 * @param sBegin presumably the lower bound on job start time
 * @param sEnd presumably the upper bound on job start time
 * @param fBegin presumably the lower bound on job finish time
 * @param fEnd presumably the upper bound on job finish time
 * @param jobState presumably restricts the result to jobs in this state
 * @return the selected jobs wrapped in a {@code JobsInfo}
 */
JobsInfo getPartialJobs(Long offset, Long count, String user, String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState);
public static JobsInfo getPartialJobs(Collection<Job> jobs, Long offset, Long count, String user, String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState) { JobsInfo allJobs = new JobsInfo(); if (sBegin == null || sBegin < 0) sBegin = 0l; if (sEnd == null) sEnd = Long.MAX_VALUE; if (fBegin == null || fBegin < 0) fBegin = 0l; if (fEnd == null) fEnd = Long.MAX_VALUE; if (offset == null || offset < 0) offset = 0l; if (count == null) count = Long.MAX_VALUE; if (offset > jobs.size()) { return allJobs; } long at = 0; long end = offset + count - 1; if (end < 0) { // due to overflow end = Long.MAX_VALUE; } for (Job job : jobs) { if (at > end) { break; } // can't really validate queue is a valid one since queues could change if (queue != null && !queue.isEmpty()) { if (!job.getQueueName().equals(queue)) { continue; } } if (user != null && !user.isEmpty()) { if (!job.getUserName().equals(user)) { continue; } } JobReport report = job.getReport(); if (report.getStartTime() < sBegin || report.getStartTime() > sEnd) { continue; } if (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd) { continue; } if (jobState != null && jobState != report.getJobState()) { continue; } at++; if ((at - 1) < offset) { continue; } JobInfo jobInfo = new JobInfo(job); allJobs.add(jobInfo); } return allJobs; }
/** * Simple test some methods of JobHistory */ @Test(timeout = 20000) public void testJobHistoryMethods() throws Exception { LOG.info("STARTING testJobHistoryMethods"); try { Configuration configuration = new Configuration(); configuration .setClass( NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(configuration); MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), true); app.submit(configuration); Job job = app.getContext().getAllJobs().values().iterator().next(); app.waitForState(job, JobState.SUCCEEDED); JobHistory jobHistory = new JobHistory(); jobHistory.init(configuration); // Method getAllJobs Assert.assertEquals(1, jobHistory.getAllJobs().size()); // and with ApplicationId Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size()); JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default", 0L, System.currentTimeMillis() + 1, 0L, System.currentTimeMillis() + 1, JobState.SUCCEEDED); Assert.assertEquals(1, jobsinfo.getJobs().size()); Assert.assertNotNull(jobHistory.getApplicationAttemptId()); // test Application Id Assert.assertEquals("application_0_0000", jobHistory.getApplicationID() .toString()); Assert .assertEquals("Job History Server", jobHistory.getApplicationName()); // method does not work Assert.assertNull(jobHistory.getEventHandler()); // method does not work Assert.assertNull(jobHistory.getClock()); // method does not work Assert.assertNull(jobHistory.getClusterInfo()); } finally { LOG.info("FINISHED testJobHistoryMethods"); } }
@Test public void testRefreshLoadedJobCacheUnSupportedOperation() { jobHistory = spy(new JobHistory()); HistoryStorage storage = new HistoryStorage() { @Override public void setHistoryFileManager(HistoryFileManager hsManager) { // TODO Auto-generated method stub } @Override public JobsInfo getPartialJobs(Long offset, Long count, String user, String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState) { // TODO Auto-generated method stub return null; } @Override public Job getFullJob(JobId jobId) { // TODO Auto-generated method stub return null; } @Override public Map<JobId, Job> getAllPartialJobs() { // TODO Auto-generated method stub return null; } }; doReturn(storage).when(jobHistory).createHistoryStorage(); jobHistory.init(new Configuration()); jobHistory.start(); Throwable th = null; try { jobHistory.refreshLoadedJobCache(); } catch (Exception e) { th = e; } assertTrue(th instanceof UnsupportedOperationException); }
/** * Simple test some methods of JobHistory */ @Test(timeout = 20000) public void testJobHistoryMethods() throws Exception { LOG.info("STARTING testJobHistoryMethods"); try { Configuration configuration = new Configuration(); configuration .setClass( NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(configuration); MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), true); app.submit(configuration); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); app.waitForState(job, JobState.SUCCEEDED); JobHistory jobHistory = new JobHistory(); jobHistory.init(configuration); // Method getAllJobs Assert.assertEquals(1, jobHistory.getAllJobs().size()); // and with ApplicationId Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size()); JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default", 0L, System.currentTimeMillis() + 1, 0L, System.currentTimeMillis() + 1, JobState.SUCCEEDED); Assert.assertEquals(1, jobsinfo.getJobs().size()); Assert.assertNotNull(jobHistory.getApplicationAttemptId()); // test Application Id Assert.assertEquals("application_0_0000", jobHistory.getApplicationID() .toString()); Assert .assertEquals("Job History Server", jobHistory.getApplicationName()); // method does not work Assert.assertNull(jobHistory.getEventHandler()); // method does not work Assert.assertNull(jobHistory.getClock()); // method does not work Assert.assertNull(jobHistory.getClusterInfo()); } finally { LOG.info("FINISHED testJobHistoryMethods"); } }
/** * Simple test some methods of JobHistory */ @Test(timeout = 20000) public void testJobHistoryMethods() throws Exception { LOG.info("STARTING testJobHistoryMethods"); try { Configuration configuration = new Configuration(); configuration .setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(configuration); MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), true); app.submit(configuration); Job job = app.getContext().getAllJobs().values().iterator().next(); app.waitForState(job, JobState.SUCCEEDED); JobHistory jobHistory = new JobHistory(); jobHistory.init(configuration); // Method getAllJobs Assert.assertEquals(1, jobHistory.getAllJobs().size()); // and with ApplicationId Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size()); JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default", 0L, System.currentTimeMillis() + 1, 0L, System.currentTimeMillis() + 1, JobState.SUCCEEDED); Assert.assertEquals(1, jobsinfo.getJobs().size()); Assert.assertNotNull(jobHistory.getApplicationAttemptId()); // test Application Id Assert.assertEquals("application_0_0000", jobHistory.getApplicationID() .toString()); Assert .assertEquals("Job History Server", jobHistory.getApplicationName()); // method does not work Assert.assertNull(jobHistory.getEventHandler()); // method does not work Assert.assertNull(jobHistory.getClock()); // method does not work Assert.assertNull(jobHistory.getClusterInfo()); } finally { LOG.info("FINISHED testJobHistoryMethods"); } }