@Override public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo) throws IOException { diagnosticInfo = StringInterner.weakIntern(diagnosticInfo); LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": " + diagnosticInfo); org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); taskHeartbeatHandler.progressing(attemptID); // This is mainly used for cases where we want to propagate exception traces // of tasks that fail. // This call exists as a hadoop mapreduce legacy wherein all changes in // counters/progress/phase/output-size are reported through statusUpdate() // call but not diagnosticInformation. context.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnosticInfo)); }
/**
 * Handles a client request to kill a task attempt.
 *
 * <p>The request is logged together with the calling user and remote
 * address, then checked with {@code verifyAndGetAttempt} against
 * {@code JobACL.MODIFY_JOB}. After that, the attempt receives a diagnostics
 * update carrying the same message, followed by a {@code TA_KILL} event.
 *
 * @param request identifies the attempt to kill
 * @return a newly created {@link KillTaskAttemptResponse} record
 * @throws IOException propagated from the caller lookup or from
 *         {@code verifyAndGetAttempt}
 */
@SuppressWarnings("unchecked")
@Override
public KillTaskAttemptResponse killTaskAttempt(
    KillTaskAttemptRequest request) throws IOException {
  final TaskAttemptId attemptId = request.getTaskAttemptId();
  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final String auditMessage = "Kill task attempt " + attemptId
      + " received from " + caller
      + " at " + Server.getRemoteAddress();
  LOG.info(auditMessage);

  // NOTE(review): presumably throws if the attempt is unknown or the caller
  // lacks MODIFY_JOB access; confirm against verifyAndGetAttempt.
  verifyAndGetAttempt(attemptId, JobACL.MODIFY_JOB);

  // Attach the request details as a diagnostic before issuing the kill.
  appContext.getEventHandler().handle(
      new TaskAttemptDiagnosticsUpdateEvent(attemptId, auditMessage));
  appContext.getEventHandler().handle(
      new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));

  return recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
}
/**
 * Handles a client request to fail a task attempt.
 *
 * <p>The request is logged together with the calling user and remote
 * address, then checked with {@code verifyAndGetAttempt} against
 * {@code JobACL.MODIFY_JOB}. After that, the attempt receives a diagnostics
 * update carrying the same message, followed by a {@code TA_FAILMSG} event.
 *
 * @param request identifies the attempt to fail
 * @return a newly created {@link FailTaskAttemptResponse} record
 * @throws IOException propagated from the caller lookup or from
 *         {@code verifyAndGetAttempt}
 */
@SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
    FailTaskAttemptRequest request) throws IOException {
  final TaskAttemptId attemptId = request.getTaskAttemptId();
  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final String auditMessage = "Fail task attempt " + attemptId
      + " received from " + caller
      + " at " + Server.getRemoteAddress();
  LOG.info(auditMessage);

  // NOTE(review): presumably throws if the attempt is unknown or the caller
  // lacks MODIFY_JOB access; confirm against verifyAndGetAttempt.
  verifyAndGetAttempt(attemptId, JobACL.MODIFY_JOB);

  // Attach the request details as a diagnostic before issuing the failure.
  appContext.getEventHandler().handle(
      new TaskAttemptDiagnosticsUpdateEvent(attemptId, auditMessage));
  appContext.getEventHandler().handle(
      new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_FAILMSG));

  return recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
}
/**
 * Handles a client request to fail a task attempt.
 *
 * <p>The request is logged together with the calling user and remote
 * address, then checked with {@code verifyAndGetAttempt} against
 * {@code JobACL.MODIFY_JOB}. After that, the attempt receives a diagnostics
 * update carrying the same message, followed by a
 * {@code TA_FAILMSG_BY_CLIENT} event.
 *
 * @param request identifies the attempt to fail
 * @return a newly created {@link FailTaskAttemptResponse} record
 * @throws IOException propagated from the caller lookup or from
 *         {@code verifyAndGetAttempt}
 */
@SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
    FailTaskAttemptRequest request) throws IOException {
  final TaskAttemptId attemptId = request.getTaskAttemptId();
  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final String auditMessage = "Fail task attempt " + attemptId
      + " received from " + caller
      + " at " + Server.getRemoteAddress();
  LOG.info(auditMessage);

  // NOTE(review): presumably throws if the attempt is unknown or the caller
  // lacks MODIFY_JOB access; confirm against verifyAndGetAttempt.
  verifyAndGetAttempt(attemptId, JobACL.MODIFY_JOB);

  // Attach the request details as a diagnostic, then signal a client-issued
  // failure (distinct event type from TA_FAILMSG).
  appContext.getEventHandler().handle(
      new TaskAttemptDiagnosticsUpdateEvent(attemptId, auditMessage));
  appContext.getEventHandler().handle(
      new TaskAttemptEvent(attemptId,
          TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));

  return recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
}
@Test public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); taImpl.handle(new TaskAttemptEvent(taImpl.getID(), TaskAttemptEventType.TA_DONE)); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.SUCCEEDED); assertEquals("Task attempt's internal state is not " + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); // TA_DIAGNOSTICS_UPDATE doesn't change state taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(), "Task got updated")); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.SUCCEEDED); assertEquals("Task attempt's internal state is not " + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); assertFalse("InternalError occurred", eventHandler.internalError); }
/**
 * Handles a client request to kill a task attempt.
 *
 * <p>The request is logged and the attempt is checked with
 * {@code verifyAndGetAttempt(id, true)}. The attempt then receives a
 * diagnostics update carrying the log message, followed by a
 * {@code TA_KILL} event.
 *
 * @param request identifies the attempt to kill
 * @return a newly created {@link KillTaskAttemptResponse} record
 * @throws IOException propagated from {@code verifyAndGetAttempt}
 */
@SuppressWarnings("unchecked")
@Override
public KillTaskAttemptResponse killTaskAttempt(
    KillTaskAttemptRequest request) throws IOException {
  final TaskAttemptId attemptId = request.getTaskAttemptId();
  final String clientMessage =
      "Kill task attempt received from client " + attemptId;
  LOG.info(clientMessage);

  // NOTE(review): the boolean flag's meaning is defined by
  // verifyAndGetAttempt; presumably it requests a modify-access check.
  verifyAndGetAttempt(attemptId, true);

  appContext.getEventHandler().handle(
      new TaskAttemptDiagnosticsUpdateEvent(attemptId, clientMessage));
  appContext.getEventHandler().handle(
      new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL));

  return recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
}
/**
 * Handles a client request to fail a task attempt.
 *
 * <p>The request is logged and the attempt is checked with
 * {@code verifyAndGetAttempt(id, true)}. The attempt then receives a
 * diagnostics update carrying the log message, followed by a
 * {@code TA_FAILMSG} event.
 *
 * @param request identifies the attempt to fail
 * @return a newly created {@link FailTaskAttemptResponse} record
 * @throws IOException propagated from {@code verifyAndGetAttempt}
 */
@SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
    FailTaskAttemptRequest request) throws IOException {
  final TaskAttemptId attemptId = request.getTaskAttemptId();
  final String clientMessage =
      "Fail task attempt received from client " + attemptId;
  LOG.info(clientMessage);

  // NOTE(review): the boolean flag's meaning is defined by
  // verifyAndGetAttempt; presumably it requests a modify-access check.
  verifyAndGetAttempt(attemptId, true);

  appContext.getEventHandler().handle(
      new TaskAttemptDiagnosticsUpdateEvent(attemptId, clientMessage));
  appContext.getEventHandler().handle(
      new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_FAILMSG));

  return recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
}
@SuppressWarnings("unchecked") @VisibleForTesting void processFinishedContainer(ContainerStatus container) { LOG.info("Received completed container " + container.getContainerId()); TaskAttemptId attemptID = assignedRequests.get(container.getContainerId()); if (attemptID == null) { LOG.error("Container complete event for unknown container " + container.getContainerId()); } else { pendingRelease.remove(container.getContainerId()); assignedRequests.remove(attemptID); // Send the diagnostics String diagnostic = StringInterner.weakIntern(container.getDiagnostics()); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnostic)); // send the container completed event to Task attempt eventHandler.handle(createContainerFinishedEvent(container, attemptID)); } }
/**
 * MAPREDUCE-6771. Checks that RMContainerAllocator, while processing a
 * finished container, emits a TaskAttemptDiagnosticsUpdateEvent first, then a
 * TaskAttemptEvent, and no other interactions with the event handler.
 */
@Test
public void testHandlingFinishedContainers() {
  EventHandler mockHandler = mock(EventHandler.class);

  AppContext appContext = mock(MRAppMaster.RunningAppContext.class);
  when(appContext.getClock()).thenReturn(new ControlledClock());
  when(appContext.getClusterInfo()).thenReturn(
      new ClusterInfo(Resource.newInstance(10240, 1)));
  when(appContext.getEventHandler()).thenReturn(mockHandler);

  RMContainerAllocator allocator =
      new RMContainerAllocatorForFinishedContainer(null, appContext);

  ContainerStatus completedStatus = ContainerStatus.newInstance(
      mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
  allocator.processFinishedContainer(completedStatus);

  InOrder expectedOrder = inOrder(mockHandler);
  expectedOrder.verify(mockHandler)
      .handle(isA(TaskAttemptDiagnosticsUpdateEvent.class));
  expectedOrder.verify(mockHandler).handle(isA(TaskAttemptEvent.class));
  expectedOrder.verifyNoMoreInteractions();
}
/**
 * Logs the diagnostic text carried by the event and appends it to the task
 * attempt's diagnostics.
 *
 * @param taskAttempt the attempt receiving the diagnostics
 * @param event       must be a {@link TaskAttemptDiagnosticsUpdateEvent};
 *                    any other type causes a ClassCastException on the cast
 */
@Override
public void transition(TaskAttemptImpl taskAttempt,
    TaskAttemptEvent event) {
  // NOTE(review): presumably this transition is registered only for
  // diagnostics-update events, which makes the downcast safe.
  TaskAttemptDiagnosticsUpdateEvent update =
      (TaskAttemptDiagnosticsUpdateEvent) event;
  LOG.info("Diagnostics report from " + taskAttempt.attemptId + ": "
      + update.getDiagnosticInfo());
  taskAttempt.addDiagnosticInfo(update.getDiagnosticInfo());
}
/**
 * Reports a container launch failure for a task attempt.
 *
 * <p>The message is logged at error level and sent to the attempt as a
 * diagnostics update. A {@code TA_CONTAINER_LAUNCH_FAILED} event follows.
 *
 * @param taskAttemptID the attempt whose container failed to launch
 * @param message       description of the failure
 */
@SuppressWarnings("unchecked")
void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID,
    String message) {
  LOG.error(message);

  // Diagnostics are dispatched before the failure event, presumably so the
  // message is recorded before the failure transition runs.
  TaskAttemptDiagnosticsUpdateEvent diagnostics =
      new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message);
  context.getEventHandler().handle(diagnostics);

  TaskAttemptEvent launchFailed = new TaskAttemptEvent(taskAttemptID,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
  context.getEventHandler().handle(launchFailed);
}
@Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator = runningAttempts.entrySet().iterator(); // avoid calculating current time everytime in loop long currentTime = clock.getTime(); while (iterator.hasNext()) { Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next(); boolean taskTimedOut = (taskTimeOut > 0) && (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); if(taskTimedOut) { // task is lost, remove from the list and raise lost event iterator.remove(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry .getKey(), "AttemptID:" + entry.getKey().toString() + " Timed out after " + taskTimeOut / 1000 + " secs")); eventHandler.handle(new TaskAttemptEvent(entry.getKey(), TaskAttemptEventType.TA_TIMED_OUT)); } } try { Thread.sleep(taskTimeOutCheckInterval); } catch (InterruptedException e) { LOG.info("TaskHeartbeatHandler thread interrupted"); break; } } }
/**
 * Test hook invoked when an attempt is launched. It immediately attaches a
 * fixed diagnostic message to the attempt and then fails it with
 * {@code TA_FAILMSG}.
 *
 * @param attemptID the attempt that was just launched
 */
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
  TaskAttemptDiagnosticsUpdateEvent diagnosticEvent =
      new TaskAttemptDiagnosticsUpdateEvent(attemptID, "Test Diagnostic Event");
  getContext().getEventHandler().handle(diagnosticEvent);

  TaskAttemptEvent failEvent =
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG);
  getContext().getEventHandler().handle(failEvent);
}