@Override public synchronized JobStatus submitJob(JobID jobId) throws IOException { boolean loggingEnabled = LOG.isDebugEnabled(); if (loggingEnabled) { LOG.debug("submitJob for jobname = " + jobId); } if (jobs.containsKey(jobId)) { // job already running, don't start twice if (loggingEnabled) { LOG.debug("Job '" + jobId.getId() + "' already present "); } return jobs.get(jobId).getStatus(); } JobStory jobStory = SimulatorJobCache.get(jobId); if (jobStory == null) { throw new IllegalArgumentException("Job not found in SimulatorJobCache: "+jobId); } validateAndSetClock(jobStory.getSubmissionTime()); SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, this, this.conf, jobStory); return addJob(jobId, job); }
@Override void updateTaskStatuses(TaskTrackerStatus status) { boolean loggingEnabled = LOG.isDebugEnabled(); String trackerName = status.getTrackerName(); // loop through the list of task statuses if (loggingEnabled) { LOG.debug("Updating task statuses for tracker " + trackerName); } for (TaskStatus report : status.getTaskReports()) { report.setTaskTracker(trackerName); TaskAttemptID taskAttemptId = report.getTaskID(); JobID jobid = taskAttemptId.getJobID(); if (loggingEnabled) { LOG.debug("Updating status for job " + jobid + " for task = " + taskAttemptId + " status=" + report.getProgress() + " for tracker: " + trackerName); } SimulatorJobInProgress job = getSimulatorJob(taskAttemptId.getJobID()); if(job ==null) { // This job bas completed before. Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName); if (jobsToCleanup == null) { jobsToCleanup = new HashSet<JobID>(); trackerToJobsToCleanup.put(trackerName, jobsToCleanup); } jobsToCleanup.add(taskAttemptId.getJobID()); continue; } TaskInProgress tip = taskidToTIPMap.get(taskAttemptId); JobStatus prevStatus = (JobStatus) job.getStatus().clone(); job.updateTaskStatus(tip, (TaskStatus) report.clone()); JobStatus newStatus = (JobStatus) job.getStatus().clone(); if (tip.isComplete()) { if (loggingEnabled) { LOG.debug("Completed task attempt " + taskAttemptId + " tracker:" + trackerName + " time=" + getClock().getTime()); } } if (prevStatus.getRunState() != newStatus.getRunState()) { if (loggingEnabled) { LOG.debug("Informing Listeners of job " + jobid + " of newStatus " + JobStatus.getJobRunState(newStatus.getRunState())); } JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus); updateJobInProgressListeners(event); } } }
/** * Return the SimulatorJob object given a jobID. * @param jobid * @return */ private SimulatorJobInProgress getSimulatorJob(JobID jobid) { return (SimulatorJobInProgress)jobs.get(jobid); }