/**
 * Returns the job's counters under the read lock.
 *
 * <p>When the internal state is ERROR, FAILED, KILLED or SUCCEEDED, the
 * method calls {@code mayBeConstructFinalFullCounters()} and returns the
 * {@code fullCounters} field. Otherwise it builds a fresh {@link Counters}
 * from the job-level counters plus those of all tasks.
 */
@Override
public Counters getAllCounters() {
  readLock.lock();
  try {
    JobStateInternal current = getInternalState();
    boolean terminal = current == JobStateInternal.ERROR
        || current == JobStateInternal.FAILED
        || current == JobStateInternal.KILLED
        || current == JobStateInternal.SUCCEEDED;
    if (!terminal) {
      // Job still in flight: aggregate job counters with current task counters.
      Counters aggregate = new Counters();
      aggregate.incrAllCounters(jobCounters);
      return incrTaskCounters(aggregate, tasks.values());
    }
    this.mayBeConstructFinalFullCounters();
    return fullCounters;
  } finally {
    readLock.unlock();
  }
}
private JobState getExternalState(JobStateInternal smState) { switch (smState) { case KILL_WAIT: case KILL_ABORT: return JobState.KILLED; case SETUP: case COMMITTING: return JobState.RUNNING; case FAIL_WAIT: case FAIL_ABORT: return JobState.FAILED; case REBOOT: if (appContext.isLastAMRetry()) { return JobState.ERROR; } else { // In case of not last retry, return the external state as RUNNING since // otherwise JobClient will exit when it polls the AM for job state return JobState.RUNNING; } default: return JobState.valueOf(smState.name()); } }
/**
 * Performs the bookkeeping for a job that has reached a final state.
 *
 * <p>In order, it:
 * <ol>
 *   <li>ends the running-job metric if the job is currently RUNNING;</li>
 *   <li>stamps the finish time if it is not yet set;</li>
 *   <li>dispatches a {@code JobFinishEvent};</li>
 *   <li>updates the outcome metric.</li>
 * </ol>
 *
 * @param finalState the terminal state being entered
 * @return {@code finalState}, unchanged
 * @throws IllegalArgumentException if {@code finalState} is not KILLED,
 *     REBOOT, ERROR, FAILED or SUCCEEDED (thrown after the finish event has
 *     already been dispatched)
 */
JobStateInternal finished(JobStateInternal finalState) {
  boolean wasRunning = getInternalState() == JobStateInternal.RUNNING;
  if (wasRunning) {
    metrics.endRunningJob(this);
  }
  if (finishTime == 0) {
    setFinishTime();
  }
  eventHandler.handle(new JobFinishEvent(jobId));

  switch (finalState) {
  case SUCCEEDED:
    metrics.completedJob(this);
    break;
  case KILLED:
    metrics.killedJob(this);
    break;
  case REBOOT:
  case ERROR:
  case FAILED:
    // REBOOT and ERROR are accounted as failures in the metrics.
    metrics.failedJob(this);
    break;
  default:
    throw new IllegalArgumentException("Illegal job state: " + finalState);
  }
  return finalState;
}
/**
 * Handles a task-completion event.
 *
 * <p>Bumps the completed-task count, dispatches to the succeeded, failed or
 * killed handler according to the task's reported state, and then lets the
 * job decide its next internal state.
 *
 * @param job the job receiving the event
 * @param event expected to be a {@code JobTaskEvent} (it is cast unconditionally)
 * @return the job's next internal state
 */
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
  job.completedTaskCount++;
  LOG.info("Num completed Tasks: " + job.completedTaskCount);

  JobTaskEvent completion = (JobTaskEvent) event;
  Task finishedTask = job.tasks.get(completion.getTaskID());
  TaskState outcome = completion.getState();

  if (outcome == TaskState.SUCCEEDED) {
    taskSucceeded(job, finishedTask);
  } else if (outcome == TaskState.FAILED) {
    taskFailed(job, finishedTask);
  } else if (outcome == TaskState.KILLED) {
    taskKilled(job, finishedTask);
  }
  // Any other reported state only counts toward completedTaskCount.
  return checkJobAfterTaskCompletion(job);
}
/** Create and initialize (but don't start) a single job. * @param forcedState a state to force the job into or null for normal operation. * @param diagnostic a diagnostic message to include with the job. */ protected Job createJob(Configuration conf, JobStateInternal forcedState, String diagnostic) { // create single job Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, jobCredentials, clock, completedTasksFromPreviousRun, metrics, committer, newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos, context, forcedState, diagnostic); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); dispatcher.register(JobFinishEvent.Type.class, createJobFinishEventHandler()); return newJob; }
/**
 * Reboots the job on what is configured as the last AM attempt (startCount 2)
 * while the app mocks an unregistration failure. Once the internal state is
 * REBOOT, the test expects the external state to be RUNNING.
 */
@Test
public void testJobRebootOnLastRetryOnUnregistrationFailure()
    throws Exception {
  // make startCount as 2 since this is last retry which equals to
  // DEFAULT_MAX_AM_RETRY
  // The last param mocks the unregistration failure
  MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2, false);
  Configuration conf = new Configuration();
  Job job = app.submit(conf);
  app.waitForState(job, JobState.RUNNING);
  Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
  Iterator<Task> it = job.getTasks().values().iterator();
  Task task = it.next();
  app.waitForState(task, TaskState.RUNNING);

  //send a reboot event
  app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
    JobEventType.JOB_AM_REBOOT));

  app.waitForInternalState((JobImpl) job, JobStateInternal.REBOOT);
  // return external state as RUNNING if this is the last retry while
  // unregistration fails
  app.waitForState(job, JobState.RUNNING);
}
@Test(timeout=20000) public void testCommitJobFailsJob() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); AsyncDispatcher dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); CyclicBarrier syncBarrier = new CyclicBarrier(2); OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); // let the committer fail and verify the job fails syncBarrier.await(); assertJobState(job, JobStateInternal.FAILED); dispatcher.stop(); commitHandler.stop(); }
/**
 * Drives a running job into COMMITTING, sends JOB_KILL after the committer
 * reaches the barrier, and checks that the job ends up KILLED.
 *
 * <p>The dispatcher and commit handler are stopped in a {@code finally}
 * block so a failed assertion or timeout does not leave them running.
 */
@Test(timeout=20000)
public void testKilledDuringCommit() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  try {
    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
    completeJobTasks(job);
    assertJobState(job, JobStateInternal.COMMITTING);

    syncBarrier.await();
    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
    assertJobState(job, JobStateInternal.KILLED);
  } finally {
    dispatcher.stop();
    commitHandler.stop();
  }
}
/**
 * Makes job setup fail so the job goes to FAILED, then checks that further
 * task and attempt events leave it FAILED.
 *
 * <p>It also checks that the external state reads RUNNING until the
 * context reports a successful unregistration, and FAILED afterwards. The
 * dispatcher and commit handler are stopped in a {@code finally} block so a
 * failed assertion does not leave them running.
 */
@Test
public void testTransitionsAtFailed() throws IOException {
  Configuration conf = new Configuration();
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();

  OutputCommitter committer = mock(OutputCommitter.class);
  doThrow(new IOException("forcefail"))
    .when(committer).setupJob(any(JobContext.class));
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  try {
    AppContext mockContext = mock(AppContext.class);
    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
    JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
    JobId jobId = job.getID();
    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
    assertJobState(job, JobStateInternal.INITED);
    job.handle(new JobStartEvent(jobId));
    assertJobState(job, JobStateInternal.FAILED);

    // Every subsequent event must leave the job in FAILED.
    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
    assertJobState(job, JobStateInternal.FAILED);
    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
    assertJobState(job, JobStateInternal.FAILED);
    job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
    assertJobState(job, JobStateInternal.FAILED);
    job.handle(new JobEvent(jobId,
        JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
    assertJobState(job, JobStateInternal.FAILED);

    // External state stays RUNNING until unregistration has succeeded.
    Assert.assertEquals(JobState.RUNNING, job.getState());
    when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
    Assert.assertEquals(JobState.FAILED, job.getState());
  } finally {
    dispatcher.stop();
    commitHandler.stop();
  }
}
/**
 * Creates a JobImpl test stub.
 *
 * <p>The stub has no task attempt listener, committer, AM infos, forced
 * state or diagnostic. It uses a SystemClock, fresh metrics and an empty
 * map of previously completed tasks.
 *
 * <p>The stub builds its own state machine from {@code stateMachineFactory}.
 * That machine is extended with a NEW -> {INITED, FAILED} transition on
 * JOB_INIT that runs the transition from {@code getInitTransition(numSplits)}.
 *
 * @param numSplits passed to {@code getInitTransition} to build the init transition
 */
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
    Configuration conf, EventHandler eventHandler, boolean newApiCommitter,
    String user, int numSplits, AppContext appContext) {
  super(jobId, applicationAttemptId, conf, eventHandler,
      null, // NOTE(review): presumably taskAttemptListener - confirm against JobImpl ctor
      new JobTokenSecretManager(), new Credentials(),
      new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
      MRAppMetrics.create(),
      null, // NOTE(review): presumably committer
      newApiCommitter, user, System.currentTimeMillis(),
      null, // NOTE(review): presumably amInfos
      appContext,
      null, null); // NOTE(review): presumably forcedState, diagnostic

  initTransition = getInitTransition(numSplits);
  localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
        EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
        JobEventType.JOB_INIT,
        // This is abusive.
        // NOTE(review): presumably because the test hooks a custom init
        // transition into JobImpl's factory from outside - confirm.
        initTransition);

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  localStateMachine = localFactory.make(this);
}
/**
 * Creates a JobImpl test double.
 *
 * <p>Several JobImpl arguments are not taken from the caller; they come from
 * members outside this constructor (presumably the enclosing test app):
 * <ul>
 *   <li>the attempt id, from {@code getApplicationAttemptId(applicationId, getStartCount())};</li>
 *   <li>the completed tasks of a previous run, from {@code getCompletedTaskFromPreviousRun()};</li>
 *   <li>the metrics, from {@code metrics};</li>
 *   <li>the AM infos, from {@code getAllAMInfos()}.</li>
 * </ul>
 *
 * <p>NOTE(review): the {@code applicationAttemptId} parameter is not used;
 * the attempt id passed to super is rebuilt as described above.
 */
@SuppressWarnings("rawtypes")
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
    Configuration conf, EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Clock clock,
    OutputCommitter committer, boolean newApiCommitter, String user,
    AppContext appContext, JobStateInternal forcedState, String diagnostic) {
  super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
      conf, eventHandler, taskAttemptListener,
      new JobTokenSecretManager(), new Credentials(), clock,
      getCompletedTaskFromPreviousRun(), metrics, committer,
      newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
      appContext, forcedState, diagnostic);

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  localStateMachine = localFactory.make(this);
}
/**
 * Creates a {@code TestJob} for the current user.
 *
 * <p>The job is stored in the context's job map, and a handler for
 * {@code JobFinishEvent.Type} is registered with the dispatcher.
 *
 * @throws YarnRuntimeException wrapping the IOException if the current user
 *     cannot be determined
 */
@Override
protected Job createJob(Configuration conf, JobStateInternal forcedState,
    String diagnostic) {
  final UserGroupInformation ugi;
  try {
    ugi = UserGroupInformation.getCurrentUser();
  } catch (IOException ioe) {
    throw new YarnRuntimeException(ioe);
  }

  Job job = new TestJob(getJobId(), getAttemptID(), conf,
      getDispatcher().getEventHandler(), getTaskAttemptListener(),
      getContext().getClock(), getCommitter(), isNewApiCommitter(),
      ugi.getUserName(), getContext(), forcedState, diagnostic);
  ((AppContext) getContext()).getAllJobs().put(job.getID(), job);

  getDispatcher().register(JobFinishEvent.Type.class,
      createJobFinishEventHandler());
  return job;
}
@Test public void testNotificationOnLastRetryNormalShutdown() throws Exception { HttpServer2 server = startHttpServer(); // Act like it is the second attempt. Default max attempts is 2 MRApp app = spy(new MRAppWithCustomContainerAllocator( 2, 2, true, this.getClass().getName(), true, 2, true)); doNothing().when(app).sysexit(); JobConf conf = new JobConf(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); JobImpl job = (JobImpl)app.submit(conf); app.waitForInternalState(job, JobStateInternal.SUCCEEDED); // Unregistration succeeds: successfullyUnregistered is set app.shutDownJob(); Assert.assertTrue(app.isLastAMRetry()); Assert.assertEquals(1, JobEndServlet.calledTimes); Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED", JobEndServlet.requestUri.getQuery()); Assert.assertEquals(JobState.SUCCEEDED.toString(), JobEndServlet.foundJobState); server.stop(); }
@Test public void testAbsentNotificationOnNotLastRetryUnregistrationFailure() throws Exception { HttpServer2 server = startHttpServer(); MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false, this.getClass().getName(), true, 1, false)); doNothing().when(app).sysexit(); JobConf conf = new JobConf(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); JobImpl job = (JobImpl)app.submit(conf); app.waitForState(job, JobState.RUNNING); app.getContext().getEventHandler() .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT)); app.waitForInternalState(job, JobStateInternal.REBOOT); // Now shutdown. // Unregistration fails: isLastAMRetry is recalculated, this is not app.shutDownJob(); // Not the last AM attempt. So user should that the job is still running. app.waitForState(job, JobState.RUNNING); Assert.assertFalse(app.isLastAMRetry()); Assert.assertEquals(0, JobEndServlet.calledTimes); Assert.assertNull(JobEndServlet.requestUri); Assert.assertNull(JobEndServlet.foundJobState); server.stop(); }
@Test public void testDeletionofStaging() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); //Staging Dir exists String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path stagingDir = MRApps.getStagingAreaDir(conf, user); when(fs.exists(stagingDir)).thenReturn(true); ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); appMaster.init(conf); appMaster.start(); appMaster.shutDownJob(); //test whether notifyIsLastAMRetry called Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry()); verify(fs).delete(stagingJobPath, true); }
@Test (timeout = 30000) public void testNoDeletionofStagingOnReboot() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true); String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path stagingDir = MRApps.getStagingAreaDir(conf, user); when(fs.exists(stagingDir)).thenReturn(true); ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); appMaster.init(conf); appMaster.start(); //shutdown the job, not the lastRetry appMaster.shutDownJob(); //test whether notifyIsLastAMRetry called Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry()); verify(fs, times(0)).delete(stagingJobPath, true); }
public void testDeletionofStagingOnReboot() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true); String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path stagingDir = MRApps.getStagingAreaDir(conf, user); when(fs.exists(stagingDir)).thenReturn(true); ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, JobStateInternal.REBOOT, 1); //no retry appMaster.init(conf); appMaster.start(); //shutdown the job, is lastRetry appMaster.shutDownJob(); //test whether notifyIsLastAMRetry called Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry()); verify(fs).delete(stagingJobPath, true); }
/**
 * Builds a {@code TestJob} owned by the current user.
 *
 * <p>The job is added to the context's job map, and a handler for
 * {@code JobFinishEvent.Type} is registered with the dispatcher.
 *
 * @throws YarnRuntimeException wrapping the IOException if the current user
 *     cannot be determined
 */
@Override
protected Job createJob(Configuration conf, JobStateInternal forcedState,
    String diagnostic) {
  final UserGroupInformation owner;
  try {
    owner = UserGroupInformation.getCurrentUser();
  } catch (IOException cause) {
    throw new YarnRuntimeException(cause);
  }

  Job created = new TestJob(getJobId(), getAttemptID(), conf,
      getDispatcher().getEventHandler(),
      getTaskAttemptListener(), getContext().getClock(),
      getCommitter(), isNewApiCommitter(),
      owner.getUserName(), getContext(),
      forcedState, diagnostic);
  ((AppContext) getContext()).getAllJobs().put(created.getID(), created);

  getDispatcher().register(JobFinishEvent.Type.class,
      createJobFinishEventHandler());
  return created;
}
private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { // rerun previously successful map tasks // do this only if the job is still in the running state and there are // running reducers if (getInternalState() == JobStateInternal.RUNNING && !allReducersComplete()) { List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); if (taskAttemptIdList != null) { String mesg = "TaskAttempt killed because it ran on unusable node " + nodeId; for (TaskAttemptId id : taskAttemptIdList) { if (TaskType.MAP == id.getTaskId().getTaskType()) { // reschedule only map tasks because their outputs maybe unusable LOG.info(mesg + ". AttemptId:" + id); eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); } } } } // currently running task attempts on unusable nodes are handled in // RMContainerAllocator }