private static StubbedJob createStubbedJob(Configuration conf,
    Dispatcher dispatcher, int numSplits, AppContext appContext) {
  JobID jobID = JobID.forName("job_1234567890000_0001");
  JobId jobId = TypeConverter.toYarn(jobID);
  if (appContext == null) {
    appContext = mock(AppContext.class);
    when(appContext.hasSuccessfullyUnregistered()).thenReturn(true);
  }
  StubbedJob job = new StubbedJob(jobId,
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
      conf, dispatcher.getEventHandler(), true, "somebody", numSplits,
      appContext);
  dispatcher.register(JobEventType.class, job);
  EventHandler mockHandler = mock(EventHandler.class);
  dispatcher.register(TaskEventType.class, mockHandler);
  dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
      mockHandler);
  dispatcher.register(JobFinishEvent.Type.class, mockHandler);
  return job;
}
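// A minimal sketch (not from the original source) of the JobID <-> JobId
// round-trip that recurs throughout these methods: the old mapred-style
// JobID is parsed from a string and converted to the YARN record type, and
// TypeConverter.fromYarn() recovers the original ID.
JobID oldStyle = JobID.forName("job_1234567890000_0001");
JobId yarnStyle = TypeConverter.toYarn(oldStyle);        // YARN record type
ApplicationId appId = yarnStyle.getAppId();              // backing app ID
JobID roundTripped = TypeConverter.fromYarn(yarnStyle);  // equals oldStyle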
@Override
public void handle(ContainerAllocatorEvent event) {
  ContainerId cId = ContainerId.newContainerId(
      getContext().getApplicationAttemptId(), containerCount++);
  NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
  Resource resource = Resource.newInstance(1234, 2, 2);
  ContainerTokenIdentifier containerTokenIdentifier =
      new ContainerTokenIdentifier(cId, nodeId.toString(), "user", resource,
          System.currentTimeMillis() + 10000, 42, 42,
          Priority.newInstance(0), 0);
  Token containerToken = newContainerToken(nodeId, "password".getBytes(),
      containerTokenIdentifier);
  Container container = Container.newInstance(cId, nodeId,
      NM_HOST + ":" + NM_HTTP_PORT, resource, null, containerToken);
  JobID id = TypeConverter.fromYarn(applicationId);
  JobId jobId = TypeConverter.toYarn(id);
  getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
      new NormalizedResourceEvent(
          org.apache.hadoop.mapreduce.TaskType.REDUCE, 100)));
  getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
      new NormalizedResourceEvent(
          org.apache.hadoop.mapreduce.TaskType.MAP, 100)));
  getContext().getEventHandler().handle(
      new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container,
          null));
}
@Test
public void testJobNamePercentEncoding() throws IOException {
  JobIndexInfo info = new JobIndexInfo();
  JobID oldJobId = JobID.forName(JOB_ID);
  JobId jobId = TypeConverter.toYarn(oldJobId);
  info.setJobId(jobId);
  info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  info.setUser(USER_NAME);
  info.setJobName(JOB_NAME_WITH_DELIMITER);
  info.setFinishTime(Long.parseLong(FINISH_TIME));
  info.setNumMaps(Integer.parseInt(NUM_MAPS));
  info.setNumReduces(Integer.parseInt(NUM_REDUCES));
  info.setJobStatus(JOB_STATUS);
  info.setQueueName(QUEUE_NAME);
  info.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
  Assert.assertTrue("Job name not encoded correctly into job history file",
      jobHistoryFile.contains(JOB_NAME_WITH_DELIMITER_ESCAPE));
}
@Test
public void testQueueNamePercentEncoding() throws IOException {
  JobIndexInfo info = new JobIndexInfo();
  JobID oldJobId = JobID.forName(JOB_ID);
  JobId jobId = TypeConverter.toYarn(oldJobId);
  info.setJobId(jobId);
  info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  info.setUser(USER_NAME);
  info.setJobName(JOB_NAME);
  info.setFinishTime(Long.parseLong(FINISH_TIME));
  info.setNumMaps(Integer.parseInt(NUM_MAPS));
  info.setNumReduces(Integer.parseInt(NUM_REDUCES));
  info.setJobStatus(JOB_STATUS);
  info.setQueueName(QUEUE_NAME_WITH_DELIMITER);
  info.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
  Assert.assertTrue("Queue name not encoded correctly into job history file",
      jobHistoryFile.contains(QUEUE_NAME_WITH_DELIMITER_ESCAPE));
}
private void loadAllTasks() {
  if (tasksLoaded.get()) {
    return;
  }
  tasksLock.lock();
  try {
    if (tasksLoaded.get()) {
      return;
    }
    for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks()
        .entrySet()) {
      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
      TaskInfo taskInfo = entry.getValue();
      Task task = new CompletedTask(yarnTaskID, taskInfo);
      tasks.put(yarnTaskID, task);
      if (task.getType() == TaskType.MAP) {
        mapTasks.put(task.getID(), task);
      } else if (task.getType() == TaskType.REDUCE) {
        reduceTasks.put(task.getID(), task);
      }
    }
    tasksLoaded.set(true);
  } finally {
    tasksLock.unlock();
  }
}
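// A self-contained sketch (not from the original source) of the
// check-lock-recheck idiom used by loadAllTasks() above and by
// loadAllTaskAttempts() further below: the AtomicBoolean keeps the fast
// path lock-free, and the second check under the lock stops a second
// thread from repeating the load.
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class LazyLoader {
  private final AtomicBoolean loaded = new AtomicBoolean(false);
  private final ReentrantLock lock = new ReentrantLock();

  void ensureLoaded() {
    if (loaded.get()) {
      return;               // fast path: already loaded
    }
    lock.lock();
    try {
      if (loaded.get()) {
        return;             // another thread won the race while we waited
      }
      doLoad();             // expensive one-time work
      loaded.set(true);     // publish only after the load succeeds
    } finally {
      lock.unlock();
    }
  }

  private void doLoad() { /* populate maps, etc. */ }
}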
/**
 * Child checking whether it can commit.
 *
 * <br>
 * Commit is a two-phased protocol. First the attempt informs the
 * ApplicationMaster that it is
 * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly
 * polls the ApplicationMaster via {@link #canCommit(TaskAttemptID)} until
 * it is allowed to commit. This is a legacy of the centralized commit
 * protocol handling by the JobTracker.
 */
@Override
public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
  LOG.info("Commit go/no-go request from " + taskAttemptID.toString());
  // An attempt is asking if it can commit its output. This can be decided
  // only by the task which is managing the multiple attempts. So redirect
  // the request there.
  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);

  taskHeartbeatHandler.progressing(attemptID);

  // tell task to retry later if AM has not heard from RM within the commit
  // window to help avoid double-committing in a split-brain situation
  long now = context.getClock().getTime();
  if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
    return false;
  }

  Job job = context.getJob(attemptID.getTaskId().getJobId());
  Task task = job.getTask(attemptID.getTaskId());
  return task.canCommit(attemptID);
}
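// A hedged sketch (not from the original source) of the child-side view of
// the two-phase commit protocol described above. "umbilical" stands for the
// task's proxy back to the ApplicationMaster; the real task code differs in
// its retry and backoff details.
void commitFromChild(TaskUmbilicalProtocol umbilical,
    TaskAttemptID attemptId, TaskStatus status)
    throws IOException, InterruptedException {
  // Phase 1: announce that output is ready and the attempt is
  // commit-pending.
  umbilical.commitPending(attemptId, status);
  // Phase 2: poll until the AM (via the owning Task) grants the commit.
  while (!umbilical.canCommit(attemptId)) {
    Thread.sleep(1000);   // polling interval is an assumption
  }
  // ... proceed to commit the output via the OutputCommitter ...
}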
/**
 * TaskAttempt is reporting that it is in commit_pending and is waiting for
 * the commit response.
 *
 * <br>
 * Commit is a two-phased protocol. First the attempt informs the
 * ApplicationMaster that it is
 * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly
 * polls the ApplicationMaster via {@link #canCommit(TaskAttemptID)} until
 * it is allowed to commit. This is a legacy of the centralized commit
 * protocol handling by the JobTracker.
 */
@Override
public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
    throws IOException, InterruptedException {
  LOG.info("Commit-pending state update from " + taskAttemptID.toString());
  // An attempt is asking if it can commit its output. This can be decided
  // only by the task which is managing the multiple attempts. So redirect
  // the request there.
  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);

  taskHeartbeatHandler.progressing(attemptID);
  // Ignorable TaskStatus? - since a task will send a LastStatusUpdate
  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID,
          TaskAttemptEventType.TA_COMMIT_PENDING));
}
@Override
public void reportDiagnosticInfo(TaskAttemptID taskAttemptID,
    String diagnosticInfo) throws IOException {
  diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
  LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
      + diagnosticInfo);

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);
  taskHeartbeatHandler.progressing(attemptID);

  // This is mainly used for cases where we want to propagate exception
  // traces of tasks that fail.

  // This call exists as a hadoop mapreduce legacy wherein all changes in
  // counters/progress/phase/output-size are reported through statusUpdate()
  // call but not diagnosticInformation.
  context.getEventHandler().handle(
      new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnosticInfo));
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  commitThreadCancelTimeoutMs = conf.getInt(
      MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
      MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
  commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
      MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
  try {
    fs = FileSystem.get(conf);
    JobID id = TypeConverter.fromYarn(context.getApplicationID());
    JobId jobId = TypeConverter.toYarn(id);
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
    endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user,
        jobId);
    endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user,
        jobId);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
}
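// A hedged sketch (not from the original source) of how the three commit
// marker files initialized above can be probed, e.g. by a restarted AM
// deciding whether a job commit was in flight; the recovery policy itself
// is an assumption here.
boolean commitStarted = fs.exists(startCommitFile);
boolean commitSucceeded = fs.exists(endCommitSuccessFile);
boolean commitFailed = fs.exists(endCommitFailureFile);
if (commitStarted && !commitSucceeded && !commitFailed) {
  // The commit began but never finished: its outcome is unknown, so it is
  // unsafe to simply repeat it.
}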
private static TaskAttemptUnsuccessfulCompletionEvent
    createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
        TaskAttemptStateInternal attemptState) {
  TaskAttemptUnsuccessfulCompletionEvent tauce =
      new TaskAttemptUnsuccessfulCompletionEvent(
          TypeConverter.fromYarn(taskAttempt.attemptId),
          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
              .getTaskType()),
          attemptState.toString(),
          taskAttempt.finishTime,
          taskAttempt.container == null ? "UNKNOWN"
              : taskAttempt.container.getNodeId().getHost(),
          taskAttempt.container == null ? -1
              : taskAttempt.container.getNodeId().getPort(),
          taskAttempt.nodeRackName == null ? "UNKNOWN"
              : taskAttempt.nodeRackName,
          StringUtils.join(LINE_SEPARATOR, taskAttempt.getDiagnostics()),
          taskAttempt.getCounters(),
          taskAttempt.getProgressSplitBlock().burst());
  return tauce;
}
@SuppressWarnings("unchecked") private void sendLaunchedEvents() { JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId() .getJobId()); jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ? JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1); eventHandler.handle(jce); LOG.info("TaskAttempt: [" + attemptId + "] using containerId: [" + container.getId() + " on NM: [" + StringInterner.weakIntern(container.getNodeId().toString()) + "]"); TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), launchTime, trackerName, httpPort, shufflePort, container.getId(), locality.toString(), avataar.toString()); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase)); }
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
    List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
  StringBuilder errorSb = new StringBuilder();
  if (diag != null) {
    for (String d : diag) {
      errorSb.append(", ").append(d);
    }
  }
  TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
      TypeConverter.fromYarn(task.taskId),
      // Hack since getFinishTime needs isFinished to be true and that
      // doesn't happen till after the transition.
      task.getFinishTime(taId),
      TypeConverter.fromYarn(task.getType()),
      errorSb.toString(),
      taskState.toString(),
      taId == null ? null : TypeConverter.fromYarn(taId),
      task.getCounters());
  return taskFailedEvent;
}
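// A hedged sketch (not from the original source) of how the event built by
// createTaskFailedEvent() is typically dispatched: wrapped in a
// JobHistoryEvent and handed to the event handler, mirroring the dispatch
// in sendLaunchedEvents() above. The eventHandler reference is an
// assumption here.
TaskFailedEvent tfe =
    createTaskFailedEvent(task, diagnostics, taskState, taId);
eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));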
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  FSDataInputStream in = null;
  Path historyFile = null;
  String jobId =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
          .toString();
  String jobhistoryDir =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
  Path histDirPath =
      FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
  FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
  // read the previous history file
  historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
          jobId, (applicationAttemptId.getAttemptId() - 1)));
  LOG.info("History file is at " + historyFile);
  in = fc.open(historyFile);
  return in;
}
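// A hedged usage sketch (not from the original source): an AM attempt N
// recovering from attempt N-1 might open the previous attempt's history
// stream and feed it to JobHistoryParser. The parser usage assumes the
// standard JobHistoryParser(FSDataInputStream) constructor; conf and
// applicationAttemptId are assumed to be in scope.
FSDataInputStream prev =
    getPreviousJobHistoryFileStream(conf, applicationAttemptId);
try {
  JobHistoryParser parser = new JobHistoryParser(prev);
  JobHistoryParser.JobInfo jobInfo = parser.parse();
  // ... replay completed tasks from jobInfo during recovery ...
} finally {
  prev.close();
}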
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
    throws Exception {
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
      TypeConverter.fromYarn(attempt.getID()));

  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);

  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key2, val2);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val2);
    theRecordWriter.write(nullWritable, val1);
    theRecordWriter.write(key1, nullWritable);
    theRecordWriter.write(key2, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key1, val1);
  } finally {
    theRecordWriter.close(tContext);
  }

  OutputFormat outputFormat = ReflectionUtils.newInstance(
      tContext.getOutputFormatClass(), conf);
  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
  committer.commitTask(tContext);
}
private void writeOutput(TaskAttempt attempt, Configuration conf)
    throws Exception {
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
      TypeConverter.fromYarn(attempt.getID()));

  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);

  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key1, val1);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val1);
    theRecordWriter.write(nullWritable, val2);
    theRecordWriter.write(key2, nullWritable);
    theRecordWriter.write(key1, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key2, val2);
  } finally {
    theRecordWriter.close(tContext);
  }

  OutputFormat outputFormat = ReflectionUtils.newInstance(
      tContext.getOutputFormatClass(), conf);
  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
  committer.commitTask(tContext);
}
private void constructTaskReport() {
  loadAllTaskAttempts();
  this.report = Records.newRecord(TaskReport.class);
  report.setTaskId(taskId);
  long minLaunchTime = Long.MAX_VALUE;
  for (TaskAttempt attempt : attempts.values()) {
    minLaunchTime = Math.min(minLaunchTime, attempt.getLaunchTime());
  }
  minLaunchTime = minLaunchTime == Long.MAX_VALUE ? -1 : minLaunchTime;
  report.setStartTime(minLaunchTime);
  report.setFinishTime(taskInfo.getFinishTime());
  report.setTaskState(getState());
  report.setProgress(getProgress());
  Counters counters = getCounters();
  if (counters == null) {
    counters = EMPTY_COUNTERS;
  }
  report.setCounters(TypeConverter.toYarn(counters));
  if (successfulAttempt != null) {
    report.setSuccessfulAttempt(successfulAttempt);
  }
  report.addAllDiagnostics(reportDiagnostics);
  report.addAllRunningAttempts(
      new ArrayList<TaskAttemptId>(attempts.keySet()));
}
@Test
public void testReportDiagnostics() throws Exception {
  JobID jobID = JobID.forName("job_1234567890000_0001");
  JobId jobId = TypeConverter.toYarn(jobID);
  final String diagMsg = "some diagnostic message";
  final JobDiagnosticsUpdateEvent diagUpdateEvent =
      new JobDiagnosticsUpdateEvent(jobId, diagMsg);
  MRAppMetrics mrAppMetrics = MRAppMetrics.create();
  AppContext mockContext = mock(AppContext.class);
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
  JobImpl job = new JobImpl(jobId,
      Records.newRecord(ApplicationAttemptId.class), new Configuration(),
      mock(EventHandler.class), null, mock(JobTokenSecretManager.class),
      null, new SystemClock(), null, mrAppMetrics, null, true, null, 0,
      null, mockContext, null, null);
  job.handle(diagUpdateEvent);
  String diagnostics = job.getReport().getDiagnostics();
  Assert.assertNotNull(diagnostics);
  Assert.assertTrue(diagnostics.contains(diagMsg));

  job = new JobImpl(jobId,
      Records.newRecord(ApplicationAttemptId.class), new Configuration(),
      mock(EventHandler.class), null, mock(JobTokenSecretManager.class),
      null, new SystemClock(), null, mrAppMetrics, null, true, null, 0,
      null, mockContext, null, null);
  job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  job.handle(diagUpdateEvent);
  diagnostics = job.getReport().getDiagnostics();
  Assert.assertNotNull(diagnostics);
  Assert.assertTrue(diagnostics.contains(diagMsg));
}
private boolean testUberDecision(Configuration conf) {
  JobID jobID = JobID.forName("job_1234567890000_0001");
  JobId jobId = TypeConverter.toYarn(jobID);
  MRAppMetrics mrAppMetrics = MRAppMetrics.create();
  JobImpl job = new JobImpl(jobId,
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
      conf, mock(EventHandler.class), null, new JobTokenSecretManager(),
      new Credentials(), null, null, mrAppMetrics, null, true, null, 0,
      null, null, null, null);
  InitTransition initTransition = getInitTransition(2);
  JobEvent mockJobEvent = mock(JobEvent.class);
  initTransition.transition(job, mockJobEvent);
  boolean isUber = job.isUber();
  return isUber;
}
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
    JobID jobId, MRClientProtocol historyServerProxy) {
  this.conf = new Configuration(conf); // Cloning for modifying.
  // For faster redirects from AM to HS.
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
          MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
  this.conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      this.conf.getInt(
          MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS,
          MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS));
  this.rm = rm;
  this.jobId = jobId;
  this.historyServerProxy = historyServerProxy;
  this.appId = TypeConverter.toYarn(jobId).getAppId();
  notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
}
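// A hedged usage sketch (not from the original source): constructing the
// delegate and querying a job's status. The rm and historyServerProxy
// instances are assumptions here (the tests below build them with mocks).
Configuration conf = new Configuration();
JobID jobId = JobID.forName("job_1234567890000_0001");
ClientServiceDelegate delegate =
    new ClientServiceDelegate(conf, rm, jobId, historyServerProxy);
JobStatus status = delegate.getJobStatus(jobId);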
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId jobID =
      TypeConverter.toYarn(arg0);
  GetCountersRequest request =
      recordFactory.newRecordInstance(GetCountersRequest.class);
  request.setJobId(jobID);
  Counters cnt = ((GetCountersResponse) invoke("getCounters",
      GetCountersRequest.class, request)).getCounters();
  return TypeConverter.fromYarn(cnt);
}
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
    int arg2) throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId jobID =
      TypeConverter.toYarn(arg0);
  GetTaskAttemptCompletionEventsRequest request = recordFactory
      .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
  request.setJobId(jobID);
  request.setFromEventId(arg1);
  request.setMaxEvents(arg2);
  List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent>
      list = ((GetTaskAttemptCompletionEventsResponse) invoke(
          "getTaskAttemptCompletionEvents",
          GetTaskAttemptCompletionEventsRequest.class, request))
          .getCompletionEventList();
  return TypeConverter.fromYarn(list.toArray(
      new org.apache.hadoop.mapreduce.v2.api.records
          .TaskAttemptCompletionEvent[0]));
}
public String[] getTaskDiagnostics(
    org.apache.hadoop.mapreduce.TaskAttemptID arg0)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(arg0);
  GetDiagnosticsRequest request =
      recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
  request.setTaskAttemptId(attemptID);
  List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
      GetDiagnosticsRequest.class, request)).getDiagnosticsList();
  String[] result = new String[list.size()];
  int i = 0;
  for (String c : list) {
    result[i++] = c.toString();
  }
  return result;
}
public JobStatus getJobStatus(JobID oldJobID) throws IOException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
      TypeConverter.toYarn(oldJobID);
  GetJobReportRequest request =
      recordFactory.newRecordInstance(GetJobReportRequest.class);
  request.setJobId(jobId);
  JobReport report = ((GetJobReportResponse) invoke("getJobReport",
      GetJobReportRequest.class, request)).getJobReport();
  JobStatus jobStatus = null;
  if (report != null) {
    if (StringUtils.isEmpty(report.getJobFile())) {
      String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
      report.setJobFile(jobFile);
    }
    String historyTrackingUrl = report.getTrackingUrl();
    String url = StringUtils.isNotEmpty(historyTrackingUrl)
        ? historyTrackingUrl : trackingUrl;
    jobStatus = TypeConverter.fromYarn(report, url);
  }
  return jobStatus;
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
    JobID oldJobID, TaskType taskType) throws IOException {
  org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
      TypeConverter.toYarn(oldJobID);
  GetTaskReportsRequest request =
      recordFactory.newRecordInstance(GetTaskReportsRequest.class);
  request.setJobId(jobId);
  request.setTaskType(TypeConverter.toYarn(taskType));
  List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports =
      ((GetTaskReportsResponse) invoke("getTaskReports",
          GetTaskReportsRequest.class, request)).getTaskReportList();
  return TypeConverter.fromYarn(taskReports)
      .toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
}
@Test
public void testRemoteExceptionFromHistoryServer() throws Exception {
  MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
  when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
      new IOException("Job ID does not exist"));
  ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
  when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
      .thenReturn(null);
  ClientServiceDelegate clientServiceDelegate =
      getClientServiceDelegate(historyServerProxy, rm);

  try {
    clientServiceDelegate.getJobStatus(oldJobId);
    Assert.fail("Invoke should throw exception after retries.");
  } catch (IOException e) {
    Assert.assertTrue(e.getMessage().contains("Job ID does not exist"));
  }
}
@Test
public void testRetriesOnConnectionFailure() throws Exception {
  MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
  when(historyServerProxy.getJobReport(getJobReportRequest()))
      .thenThrow(new RuntimeException("1"))
      .thenThrow(new RuntimeException("2"))
      .thenReturn(getJobReportResponse());
  ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
  when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
      .thenReturn(null);
  ClientServiceDelegate clientServiceDelegate =
      getClientServiceDelegate(historyServerProxy, rm);

  JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
  Assert.assertNotNull(jobStatus);
  verify(historyServerProxy, times(3)).getJobReport(
      any(GetJobReportRequest.class));
}
@Test
public void testJobReportFromHistoryServer() throws Exception {
  MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
  when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
      getJobReportResponseFromHistoryServer());
  ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
  when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
      .thenReturn(null);
  ClientServiceDelegate clientServiceDelegate =
      getClientServiceDelegate(historyServerProxy, rm);

  JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
  Assert.assertNotNull(jobStatus);
  Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());
  Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());
  Assert.assertEquals(1.0f, jobStatus.getMapProgress(), 0.0f);
  Assert.assertEquals(1.0f, jobStatus.getReduceProgress(), 0.0f);
}
@Test
public void testUserNamePercentEncoding() throws IOException {
  JobIndexInfo info = new JobIndexInfo();
  JobID oldJobId = JobID.forName(JOB_ID);
  JobId jobId = TypeConverter.toYarn(oldJobId);
  info.setJobId(jobId);
  info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
  info.setUser(USER_NAME_WITH_DELIMITER);
  info.setJobName(JOB_NAME);
  info.setFinishTime(Long.parseLong(FINISH_TIME));
  info.setNumMaps(Integer.parseInt(NUM_MAPS));
  info.setNumReduces(Integer.parseInt(NUM_REDUCES));
  info.setJobStatus(JOB_STATUS);
  info.setQueueName(QUEUE_NAME);
  info.setJobStartTime(Long.parseLong(JOB_START_TIME));

  String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
  Assert.assertTrue("User name not encoded correctly into job history file",
      jobHistoryFile.contains(USER_NAME_WITH_DELIMITER_ESCAPE));
}
@Override
public Task createRemoteTask() {
  // The job file name is set in TaskAttempt; pass an empty string here.
  ReduceTask reduceTask = new ReduceTask("",
      TypeConverter.fromYarn(getID()), partition, numMapTasks,
      1); // YARN doesn't have the concept of slots per task, set it as 1.
  reduceTask.setUser(conf.get(MRJobConfig.USER_NAME));
  reduceTask.setConf(conf);
  return reduceTask;
}
@Override
public Task createRemoteTask() {
  // The job file name is set in TaskAttempt; pass an empty string here.
  MapTask mapTask = new MapTask("", TypeConverter.fromYarn(getID()),
      partition, splitInfo.getSplitIndex(),
      1); // YARN doesn't have the concept of slots per task, set it as 1.
  mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
  mapTask.setConf(conf);
  return mapTask;
}
@Override
public void done(TaskAttemptID taskAttemptID) throws IOException {
  LOG.info("Done acknowledgement from " + taskAttemptID.toString());

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);

  taskHeartbeatHandler.progressing(attemptID);

  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
@Override
public void fatalError(TaskAttemptID taskAttemptID, String msg)
    throws IOException {
  // This happens only in Child and in the Task.
  LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg);
  reportDiagnosticInfo(taskAttemptID, "Error: " + msg);

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);
  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@Override
public void fsError(TaskAttemptID taskAttemptID, String message)
    throws IOException {
  // This happens only in Child.
  LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: "
      + message);
  reportDiagnosticInfo(taskAttemptID, "FSError: " + message);

  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(taskAttemptID);
  context.getEventHandler().handle(
      new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
private void loadAllTaskAttempts() {
  // Same check-lock-recheck idiom as loadAllTasks() above.
  if (taskAttemptsLoaded.get()) {
    return;
  }
  taskAttemptsLock.lock();
  try {
    if (taskAttemptsLoaded.get()) {
      return;
    }
    for (TaskAttemptInfo attemptHistory : taskInfo.getAllTaskAttempts()
        .values()) {
      CompletedTaskAttempt attempt =
          new CompletedTaskAttempt(taskId, attemptHistory);
      reportDiagnostics.addAll(attempt.getDiagnostics());
      attempts.put(attempt.getID(), attempt);
      if (successfulAttempt == null
          && attemptHistory.getTaskStatus() != null
          && attemptHistory.getTaskStatus().equals(
              TaskState.SUCCEEDED.toString())) {
        successfulAttempt =
            TypeConverter.toYarn(attemptHistory.getAttemptId());
      }
    }
    taskAttemptsLoaded.set(true);
  } finally {
    taskAttemptsLock.unlock();
  }
}
@Override
public TaskAttemptReport getReport() {
  TaskAttemptReport result =
      recordFactory.newRecordInstance(TaskAttemptReport.class);
  readLock.lock();
  try {
    result.setTaskAttemptId(attemptId);
    // take the LOCAL state of attempt
    // DO NOT take from reportedStatus
    result.setTaskAttemptState(getState());
    result.setProgress(reportedStatus.progress);
    result.setStartTime(launchTime);
    result.setFinishTime(finishTime);
    result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
    result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR,
        getDiagnostics()));
    result.setPhase(reportedStatus.phase);
    result.setStateString(reportedStatus.stateString);
    result.setCounters(TypeConverter.toYarn(getCounters()));
    result.setContainerId(this.getAssignedContainerID());
    result.setNodeManagerHost(trackerName);
    result.setNodeManagerHttpPort(httpPort);
    if (this.container != null) {
      result.setNodeManagerPort(this.container.getNodeId().getPort());
    }
    return result;
  } finally {
    readLock.unlock();
  }
}
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { TaskAttemptContext taskContext = new TaskAttemptContextImpl(taskAttempt.conf, TypeConverter.fromYarn(taskAttempt.attemptId)); taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent( taskAttempt.attemptId, taskContext)); }
/**
 * Verify that all the events are flushed on stopping the HistoryHandler.
 * @throws Exception
 */
@Test
public void testEventsFlushOnStop() throws Exception {
  Configuration conf = new Configuration();
  MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true,
      this.getClass().getName(), true);
  app.submit(conf);
  Job job = app.getContext().getAllJobs().values().iterator().next();
  JobId jobId = job.getID();
  LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
  app.waitForState(job, JobState.SUCCEEDED);

  // make sure all events are flushed
  app.waitForState(Service.STATE.STOPPED);

  /*
   * Use HistoryContext to read logged events and verify the number of
   * completed maps.
   */
  HistoryContext context = new JobHistory();
  ((JobHistory) context).init(conf);
  Job parsedJob = context.getJob(jobId);
  Assert.assertEquals("CompletedMaps not correct", 1,
      parsedJob.getCompletedMaps());

  Map<TaskId, Task> tasks = parsedJob.getTasks();
  Assert.assertEquals("No of tasks not correct", 1, tasks.size());
  verifyTask(tasks.values().iterator().next());

  Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
  Assert.assertEquals("No of maps not correct", 1, maps.size());

  Assert.assertEquals("Job state not correct", JobState.SUCCEEDED,
      parsedJob.getState());
}
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
    TaskStateInternal taskState) {
  TaskFinishedEvent tfe = new TaskFinishedEvent(
      TypeConverter.fromYarn(task.taskId),
      TypeConverter.fromYarn(task.successfulAttempt),
      task.getFinishTime(task.successfulAttempt),
      TypeConverter.fromYarn(task.taskId.getTaskType()),
      taskState.toString(),
      task.getCounters());
  return tfe;
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
    throws IOException {
  JobId jobId = request.getJobId();
  Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
  GetCountersResponse response =
      recordFactory.newRecordInstance(GetCountersResponse.class);
  response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
  return response;
}
@Override
protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy();
  JobID id = TypeConverter.fromYarn(this.applicationId);
  JobId jobId = TypeConverter.toYarn(id);
  job = context.getJob(jobId);

  register();
  startAllocatorThread();
  super.serviceStart();
}