/**
 * Safely clean up all data structures at the end of the job
 * (success/failure/killed). In addition to performing the tasks that the
 * original finalizeJob does, we also inform the SimulatorEngine about the
 * completion of this job.
 *
 * @param job the completed job
 */
@Override
synchronized void finalizeJob(JobInProgress job) {
  // Let the SimulatorEngine know that the job is done.
  JobStatus cloneStatus = (JobStatus) job.getStatus().clone();
  engine.markCompletedJob(cloneStatus,
      SimulatorJobTracker.getClock().getTime());

  JobID jobId = job.getStatus().getJobID();
  LOG.info("Finished job " + jobId + " endtime = " + getClock().getTime()
      + " with status: "
      + JobStatus.getJobRunState(job.getStatus().getRunState()));

  // Invoke the original finalizeJob to update the metrics and JobHistory.
  super.finalizeJob(job);

  // Now place this job in the queue for future retirement.
  cleanupJob(job);
}
public void testLastTaskSpeculation() throws Exception {
  corona = new MiniCoronaCluster.Builder().numTaskTrackers(2).build();
  JobConf conf = corona.createJobConf();
  conf.setSpeculativeExecution(true);
  conf.setMapSpeculativeLag(1L);
  conf.setReduceSpeculativeLag(1L);
  conf.setLong(JobInProgress.REFRESH_TIMEOUT, 100L);
  conf.setLong(CoronaTaskTracker.HEART_BEAT_INTERVAL_KEY, 100L);
  conf.setLong(CoronaJobTracker.HEART_BEAT_INTERVAL_KEY, 100L);
  long start = System.currentTimeMillis();
  SleepJob sleepJob = new SleepJob();
  ToolRunner.run(conf, sleepJob, new String[]{
      "-m", "1", "-r", "1", "-mt", "5000", "-rt", "5000", "-speculation"});
  long end = System.currentTimeMillis();
  verifyLaunchedTasks(sleepJob, 2, 2);
  new ClusterManagerMetricsVerifier(corona.getClusterManager(),
      2, 2, 2, 2, 2, 2, 0, 0).verifyAll();
  LOG.info("Time spent for testLastTaskSpeculation: " + (end - start));
}
/**
 * Return the TaskLogsUrl of a particular TaskAttempt.
 *
 * @param attempt the task attempt whose log URL is requested
 * @return the taskLogsUrl, or null if the http-port, tracker-name, or
 *         task-attempt-id is unavailable
 */
public static String getTaskLogsUrl(
    JobHistoryParser.TaskAttemptInfo attempt) {
  if (attempt.getHttpPort() == -1
      || attempt.getTrackerName().equals("")
      || attempt.getAttemptId() == null) {
    return null;
  }

  String taskTrackerName = JobInProgress.convertTrackerNameToHostName(
      attempt.getTrackerName());
  return TaskLogServlet.getTaskLogUrl(taskTrackerName,
      Integer.toString(attempt.getHttpPort()),
      attempt.getAttemptId().toString());
}
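A minimal sketch of how this helper might be driven from a parsed job-history file. The helper method name and the history-file argument are hypothetical; the JobHistoryParser accessors (parse, getAllTasks, getAllTaskAttempts) are assumed from the org.apache.hadoop.mapreduce.jobhistory API.

// Hypothetical usage sketch: print a log URL for every attempt recorded
// in a job-history file.
static void printAttemptLogUrls(Configuration conf, Path historyFile)
    throws IOException {
  FileSystem fs = historyFile.getFileSystem(conf);
  JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
  JobHistoryParser.JobInfo jobInfo = parser.parse();
  for (JobHistoryParser.TaskInfo task : jobInfo.getAllTasks().values()) {
    for (JobHistoryParser.TaskAttemptInfo attempt
        : task.getAllTaskAttempts().values()) {
      String url = getTaskLogsUrl(attempt);
      if (url != null) {  // null when port/tracker/attempt-id is missing
        System.out.println(attempt.getAttemptId() + " -> " + url);
      }
    }
  }
}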
/**
 * Submits a sleep job with 1 map task that runs for a long time (60 sec)
 * and waits for the job to go into RUNNING state.
 *
 * @param clusterConf the cluster configuration
 * @param user the job owner
 * @return the Job that was started
 * @throws IOException
 * @throws InterruptedException
 */
private Job submitJobAsUser(final Configuration clusterConf, String user)
    throws IOException, InterruptedException {
  UserGroupInformation ugi =
      UserGroupInformation.createUserForTesting(user, new String[] {});
  Job job = (Job) ugi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      SleepJob sleepJob = new SleepJob();
      sleepJob.setConf(clusterConf);
      // Disable setup/cleanup tasks at the job level.
      sleepJob.getConf().setBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, false);
      Job myJob = sleepJob.createJob(1, 0, 60000, 1, 1, 1);
      myJob.submit();
      return myJob;
    }
  });

  // Move the job into RUNNING state by forcing initialization.
  JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
  JobInProgress jip = jt.getJob(
      org.apache.hadoop.mapred.JobID.downgrade(job.getJobID()));
  jt.initJob(jip);
  return job;
}
/**
 * Writes an XML-formatted table of the jobs in the given list.
 * This is called repeatedly for different lists of jobs (e.g., running,
 * completed, failed).
 */
public void generateJobTable(JspWriter out, String label,
    List<JobInProgress> jobs) throws IOException {
  if (jobs.size() > 0) {
    for (JobInProgress job : jobs) {
      JobProfile profile = job.getProfile();
      JobStatus status = job.getStatus();
      JobID jobid = profile.getJobID();

      int desiredMaps = job.desiredMaps();
      int desiredReduces = job.desiredReduces();
      int completedMaps = job.finishedMaps();
      int completedReduces = job.finishedReduces();
      String name = profile.getJobName();

      out.print("<" + label + "_job jobid=\"" + jobid + "\">\n");
      out.print(" <jobid>" + jobid + "</jobid>\n");
      out.print(" <user>" + profile.getUser() + "</user>\n");
      out.print(" <name>" + ("".equals(name) ? " " : name) + "</name>\n");
      out.print(" <map_complete>"
          + StringUtils.formatPercent(status.mapProgress(), 2)
          + "</map_complete>\n");
      out.print(" <map_total>" + desiredMaps + "</map_total>\n");
      out.print(" <maps_completed>" + completedMaps + "</maps_completed>\n");
      out.print(" <reduce_complete>"
          + StringUtils.formatPercent(status.reduceProgress(), 2)
          + "</reduce_complete>\n");
      out.print(" <reduce_total>" + desiredReduces + "</reduce_total>\n");
      out.print(" <reduces_completed>" + completedReduces
          + "</reduces_completed>\n");
      out.print("</" + label + "_job>\n");
    }
  }
}
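For reference, a single entry emitted by this method looks like the following. The job id, user, name, and counts are illustrative values, and the label is assumed to be "running":

<running_job jobid="job_201201010000_0001">
 <jobid>job_201201010000_0001</jobid>
 <user>alice</user>
 <name>word count</name>
 <map_complete>42.00%</map_complete>
 <map_total>10</map_total>
 <maps_completed>4</maps_completed>
 <reduce_complete>0.00%</reduce_complete>
 <reduce_total>2</reduce_total>
 <reduces_completed>0</reduces_completed>
</running_job>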
/**
 * Reserve the specified number of slots for the given <code>job</code>.
 *
 * @param taskType {@link TaskType} of the task
 * @param job the job for which slots on this <code>TaskTracker</code>
 *            are to be reserved
 * @param numSlots number of slots to be reserved
 */
public void reserveSlots(TaskType taskType, JobInProgress job, int numSlots) {
  JobID jobId = job.getJobID();
  if (taskType == TaskType.MAP) {
    if (jobForFallowMapSlot != null
        && !jobForFallowMapSlot.getJobID().equals(jobId)) {
      throw new RuntimeException(trackerName + " already has "
          + "slots reserved for " + jobForFallowMapSlot + "; being"
          + " asked to reserve " + numSlots + " for " + jobId);
    }
    jobForFallowMapSlot = job;
  } else if (taskType == TaskType.REDUCE) {
    if (jobForFallowReduceSlot != null
        && !jobForFallowReduceSlot.getJobID().equals(jobId)) {
      throw new RuntimeException(trackerName + " already has "
          + "slots reserved for " + jobForFallowReduceSlot + "; being"
          + " asked to reserve " + numSlots + " for " + jobId);
    }
    jobForFallowReduceSlot = job;
  }

  job.reserveTaskTracker(this, taskType, numSlots);
  LOG.info(trackerName + ": Reserved " + numSlots + " " + taskType
      + " slots for " + jobId);
}
/**
 * Free slots on this <code>TaskTracker</code> which were reserved for
 * the given <code>taskType</code>.
 *
 * @param taskType {@link TaskType} of the task
 * @param job job whose slots are being un-reserved
 */
public void unreserveSlots(TaskType taskType, JobInProgress job) {
  JobID jobId = job.getJobID();
  if (taskType == TaskType.MAP) {
    if (jobForFallowMapSlot == null
        || !jobForFallowMapSlot.getJobID().equals(jobId)) {
      throw new RuntimeException(trackerName + " has no map slots"
          + " reserved for " + jobId + "; current reservation is for "
          + jobForFallowMapSlot);
    }
    jobForFallowMapSlot = null;
  } else {
    if (jobForFallowReduceSlot == null
        || !jobForFallowReduceSlot.getJobID().equals(jobId)) {
      throw new RuntimeException(trackerName + " has no reduce slots"
          + " reserved for " + jobId + "; current reservation is for "
          + jobForFallowReduceSlot);
    }
    jobForFallowReduceSlot = null;
  }

  job.unreserveTaskTracker(this, taskType);
  LOG.info(trackerName + ": Unreserved " + taskType + " slots for " + jobId);
}
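A minimal sketch of how a scheduler might pair these calls, holding map slots fallow for a job that cannot yet fit and releasing them once it can. The method name, the `tracker` and `highMemJob` parameters, and the surrounding scheduling context are assumptions; only reserveSlots, unreserveSlots, and getJobForFallowSlot come from this excerpt.

// Hypothetical fragment of a scheduler's assignment pass.
void holdSlotsIfNeeded(TaskTracker tracker, JobInProgress highMemJob,
    int slotsNeeded, int slotsAvailable) {
  if (slotsAvailable < slotsNeeded) {
    // Not enough room yet: park the free slots as fallow for this job
    // so no other job grabs them in the meantime.
    tracker.reserveSlots(TaskType.MAP, highMemJob, slotsNeeded);
  } else if (tracker.getJobForFallowSlot(TaskType.MAP) == highMemJob) {
    // Enough slots opened up: release the reservation before scheduling.
    tracker.unreserveSlots(TaskType.MAP, highMemJob);
  }
}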
public void incCMClientRetryCounter() {
  if (iface instanceof CoronaJobTracker) {
    Counters jobCounters = ((CoronaJobTracker) iface).getJobCounters();
    if (jobCounters != null) {
      LOG.info("Incrementing session-driver CM client retry counter");
      jobCounters.incrCounter(
          JobInProgress.Counter.NUM_SESSION_DRIVER_CM_CLIENT_RETRY, 1);
    }
  }
}
private void verifyLaunchedTasks(SleepJob sleepJob, int maps, int reduces)
    throws IOException {
  Counters jobCounters = sleepJob.getRunningJob().getCounters();
  long launchedMaps = jobCounters.findCounter(
      JobInProgress.Counter.TOTAL_LAUNCHED_MAPS).getValue();
  long launchedReduces = jobCounters.findCounter(
      JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES).getValue();
  Assert.assertEquals(maps, launchedMaps);
  Assert.assertEquals(reduces, launchedReduces);
}
/**
 * The cleanupJob method maintains the cleanupQueue. When a job is
 * finalized, its JobID is added to the cleanupQueue; jobs are then retired
 * from the head of the queue so that at most JOBS_IN_MUMAK_MEMORY jobs
 * are kept in memory.
 *
 * @param job the JobInProgress object that was just finalized and is
 *            going to be added to the cleanupQueue
 */
private void cleanupJob(JobInProgress job) {
  cleanupQueue.add(job.getJobID());

  while (cleanupQueue.size() > JOBS_IN_MUMAK_MEMORY) {
    JobID removedJob = cleanupQueue.poll();
    retireJob(removedJob, "");
  }
}
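The cleanupQueue and JOBS_IN_MUMAK_MEMORY fields are not shown in this excerpt. A minimal sketch of plausible declarations, assuming a FIFO of JobIDs (java.util.Queue/LinkedList) and a small fixed retention bound:

// Assumed supporting declarations (not part of the original excerpt).
// The bound of 50 retained jobs is purely illustrative.
private final Queue<JobID> cleanupQueue = new LinkedList<JobID>();
private static final int JOBS_IN_MUMAK_MEMORY = 50;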
/**
 * Checks if the map-reduce job has completed.
 *
 * @return true if the job completed, false otherwise
 * @throws IOException
 */
public boolean checkComplete() throws IOException {
  JobID jobID = runningJob.getID();
  if (runningJob.isComplete()) {
    // Delete the job directory.
    final String jobdir = jobconf.get(JOB_DIR_LABEL);
    if (jobdir != null) {
      final Path jobpath = new Path(jobdir);
      jobpath.getFileSystem(jobconf).delete(jobpath, true);
    }
    if (runningJob.isSuccessful()) {
      LOG.info("Job Complete(Succeeded): " + jobID);
    } else {
      LOG.info("Job Complete(Failed): " + jobID);
    }
    raidPolicyPathPairList.clear();

    Counters ctrs = runningJob.getCounters();
    if (ctrs != null) {
      RaidNodeMetrics metrics =
          RaidNodeMetrics.getInstance(RaidNodeMetrics.DEFAULT_NAMESPACE_ID);
      if (ctrs.findCounter(Counter.FILES_FAILED) != null) {
        long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
        metrics.raidFailures.inc(filesFailed);
      }
      long slotSeconds = ctrs.findCounter(
          JobInProgress.Counter.SLOTS_MILLIS_MAPS).getValue() / 1000;
      metrics.raidSlotSeconds.inc(slotSeconds);
    }
    return true;
  } else {
    String report = (" job " + jobID
        + " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)
        + " reduce "
        + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
    if (!report.equals(lastReport)) {
      LOG.info(report);
      lastReport = report;
    }
    TaskCompletionEvent[] events =
        runningJob.getTaskCompletionEvents(jobEventCounter);
    jobEventCounter += events.length;
    for (TaskCompletionEvent event : events) {
      if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
        LOG.info(" Job " + jobID + " " + event.toString());
      }
    }
    return false;
  }
}
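A minimal sketch of a polling driver that might sit on top of checkComplete; the method name and the 5-second interval are assumptions, not from the original source.

// Hypothetical polling loop: block until the job finishes, re-checking
// (and re-logging progress via checkComplete) every few seconds.
public void waitForCompletion() throws IOException, InterruptedException {
  while (!checkComplete()) {
    Thread.sleep(5000);  // illustrative poll interval
  }
}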
/**
 * Get the {@link JobInProgress} for which the fallow slot(s) are held.
 *
 * @param taskType {@link TaskType} of the task
 * @return the job for which the fallow slot(s) are held,
 *         <code>null</code> if there are no fallow slots
 */
public JobInProgress getJobForFallowSlot(TaskType taskType) {
  return (taskType == TaskType.MAP)
      ? jobForFallowMapSlot : jobForFallowReduceSlot;
}