protected void unregister() { try { doUnregistration(); } catch(Exception are) { LOG.error("Exception while unregistering ", are); // if unregistration failed, isLastAMRetry needs to be recalculated // to see whether AM really has the chance to retry RunningAppContext raContext = (RunningAppContext) context; raContext.resetIsLastAMRetry(); } }
@SuppressWarnings("resource") private void testDeletionofStagingOnUnregistrationFailure( int maxAttempts, boolean shouldHaveDeleted) throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); //Staging Dir exists String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path stagingDir = MRApps.getStagingAreaDir(conf, user); when(fs.exists(stagingDir)).thenReturn(true); ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); TestMRApp appMaster = new TestMRApp(attemptId, null, JobStateInternal.RUNNING, maxAttempts); appMaster.crushUnregistration = true; appMaster.init(conf); appMaster.start(); appMaster.shutDownJob(); ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry(); if (shouldHaveDeleted) { Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry()); verify(fs).delete(stagingJobPath, true); } else { Assert.assertEquals(new Boolean(false), appMaster.isLastAMRetry()); verify(fs, never()).delete(stagingJobPath, true); } }
@Before @SuppressWarnings("rawtypes") // mocked generics public void setup() { ApplicationId appId = ApplicationId.newInstance(200, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); jid = MRBuilderUtils.newJobId(appId, 1); mActxt = mock(RunningAppContext.class); EventHandler ea = mock(EventHandler.class); when(mActxt.getEventHandler()).thenReturn(ea); for (int i = 0; i < 40; ++i) { ContainerId cId = ContainerId.newContainerId(appAttemptId, i); if (0 == i % 7) { preemptedContainers.add(cId); } TaskId tId = 0 == i % 2 ? MRBuilderUtils.newTaskId(jid, i / 2, TaskType.MAP) : MRBuilderUtils.newTaskId(jid, i / 2 + 1, TaskType.REDUCE); assignedContainers.put(cId, MRBuilderUtils.newTaskAttemptId(tId, 0)); contToResourceMap.put(cId, Resource.newInstance(2 * minAlloc, 2)); } for (Map.Entry<ContainerId,TaskAttemptId> ent : assignedContainers.entrySet()) { System.out.println("cont:" + ent.getKey().getContainerId() + " type:" + ent.getValue().getTaskId().getTaskType() + " res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" ); } }
protected void unregister() { try { doUnregistration(); } catch(Exception are) { LOG.error("Exception while unregistering ", are); // if unregistration failed, isLastAMRetry needs to be recalculated // to see whether AM really has the chance to retry RunningAppContext raContext = (RunningAppContext) context; raContext.computeIsLastAMRetry(); } }
@SuppressWarnings("resource") private void testDeletionofStagingOnUnregistrationFailure( int maxAttempts, boolean shouldHaveDeleted) throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); //Staging Dir exists String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path stagingDir = MRApps.getStagingAreaDir(conf, user); when(fs.exists(stagingDir)).thenReturn(true); ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); TestMRApp appMaster = new TestMRApp(attemptId, null, JobStateInternal.RUNNING, maxAttempts); appMaster.crushUnregistration = true; appMaster.init(conf); appMaster.start(); appMaster.shutDownJob(); ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry(); if (shouldHaveDeleted) { Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry()); verify(fs).delete(stagingJobPath, true); } else { Assert.assertEquals(new Boolean(false), appMaster.isLastAMRetry()); verify(fs, never()).delete(stagingJobPath, true); } }
@VisibleForTesting protected void doUnregistration() throws YarnException, IOException, InterruptedException { FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; JobImpl jobImpl = (JobImpl)job; if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) { finishState = FinalApplicationStatus.SUCCEEDED; } else if (jobImpl.getInternalState() == JobStateInternal.KILLED || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) { finishState = FinalApplicationStatus.KILLED; } else if (jobImpl.getInternalState() == JobStateInternal.FAILED || jobImpl.getInternalState() == JobStateInternal.ERROR) { finishState = FinalApplicationStatus.FAILED; } StringBuffer sb = new StringBuffer(); for (String s : job.getDiagnostics()) { sb.append(s).append("\n"); } LOG.info("Setting job diagnostics to " + sb.toString()); String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(), context.getApplicationID()); LOG.info("History url is " + historyUrl); FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(finishState, sb.toString(), historyUrl); try { while (true) { FinishApplicationMasterResponse response = scheduler.finishApplicationMaster(request); if (response.getIsUnregistered()) { // When excepting ClientService, other services are already stopped, // it is safe to let clients know the final states. ClientService // should wait for some time so clients have enough time to know the // final states. RunningAppContext raContext = (RunningAppContext) context; raContext.markSuccessfulUnregistration(); break; } LOG.info("Waiting for application to be successfully unregistered."); Thread.sleep(rmPollInterval); } } catch (ApplicationMasterNotRegisteredException e) { // RM might have restarted or failed over and so lost the fact that AM had // registered before. register(); doUnregistration(); } }
@SuppressWarnings("unchecked") @Test public void testKillAMPreemptPolicy() { ApplicationId appId = ApplicationId.newInstance(123456789, 1); ContainerId container = ContainerId.newContainerId( ApplicationAttemptId.newInstance(appId, 1), 1); AMPreemptionPolicy.Context mPctxt = mock(AMPreemptionPolicy.Context.class); when(mPctxt.getTaskAttempt(any(ContainerId.class))).thenReturn( MRBuilderUtils.newTaskAttemptId(MRBuilderUtils.newTaskId( MRBuilderUtils.newJobId(appId, 1), 1, TaskType.MAP), 0)); List<Container> p = new ArrayList<Container>(); p.add(Container.newInstance(container, null, null, null, null, null)); when(mPctxt.getContainers(any(TaskType.class))).thenReturn(p); KillAMPreemptionPolicy policy = new KillAMPreemptionPolicy(); // strictContract is null & contract is null RunningAppContext mActxt = getRunningAppContext(); policy.init(mActxt); PreemptionMessage pM = getPreemptionMessage(false, false, container); policy.preempt(mPctxt, pM); verify(mActxt.getEventHandler(), times(0)).handle( any(TaskAttemptEvent.class)); verify(mActxt.getEventHandler(), times(0)).handle( any(JobCounterUpdateEvent.class)); // strictContract is not null & contract is null mActxt = getRunningAppContext(); policy.init(mActxt); pM = getPreemptionMessage(true, false, container); policy.preempt(mPctxt, pM); verify(mActxt.getEventHandler(), times(2)).handle( any(TaskAttemptEvent.class)); verify(mActxt.getEventHandler(), times(2)).handle( any(JobCounterUpdateEvent.class)); // strictContract is null & contract is not null mActxt = getRunningAppContext(); policy.init(mActxt); pM = getPreemptionMessage(false, true, container); policy.preempt(mPctxt, pM); verify(mActxt.getEventHandler(), times(2)).handle( any(TaskAttemptEvent.class)); verify(mActxt.getEventHandler(), times(2)).handle( any(JobCounterUpdateEvent.class)); // strictContract is not null & contract is not null mActxt = getRunningAppContext(); policy.init(mActxt); pM = getPreemptionMessage(true, true, container); policy.preempt(mPctxt, 
pM); verify(mActxt.getEventHandler(), times(4)).handle( any(TaskAttemptEvent.class)); verify(mActxt.getEventHandler(), times(4)).handle( any(JobCounterUpdateEvent.class)); }
/**
 * Creates a mocked {@code RunningAppContext} whose {@code getEventHandler()}
 * returns a freshly mocked {@code EventHandler}, so each caller can verify
 * dispatched events in isolation.
 *
 * @return the new mocked application context
 */
private RunningAppContext getRunningAppContext() {
  EventHandler<?> handler = mock(EventHandler.class);
  RunningAppContext appContext = mock(RunningAppContext.class);
  when(appContext.getEventHandler()).thenReturn(handler);
  return appContext;
}
@VisibleForTesting protected void doUnregistration() throws YarnException, IOException, InterruptedException { FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; JobImpl jobImpl = (JobImpl)job; if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) { finishState = FinalApplicationStatus.SUCCEEDED; } else if (jobImpl.getInternalState() == JobStateInternal.KILLED || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) { finishState = FinalApplicationStatus.KILLED; } else if (jobImpl.getInternalState() == JobStateInternal.FAILED || jobImpl.getInternalState() == JobStateInternal.ERROR) { finishState = FinalApplicationStatus.FAILED; } StringBuffer sb = new StringBuffer(); for (String s : job.getDiagnostics()) { sb.append(s).append("\n"); } LOG.info("Setting job diagnostics to " + sb.toString()); String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(), context.getApplicationID()); LOG.info("History url is " + historyUrl); FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(finishState, sb.toString(), historyUrl); while (true) { FinishApplicationMasterResponse response = scheduler.finishApplicationMaster(request); if (response.getIsUnregistered()) { // When excepting ClientService, other services are already stopped, // it is safe to let clients know the final states. ClientService // should wait for some time so clients have enough time to know the // final states. RunningAppContext raContext = (RunningAppContext) context; raContext.markSuccessfulUnregistration(); break; } LOG.info("Waiting for application to be successfully unregistered."); Thread.sleep(rmPollInterval); } }