@SuppressWarnings("unchecked") protected void handleJobCommit(CommitterJobCommitEvent event) { try { touchz(startCommitFile); jobCommitStarted(); waitForValidCommitWindow(); committer.commitJob(event.getJobContext()); touchz(endCommitSuccessFile); context.getEventHandler().handle( new JobCommitCompletedEvent(event.getJobID())); } catch (Exception e) { try { touchz(endCommitFailureFile); } catch (Exception e2) { LOG.error("could not create failure file.", e2); } LOG.error("Could not commit job", e); context.getEventHandler().handle( new JobCommitFailedEvent(event.getJobID(), StringUtils.stringifyException(e))); } finally { jobCommitEnded(); } }
@Test public void testBasic() throws Exception { AppContext mockContext = mock(AppContext.class); OutputCommitter mockCommitter = mock(OutputCommitter.class); Clock mockClock = mock(Clock.class); CommitterEventHandler handler = new CommitterEventHandler(mockContext, mockCommitter, new TestingRMHeartbeatHandler()); YarnConfiguration conf = new YarnConfiguration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); JobContext mockJobContext = mock(JobContext.class); ApplicationAttemptId attemptid = ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0"); JobId jobId = TypeConverter.toYarn( TypeConverter.fromYarn(attemptid.getApplicationId())); WaitForItHandler waitForItHandler = new WaitForItHandler(); when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId()); when(mockContext.getApplicationAttemptId()).thenReturn(attemptid); when(mockContext.getEventHandler()).thenReturn(waitForItHandler); when(mockContext.getClock()).thenReturn(mockClock); handler.init(conf); handler.start(); try { handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext)); String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId); Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId); Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId); Event e = waitForItHandler.getAndClearEvent(); assertNotNull(e); assertTrue(e instanceof JobCommitCompletedEvent); FileSystem fs = FileSystem.get(conf); assertTrue(startCommitFile.toString(), fs.exists(startCommitFile)); assertTrue(endCommitSuccessFile.toString(), fs.exists(endCommitSuccessFile)); assertFalse(endCommitFailureFile.toString(), fs.exists(endCommitFailureFile)); verify(mockCommitter).commitJob(any(JobContext.class)); } finally { handler.stop(); } }
@Test public void testBasic() throws Exception { AppContext mockContext = mock(AppContext.class); OutputCommitter mockCommitter = mock(OutputCommitter.class); Clock mockClock = mock(Clock.class); CommitterEventHandler handler = new CommitterEventHandler(mockContext, mockCommitter, new TestingRMHeartbeatHandler()); YarnConfiguration conf = new YarnConfiguration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); JobContext mockJobContext = mock(JobContext.class); ApplicationAttemptId attemptid = ApplicationAttemptId.fromString( "appattempt_1234567890000_0001_0"); JobId jobId = TypeConverter.toYarn( TypeConverter.fromYarn(attemptid.getApplicationId())); WaitForItHandler waitForItHandler = new WaitForItHandler(); when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId()); when(mockContext.getApplicationAttemptId()).thenReturn(attemptid); when(mockContext.getEventHandler()).thenReturn(waitForItHandler); when(mockContext.getClock()).thenReturn(mockClock); handler.init(conf); handler.start(); try { handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext)); String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId); Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId); Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId); Event e = waitForItHandler.getAndClearEvent(); assertNotNull(e); assertTrue(e instanceof JobCommitCompletedEvent); FileSystem fs = FileSystem.get(conf); assertTrue(startCommitFile.toString(), fs.exists(startCommitFile)); assertTrue(endCommitSuccessFile.toString(), fs.exists(endCommitSuccessFile)); assertFalse(endCommitFailureFile.toString(), fs.exists(endCommitFailureFile)); verify(mockCommitter).commitJob(any(JobContext.class)); } finally { handler.stop(); } }