/** Create and initialize (but don't start) a single job. * @param forcedState a state to force the job into or null for normal operation. * @param diagnostic a diagnostic message to include with the job. */ protected Job createJob(Configuration conf, JobStateInternal forcedState, String diagnostic) { // create single job Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, jobCredentials, clock, completedTasksFromPreviousRun, metrics, committer, newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos, context, forcedState, diagnostic); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); dispatcher.register(JobFinishEvent.Type.class, createJobFinishEventHandler()); return newJob; }
@Test public void testJobRebootOnLastRetryOnUnregistrationFailure() throws Exception { // make startCount as 2 since this is last retry which equals to // DEFAULT_MAX_AM_RETRY // The last param mocks the unregistration failure MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, false); Configuration conf = new Configuration(); Job job = app.submit(conf); app.waitForState(job, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); Iterator<Task> it = job.getTasks().values().iterator(); Task task = it.next(); app.waitForState(task, TaskState.RUNNING); //send an reboot event app.getContext().getEventHandler().handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT); // return exteranl state as RUNNING if this is the last retry while // unregistration fails app.waitForState(job, JobState.RUNNING); }
@Test(timeout = 20000) public void testComponentStopOrder() throws Exception { @SuppressWarnings("resource") TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true); JobImpl job = (JobImpl) app.submit(new Configuration()); app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); int waitTime = 20 * 1000; while (waitTime > 0 && app.numStops < 2) { Thread.sleep(100); waitTime -= 100; } // assert JobHistoryEventHandlerStopped and then clientServiceStopped Assert.assertEquals(1, app.JobHistoryEventHandlerStopped); Assert.assertEquals(2, app.clientServiceStopped); }
@Test public void testNotificationOnLastRetryNormalShutdown() throws Exception { HttpServer2 server = startHttpServer(); // Act like it is the second attempt. Default max attempts is 2 MRApp app = spy(new MRAppWithCustomContainerAllocator( 2, 2, true, this.getClass().getName(), true, 2, true)); doNothing().when(app).sysexit(); JobConf conf = new JobConf(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); JobImpl job = (JobImpl)app.submit(conf); app.waitForInternalState(job, JobStateInternal.SUCCEEDED); // Unregistration succeeds: successfullyUnregistered is set app.shutDownJob(); Assert.assertTrue(app.isLastAMRetry()); Assert.assertEquals(1, JobEndServlet.calledTimes); Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED", JobEndServlet.requestUri.getQuery()); Assert.assertEquals(JobState.SUCCEEDED.toString(), JobEndServlet.foundJobState); server.stop(); }
@Test public void testAbsentNotificationOnNotLastRetryUnregistrationFailure() throws Exception { HttpServer2 server = startHttpServer(); MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false, this.getClass().getName(), true, 1, false)); doNothing().when(app).sysexit(); JobConf conf = new JobConf(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); JobImpl job = (JobImpl)app.submit(conf); app.waitForState(job, JobState.RUNNING); app.getContext().getEventHandler() .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT)); app.waitForInternalState(job, JobStateInternal.REBOOT); // Now shutdown. // Unregistration fails: isLastAMRetry is recalculated, this is not app.shutDownJob(); // Not the last AM attempt. So user should that the job is still running. app.waitForState(job, JobState.RUNNING); Assert.assertFalse(app.isLastAMRetry()); Assert.assertEquals(0, JobEndServlet.calledTimes); Assert.assertNull(JobEndServlet.requestUri); Assert.assertNull(JobEndServlet.foundJobState); server.stop(); }
@Test(timeout=20000) public void testStagingCleanupOrder() throws Exception { MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, this.getClass().getName(), true); JobImpl job = (JobImpl)app.submit(new Configuration()); app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); int waitTime = 20 * 1000; while (waitTime > 0 && app.numStops < 2) { Thread.sleep(100); waitTime -= 100; } // assert ContainerAllocatorStopped and then tagingDirCleanedup Assert.assertEquals(1, app.ContainerAllocatorStopped); Assert.assertEquals(2, app.stagingDirCleanedup); }
/**
 * Runs a job to SUCCEEDED and then checks that the application's
 * {@code cleanedBeforeContainerAllocatorStopped} flag becomes true, polling
 * for up to 20 seconds before asserting.
 */
@Test(timeout=20000)
public void testStagingCleanupOrder() throws Exception {
  MRAppTestCleanup app =
      new MRAppTestCleanup(1, 1, true, this.getClass().getName(), true);
  Configuration conf = new Configuration();
  JobImpl job = (JobImpl) app.submit(conf);
  app.waitForState(job, JobState.SUCCEEDED);
  app.verifyCompleted();

  // Poll in 100 ms steps, for at most 20 seconds, until the flag is set.
  for (int remainingMs = 20 * 1000;
      remainingMs > 0 && !app.cleanedBeforeContainerAllocatorStopped;
      remainingMs -= 100) {
    Thread.sleep(100);
  }

  Assert.assertTrue("Staging directory not cleaned before notifying RM",
      app.cleanedBeforeContainerAllocatorStopped);
}