/**
 * Read and populate job statistics information.
 */
private void readJobInformation(JobConf jobConf, JobInfo jobInfo) throws Exception {
    /*
     * Convert the input strings to URLs
     */
    URL jobConfFileUrl = new URL(this._jobConfFile);
    URL jobHistoryFileUrl = new URL(this._jobHistoryFile);

    /*
     * Read the job configuration from the jobConfFile URL
     */
    jobConf.addResource(jobConfFileUrl);

    /*
     * Read the job history file and build job counters to evaluate diagnostic rules
     */
    if (jobHistoryFileUrl.getProtocol().equals("hdfs")) {
        DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo,
                FileSystem.get(jobConf));
    } else if (jobHistoryFileUrl.getProtocol().equals("file")) {
        DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo,
                FileSystem.getLocal(jobConf));
    } else {
        throw new Exception("Malformed URL. Protocol: " + jobHistoryFileUrl.getProtocol());
    }
}
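/*
 * The two branches above differ only in which FileSystem they hand to the
 * parser. Below is a minimal sketch of that dispatch factored into a reusable
 * helper; the class and method names are assumptions for illustration, not
 * part of the original code.
 */
import java.net.URL;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;

public class HistoryFileSystems {
    // Hypothetical helper mirroring the if/else dispatch in readJobInformation.
    static FileSystem forUrl(URL url, JobConf jobConf) throws Exception {
        if ("hdfs".equals(url.getProtocol())) {
            return FileSystem.get(jobConf);      // default (cluster) filesystem
        } else if ("file".equals(url.getProtocol())) {
            return FileSystem.getLocal(jobConf); // local filesystem
        }
        throw new Exception("Malformed URL. Protocol: " + url.getProtocol());
    }
}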
public static JobHistory.JobInfo getJobInfoFromHdfsOutputDir(String outputDir, Configuration conf)
        throws IOException {
    Path output = new Path(outputDir);
    Path historyLogDir = new Path(output, "_logs/history");
    FileSystem fs = output.getFileSystem(conf);
    // Check the history subdirectory itself, to match the error message below.
    if (!fs.exists(historyLogDir)) {
        throw new IOException("History directory " + historyLogDir.toString()
                + " does not exist");
    }
    Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(historyLogDir, jobLogFileFilter));
    if (jobFiles.length == 0) {
        throw new IOException("Not a valid history directory " + historyLogDir.toString());
    }
    String[] jobDetails =
            JobHistory.JobInfo.decodeJobHistoryFileName(jobFiles[0].getName()).split("_");
    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
    JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
    DefaultJobHistoryParser.parseJobTasks(jobFiles[0].toString(), job, fs);
    return job;
}
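/*
 * Why indices 2..4? These helpers assume the Hadoop 0.20 history file naming
 * scheme, <jobtracker-hostname>_<jobtracker-identifier>_<job-id>_<user>_<job-name>,
 * where the job id itself contains two underscores. A runnable sketch with a
 * made-up file name:
 */
public class JobIdFromHistoryName {
    public static void main(String[] args) {
        // Made-up decoded history file name following the 0.20 layout.
        String decoded = "hostname_1297896876766_job_201102161756_0001_hadoop_wordcount";
        String[] jobDetails = decoded.split("_");
        // The job id ("job_<timestamp>_<sequence>") spans indices 2..4 after the split.
        String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
        System.out.println(jobId); // prints: job_201102161756_0001
    }
}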
public static JobHistory.JobInfo getJobInfoFromLocalFile(String outputFile, Configuration conf)
        throws IOException {
    FileSystem fs = FileSystem.getLocal(conf);
    Path outputFilePath = new Path(outputFile);
    String[] jobDetails =
            JobHistory.JobInfo.decodeJobHistoryFileName(outputFilePath.getName()).split("_");
    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
    JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
    DefaultJobHistoryParser.parseJobTasks(outputFile, job, fs);
    return job;
}
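/*
 * A minimal usage sketch, assuming the two static helpers above live on a
 * class named HistoryUtils (a hypothetical name) and relying on the 0.20
 * JobHistory API, where the parsed top-level fields are exposed via
 * getValues(), keyed by the JobHistory.Keys enum.
 */
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobHistory;

public class PrintJobStatus {
    public static void main(String[] args) throws Exception {
        // args[0]: path to a local job history file
        JobHistory.JobInfo job =
                HistoryUtils.getJobInfoFromLocalFile(args[0], new Configuration());
        Map<JobHistory.Keys, String> values = job.getValues();
        System.out.println("user:   " + values.get(JobHistory.Keys.USER));
        System.out.println("status: " + values.get(JobHistory.Keys.JOB_STATUS));
    }
}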
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (location != null) {
        LOG.info("load: " + location);
        Path full = new Path(location);
        String[] jobDetails = JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
        String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
        JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);

        value = new MRJobInfo();

        FileSystem fs = full.getFileSystem(conf);
        FileStatus fstat = fs.getFileStatus(full);
        LOG.info("file size: " + fstat.getLen());
        DefaultJobHistoryParser.parseJobTasks(location, job, full.getFileSystem(conf));
        LOG.info("job history parsed successfully");
        HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
        LOG.info("got parsed job history");

        // Parse the Hadoop job XML (configuration) file that sits next to the history file
        Path parent = full.getParent();
        String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2] + "_"
                + jobDetails[3] + "_" + jobDetails[4] + "_conf.xml";
        Path p = new Path(parent, jobXml);
        FSDataInputStream fileIn = fs.open(p);
        Map<String, String> val = HadoopJobHistoryLoader.parseJobXML(fileIn);
        for (String key : val.keySet()) {
            value.job.put(key, val.get(key));
        }
        location = null;
        return true;
    }
    value = null;
    return false;
}
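/*
 * The job configuration file is expected as a sibling of the history file,
 * named from the first five underscore-separated segments, i.e. the history
 * file name with the trailing <user>_<job-name> replaced by _conf.xml. A
 * runnable sketch using the same made-up file name as above:
 */
public class ConfXmlName {
    public static void main(String[] args) {
        String decoded = "hostname_1297896876766_job_201102161756_0001_hadoop_wordcount";
        String[] d = decoded.split("_");
        // Keep <jt-hostname>_<jt-identifier>_<job-id>; drop <user> and <job-name>.
        String jobXml = d[0] + "_" + d[1] + "_" + d[2] + "_" + d[3] + "_" + d[4] + "_conf.xml";
        System.out.println(jobXml); // prints: hostname_1297896876766_job_201102161756_0001_conf.xml
    }
}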