/**
 * Records the launch of the job in the job history.
 *
 * <p>Emits a {@code Job} record carrying the job id, the launch time, the
 * total number of maps and reduces, and a job status of {@code PREP}.
 * Nothing is written when history is disabled or when there are no writers.
 *
 * @param startTime start time of the job.
 * @param totalMaps total maps assigned by the jobtracker.
 * @param totalReduces total reduces.
 */
public void logInited(long startTime, int totalMaps, int totalReduces) {
  if (disableHistory || writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES,
    Keys.JOB_STATUS
  };
  String[] recordValues = {
    jobId.toString(), Long.toString(startTime), Integer.toString(totalMaps),
    Integer.toString(totalReduces), Values.PREP.name()
  };
  log(writers, RecordTypes.Job, recordKeys, recordValues);
}
/**
 * Records the job-failed event and then closes the job history writers.
 *
 * <p>The {@code Job} record (status {@code FAILED}) is logged with
 * {@code sync == true}, and {@code closeAndClear(writers)} is called right
 * after. Nothing happens when history is disabled or there are no writers.
 *
 * @param timestamp time when job failure was detected in ms.
 * @param finishedMaps number of finished map tasks.
 * @param finishedReduces number of finished reduce tasks.
 * @param counters job counters; written in escaped compact string form.
 */
public void logFailed(long timestamp, int finishedMaps, int finishedReduces,
                      Counters counters) {
  if (disableHistory || writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
    Keys.FINISHED_REDUCES, Keys.COUNTERS
  };
  String[] recordValues = {
    jobId.toString(), Long.toString(timestamp), Values.FAILED.name(),
    Integer.toString(finishedMaps), Integer.toString(finishedReduces),
    counters.makeEscapedCompactString()
  };

  // Terminal event: log synchronously, then release the writers.
  log(writers, RecordTypes.Job, recordKeys, recordValues, true);
  closeAndClear(writers);
}
/**
 * Records the job-killed event and then closes the job history writers.
 *
 * <p>The {@code Job} record (status {@code KILLED}) is logged with
 * {@code sync == true}, and {@code closeAndClear(writers)} is called right
 * after. Nothing happens when history is disabled or there are no writers.
 *
 * @param timestamp time when the job kill was issued in ms.
 * @param finishedMaps number of finished map tasks.
 * @param finishedReduces number of finished reduce tasks.
 * @param counters job counters; written in escaped compact string form.
 */
public void logKilled(long timestamp, int finishedMaps, int finishedReduces,
                      Counters counters) {
  if (disableHistory || writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
    Keys.FINISHED_REDUCES, Keys.COUNTERS
  };
  String[] recordValues = {
    jobId.toString(), Long.toString(timestamp), Values.KILLED.name(),
    Integer.toString(finishedMaps), Integer.toString(finishedReduces),
    counters.makeEscapedCompactString()
  };

  // Terminal event: log synchronously, then release the writers.
  log(writers, RecordTypes.Job, recordKeys, recordValues, true);
  closeAndClear(writers);
}
/**
 * Records the start of a task (TIP).
 *
 * <p>Emits a {@code Task} record with the task id, task type, start time and
 * split locations. Returns immediately when history is disabled; otherwise
 * the task's job id is validated before the writers are consulted.
 *
 * @param taskId task id
 * @param taskType MAP or REDUCE
 * @param startTime start time of the tip.
 * @param splitLocations value written under the {@code SPLITS} key.
 * @throws RuntimeException if the job id of {@code taskId} differs from the
 *         job id of this history (only checked while history is enabled).
 */
public void logTaskStarted(TaskID taskId, String taskType, long startTime,
                           String splitLocations) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASKID, Keys.TASK_TYPE, Keys.START_TIME, Keys.SPLITS
  };
  String[] recordValues = {
    taskId.toString(), taskType, Long.toString(startTime), splitLocations
  };
  log(writers, RecordTypes.Task, recordKeys, recordValues);
}
/**
 * Records the successful completion of a task.
 *
 * <p>Emits a {@code Task} record with status {@code SUCCESS}, the finish
 * time and the task counters. Returns immediately when history is disabled;
 * otherwise the task's job id is validated before the writers are consulted.
 *
 * @param taskId task id
 * @param taskType MAP or REDUCE
 * @param finishTime finish time of the task in ms
 * @param counters task counters; written in escaped compact string form.
 * @throws RuntimeException if the job id of {@code taskId} differs from the
 *         job id of this history (only checked while history is enabled).
 */
public void logTaskFinished(TaskID taskId, String taskType, long finishTime,
                            Counters counters) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASKID, Keys.TASK_TYPE, Keys.TASK_STATUS, Keys.FINISH_TIME,
    Keys.COUNTERS
  };
  String[] recordValues = {
    taskId.toString(), taskType, Values.SUCCESS.name(),
    Long.toString(finishTime), counters.makeEscapedCompactString()
  };
  log(writers, RecordTypes.Task, recordKeys, recordValues);
}
/**
 * Records an updated finish time for a task.
 *
 * <p>Emits a {@code Task} record holding only the task id and the finish
 * time. Returns immediately when history is disabled; otherwise the task's
 * job id is validated before the writers are consulted.
 *
 * @param taskId task id
 * @param finishTime finish time of the task in ms
 * @throws RuntimeException if the job id of {@code taskId} differs from the
 *         job id of this history (only checked while history is enabled).
 */
public void logTaskUpdates(TaskID taskId, long finishTime) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {Keys.TASKID, Keys.FINISH_TIME};
  String[] recordValues = {taskId.toString(), Long.toString(finishTime)};
  log(writers, RecordTypes.Task, recordKeys, recordValues);
}
/**
 * Formats one history record and hands it to every writer.
 *
 * <p>The record is the record type name followed by
 * {@code JobHistory.DELIMITER}, then one {@code KEY="value"} pair per key
 * (each followed by {@code JobHistory.DELIMITER}), and finally
 * {@code JobHistory.LINE_DELIMITER_CHAR}. Every value is passed through
 * {@code JobHistory.escapeString} before it is written.
 *
 * <p>The {@code values} array is only read; the escaped strings are not
 * stored back into it, so the caller's array is left untouched.
 *
 * @param writers the writers to send the data to
 * @param recordType the type to log
 * @param keys keys to log
 * @param values values to log, positionally matching {@code keys}; must hold
 *        at least {@code keys.length} entries
 * @param sync if true, each write task is run on the calling thread before
 *        this method returns; otherwise each task is handed to
 *        {@code fileManager.addWriteTask}
 */
private void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
                 Keys[] keys, String[] values, boolean sync) {
  // The buffer never leaves this method, so the unsynchronized builder is enough.
  StringBuilder record = new StringBuilder(recordType.name());
  record.append(JobHistory.DELIMITER);
  for (int i = 0; i < keys.length; i++) {
    record.append(keys[i])
        .append("=\"")
        .append(JobHistory.escapeString(values[i]))
        .append("\"")
        .append(JobHistory.DELIMITER);
  }
  record.append(JobHistory.LINE_DELIMITER_CHAR);

  // Render once; the same immutable line is shared by all writers.
  final String line = record.toString();
  for (PrintWriter out : writers) {
    LogTask task = new LogTask(out, line);
    if (sync) {
      task.run();
    } else {
      fileManager.addWriteTask(task);
    }
  }
}
/**
 * Records that the job is running.
 *
 * <p>Emits a {@code Job} record with the job id and a job status of
 * {@code RUNNING}. Nothing is written when history is disabled or when
 * there are no writers.
 */
public void logStarted() {
  if (disableHistory || writers == null) {
    return;
  }

  Keys[] recordKeys = {Keys.JOBID, Keys.JOB_STATUS};
  String[] recordValues = {jobId.toString(), Values.RUNNING.name()};
  log(writers, RecordTypes.Job, recordKeys, recordValues);
}
/** * Log job finished. closes the job file in history. * @param finishTime finish time of job in ms. * @param finishedMaps no of maps successfully finished. * @param finishedReduces no of reduces finished sucessfully. * @param failedMaps no of failed map tasks. (includes killed) * @param failedReduces no of failed reduce tasks. (includes killed) * @param killedMaps no of killed map tasks. * @param killedReduces no of killed reduce tasks. * @param counters the counters from the job */ public void logFinished(long finishTime, int finishedMaps, int finishedReduces, int failedMaps, int failedReduces, int killedMaps, int killedReduces, Counters mapCounters, Counters reduceCounters, Counters counters) { if (disableHistory) { return; } if (null != writers) { log(writers, RecordTypes.Job, new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES, Keys.FAILED_MAPS, Keys.FAILED_REDUCES, Keys.KILLED_MAPS, Keys.KILLED_REDUCES, Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS, Keys.COUNTERS}, new String[] {jobId.toString(), Long.toString(finishTime), Values.SUCCESS.name(), String.valueOf(finishedMaps), String.valueOf(finishedReduces), String.valueOf(failedMaps), String.valueOf(failedReduces), String.valueOf(killedMaps), String.valueOf(killedReduces), mapCounters.makeEscapedCompactString(), reduceCounters.makeEscapedCompactString(), counters.makeEscapedCompactString()}, true); closeAndClear(writers); } // NOTE: history cleaning stuff deleted from here. We should do that // somewhere else! }
/**
 * Records the job's priority.
 *
 * <p>Emits a {@code Job} record with the job id and the priority. Nothing is
 * written when history is disabled or when there are no writers.
 *
 * @param jobid not read by this method; the record always carries the job id
 *        held by this history instance.
 * @param priority the job's priority
 */
public void logJobPriority(JobID jobid, JobPriority priority) {
  if (disableHistory || writers == null) {
    return;
  }

  Keys[] recordKeys = {Keys.JOBID, Keys.JOB_PRIORITY};
  String[] recordValues = {jobId.toString(), priority.toString()};
  log(writers, RecordTypes.Job, recordKeys, recordValues);
}
/**
 * Records the failure of a task.
 *
 * <p>Emits a {@code Task} record with status {@code FAILED}, the failure
 * time, the error and the id of the attempt that caused the failure (an
 * empty string when no attempt is given). Returns immediately when history
 * is disabled; otherwise the task's job id is validated before the writers
 * are consulted.
 *
 * @param taskId the task that failed
 * @param taskType the type of the task
 * @param time the time of the failure
 * @param error the error the task failed with
 * @param failedDueToAttempt the attempt that caused the failure, or
 *        {@code null} if there is none
 * @throws RuntimeException if the job id of {@code taskId} differs from the
 *         job id of this history (only checked while history is enabled).
 */
public void logTaskFailed(TaskID taskId, String taskType, long time,
                          String error,
                          TaskAttemptID failedDueToAttempt) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  String failedAttempt;
  if (failedDueToAttempt != null) {
    failedAttempt = failedDueToAttempt.toString();
  } else {
    failedAttempt = "";
  }

  Keys[] recordKeys = {
    Keys.TASKID, Keys.TASK_TYPE, Keys.TASK_STATUS, Keys.FINISH_TIME,
    Keys.ERROR, Keys.TASK_ATTEMPT_ID
  };
  String[] recordValues = {
    taskId.toString(), taskType, Values.FAILED.name(), Long.toString(time),
    error, failedAttempt
  };
  log(writers, RecordTypes.Task, recordKeys, recordValues);
}
/**
 * Records the start of a map task attempt.
 *
 * <p>Emits a {@code MapAttempt} record. An {@code httpPort} of {@code -1} is
 * written as an empty string. Returns immediately when history is disabled;
 * otherwise the attempt's job id is validated before the writers are
 * consulted.
 *
 * @param taskAttemptId task attempt id
 * @param startTime start time of task attempt as reported by task tracker.
 * @param trackerName name of the tracker executing the task attempt.
 * @param httpPort http port of the task tracker executing the task attempt
 * @param taskType Whether the attempt is cleanup or setup or map
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logMapTaskStarted(TaskAttemptID taskAttemptId, long startTime,
                              String trackerName, int httpPort,
                              String taskType) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
    Keys.TRACKER_NAME, Keys.HTTP_PORT
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Long.toString(startTime), trackerName,
    httpPort == -1 ? "" : Integer.toString(httpPort)
  };
  log(writers, RecordTypes.MapAttempt, recordKeys, recordValues);
}
/**
 * Records the successful completion of a map task attempt.
 *
 * <p>Emits a {@code MapAttempt} record with status {@code SUCCESS}. Returns
 * immediately when history is disabled; otherwise the attempt's job id is
 * validated before the writers are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param finishTime finish time
 * @param hostName host name
 * @param taskType Whether the attempt is cleanup or setup or map
 * @param stateString state string of the task attempt
 * @param counter counters of the task attempt
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logMapTaskFinished(TaskAttemptID taskAttemptId, long finishTime,
                               String hostName, String taskType,
                               String stateString, Counters counter) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.STATE_STRING, Keys.COUNTERS
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Values.SUCCESS.name(),
    Long.toString(finishTime), hostName, stateString,
    counter.makeEscapedCompactString()
  };
  log(writers, RecordTypes.MapAttempt, recordKeys, recordValues);
}
/**
 * Records the failure of a map task attempt.
 *
 * <p>Emits a {@code MapAttempt} record with status {@code FAILED}. Returns
 * immediately when history is disabled; otherwise the attempt's job id is
 * validated before the writers are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param timestamp timestamp
 * @param hostName hostname of this task attempt.
 * @param error error message if any for this task attempt.
 * @param taskType Whether the attempt is cleanup or setup or map
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logMapTaskFailed(TaskAttemptID taskAttemptId, long timestamp,
                             String hostName, String error,
                             String taskType) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Values.FAILED.name(),
    Long.toString(timestamp), hostName, error
  };
  log(writers, RecordTypes.MapAttempt, recordKeys, recordValues);
}
/**
 * Records the killing of a map task attempt.
 *
 * <p>Emits a {@code MapAttempt} record with status {@code KILLED}. Returns
 * immediately when history is disabled; otherwise the attempt's job id is
 * validated before the writers are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param timestamp timestamp
 * @param hostName hostname of this task attempt.
 * @param error error message if any for this task attempt.
 * @param taskType Whether the attempt is cleanup or setup or map
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logMapTaskKilled(TaskAttemptID taskAttemptId, long timestamp,
                             String hostName, String error,
                             String taskType) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Values.KILLED.name(),
    Long.toString(timestamp), hostName, error
  };
  log(writers, RecordTypes.MapAttempt, recordKeys, recordValues);
}
/**
 * Records the start of a reduce task attempt.
 *
 * <p>Emits a {@code ReduceAttempt} record. An {@code httpPort} of {@code -1}
 * is written as an empty string. Returns immediately when history is
 * disabled; otherwise the attempt's job id is validated before the writers
 * are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param startTime start time
 * @param trackerName tracker name
 * @param httpPort the http port of the tracker executing the task attempt
 * @param taskType Whether the attempt is cleanup or setup or reduce
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logReduceTaskStarted(TaskAttemptID taskAttemptId,
                                 long startTime, String trackerName,
                                 int httpPort, String taskType) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
    Keys.TRACKER_NAME, Keys.HTTP_PORT
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Long.toString(startTime), trackerName,
    httpPort == -1 ? "" : Integer.toString(httpPort)
  };
  log(writers, RecordTypes.ReduceAttempt, recordKeys, recordValues);
}
/**
 * Records the successful completion of a reduce task attempt.
 *
 * <p>Emits a {@code ReduceAttempt} record with status {@code SUCCESS},
 * including the shuffle, sort and overall finish times. Returns immediately
 * when history is disabled; otherwise the attempt's job id is validated
 * before the writers are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param shuffleFinished shuffle finish time
 * @param sortFinished sort finish time
 * @param finishTime finish time of task
 * @param hostName host name where task attempt executed
 * @param taskType Whether the attempt is cleanup or setup or reduce
 * @param stateString the state string of the attempt
 * @param counter counters of the attempt
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logReduceTaskFinished(TaskAttemptID taskAttemptId,
                                  long shuffleFinished, long sortFinished,
                                  long finishTime, String hostName,
                                  String taskType, String stateString,
                                  Counters counter) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
    Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME,
    Keys.HOSTNAME, Keys.STATE_STRING, Keys.COUNTERS
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Values.SUCCESS.name(),
    Long.toString(shuffleFinished), Long.toString(sortFinished),
    Long.toString(finishTime), hostName, stateString,
    counter.makeEscapedCompactString()
  };
  log(writers, RecordTypes.ReduceAttempt, recordKeys, recordValues);
}
/**
 * Records the failure of a reduce task attempt.
 *
 * <p>Emits a {@code ReduceAttempt} record with status {@code FAILED}.
 * Returns immediately when history is disabled; otherwise the attempt's job
 * id is validated before the writers are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param timestamp time stamp when task failed
 * @param hostName host name of the task attempt.
 * @param error error message of the task.
 * @param taskType Whether the attempt is cleanup or setup or reduce
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logReduceTaskFailed(TaskAttemptID taskAttemptId, long timestamp,
                                String hostName, String error,
                                String taskType) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Values.FAILED.name(),
    Long.toString(timestamp), hostName, error
  };
  log(writers, RecordTypes.ReduceAttempt, recordKeys, recordValues);
}
/**
 * Records the killing of a reduce task attempt.
 *
 * <p>Emits a {@code ReduceAttempt} record with status {@code KILLED}.
 * Returns immediately when history is disabled; otherwise the attempt's job
 * id is validated before the writers are consulted.
 *
 * @param taskAttemptId task attempt id
 * @param timestamp time stamp when task was killed
 * @param hostName host name of the task attempt.
 * @param error error message of the task.
 * @param taskType Whether the attempt is cleanup or setup or reduce
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from the job id of this history (only checked while history is
 *         enabled).
 */
public void logReduceTaskKilled(TaskAttemptID taskAttemptId, long timestamp,
                                String hostName, String error,
                                String taskType) {
  if (disableHistory) {
    return;
  }

  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }

  if (writers == null) {
    return;
  }

  Keys[] recordKeys = {
    Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR
  };
  String[] recordValues = {
    taskType, taskAttemptId.getTaskID().toString(),
    taskAttemptId.toString(), Values.KILLED.name(),
    Long.toString(timestamp), hostName, error
  };
  log(writers, RecordTypes.ReduceAttempt, recordKeys, recordValues);
}
/** * Creates a job history file of a given specific version. This method should * change if format/content of future versions of job history file changes. */ private void writeHistoryFile(FSDataOutputStream out, long version) throws IOException { String delim = "\n"; // '\n' for version 0 String counters = COUNTERS; String jobConf = "job.xml"; if (version > 0) { // line delimeter should be '.' for later versions // Change the delimiter delim = DELIM + delim; // Write the version line out.writeBytes(RecordTypes.Meta.name() + " VERSION=\"" + JobHistory.VERSION + "\" " + delim); jobConf = JobHistory.escapeString(jobConf); counters = JobHistory.escapeString(counters); } // Write the job-start line out.writeBytes("Job JOBID=\"" + JOB + "\" JOBNAME=\"" + JOBNAME + "\"" + " USER=\"" + USER + "\" SUBMIT_TIME=\"" + TIME + "\"" + " JOBCONF=\"" + jobConf + "\" " + delim); // Write the job-launch line out.writeBytes("Job JOBID=\"" + JOB + "\" LAUNCH_TIME=\"" + TIME + "\"" + " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\" " + delim); // Write the task start line out.writeBytes("Task TASKID=\"" + TASK_ID + "\" TASK_TYPE=\"MAP\"" + " START_TIME=\"" + TIME + "\" SPLITS=\"\"" + " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\" " + delim); // Write the attempt start line out.writeBytes("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"" + TASK_ID + "\"" + " TASK_ATTEMPT_ID=\"" + TASK_ATTEMPT_ID + "\"" + " START_TIME=\"" + TIME + "\"" + " HOSTNAME=\"" + HOSTNAME + "\" " + delim); // Write the attempt finish line out.writeBytes("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"" + TASK_ID + "\"" + " TASK_ATTEMPT_ID=\"" + TASK_ATTEMPT_ID + "\"" + " FINISH_TIME=\"" + TIME + "\"" + " TASK_STATUS=\"SUCCESS\"" + " HOSTNAME=\"" + HOSTNAME + "\" " + delim); // Write the task finish line out.writeBytes("Task TASKID=\"" + TASK_ID + "\" TASK_TYPE=\"MAP\"" + " TASK_STATUS=\"SUCCESS\"" + " FINISH_TIME=\"" + TIME + "\"" + " COUNTERS=\"" + counters + "\" " + delim); // Write the job-finish line out.writeBytes("Job JOBID=\"" + JOB + "\" 
FINISH_TIME=\"" + TIME + "\"" + " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\"" + " JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"1\"" + " FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\"" + " FAILED_REDUCES=\"0\"" + " COUNTERS=\"" + counters + "\" " + delim); }