/**
 * @param jobConfFile - URL pointing to the job configuration (job_conf.xml) file
 * @param jobHistoryFile - URL pointing to the job history log file
 * @param testsConfFileIs - input stream for the test configuration file (optional).
 *        If not specified, the default path is:
 *        $HADOOP_HOME/contrib/vaidya/pxpd_tests_config.xml
 * @param reportFile - file path for storing the report (optional)
 */
public PostExPerformanceDiagnoser(String jobConfFile, String jobHistoryFile,
    InputStream testsConfFileIs, String reportFile) throws Exception {
  this._jobHistoryFile = jobHistoryFile;
  this._testsConfFileIs = testsConfFileIs;
  this._reportFile = reportFile;
  this._jobConfFile = jobConfFile;

  /*
   * Read the job information necessary for post-execution performance analysis.
   */
  JobConf jobConf = new JobConf();
  JobInfo jobInfo = new JobInfo("");
  readJobInformation(jobConf, jobInfo);
  this._jobExecutionStatistics = new JobStatistics(jobConf, jobInfo);
}
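A minimal construction sketch for the diagnoser above. All paths are hypothetical, and the tests-config stream is opened from a local file purely for illustration; the class is assumed to be on the classpath.

import java.io.FileInputStream;
import java.io.InputStream;

public class DiagnoserDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical inputs; substitute real job_conf.xml and history-log URLs.
    InputStream testsConf = new FileInputStream("/tmp/pxpd_tests_config.xml");
    PostExPerformanceDiagnoser diagnoser = new PostExPerformanceDiagnoser(
        "file:///tmp/job_conf.xml",     // job configuration URL
        "file:///tmp/job_history.log",  // job history log URL
        testsConf,                      // test configuration stream
        "/tmp/vaidya_report.xml");      // report output path
  }
}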
/**
 * Read and populate job statistics information.
 */
private void readJobInformation(JobConf jobConf, JobInfo jobInfo) throws Exception {

  /*
   * Convert the input strings to URLs.
   */
  URL jobConfFileUrl = new URL(this._jobConfFile);
  URL jobHistoryFileUrl = new URL(this._jobHistoryFile);

  /*
   * Read the job configuration from the jobConfFile URL.
   */
  jobConf.addResource(jobConfFileUrl);

  /*
   * Read the job history file and build job counters to evaluate diagnostic rules.
   */
  if (jobHistoryFileUrl.getProtocol().equals("hdfs")) {
    DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo,
        FileSystem.get(jobConf));
  } else if (jobHistoryFileUrl.getProtocol().equals("file")) {
    DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo,
        FileSystem.getLocal(jobConf));
  } else {
    throw new Exception("Malformed URL. Protocol: " + jobHistoryFileUrl.getProtocol());
  }
}
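The hdfs/file branching above can also be expressed generically: Hadoop's FileSystem.get(URI, Configuration) resolves the concrete FileSystem from the URI scheme. A sketch of that equivalent formulation (not what the code above does, just an alternative):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Resolves the FileSystem (hdfs://, file://, ...) directly from the URI scheme,
// avoiding the explicit protocol switch in readJobInformation().
static FileSystem fileSystemFor(URI uri, Configuration conf) throws IOException {
  return FileSystem.get(uri, conf);
}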
public static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs)
    throws IOException {
  String jobid = request.getParameter("jobid");
  String logFile = request.getParameter("logFile");
  synchronized (jobHistoryCache) {
    JobInfo jobInfo = jobHistoryCache.remove(jobid);
    if (jobInfo == null) {
      jobInfo = new JobHistory.JobInfo(jobid);
      LOG.info("Loading Job History file " + jobid + ". Cache size is "
          + jobHistoryCache.size());
      DefaultJobHistoryParser.parseJobTasks(logFile, jobInfo, fs);
    }
    jobHistoryCache.put(jobid, jobInfo);
    if (jobHistoryCache.size() > CACHE_SIZE) {
      Iterator<Map.Entry<String, JobInfo>> it =
          jobHistoryCache.entrySet().iterator();
      String removeJobId = it.next().getKey();
      it.remove();
      LOG.info("Job History file removed from cache " + removeJobId);
    }
    return jobInfo;
  }
}
/**
 * Read a job-history log file and construct the corresponding {@link JobInfo}.
 * Also caches the {@link JobInfo} to serve further requests quickly.
 *
 * @param logFile
 * @param fs
 * @param jobTracker
 * @param user
 * @return JobInfo
 * @throws IOException
 */
static JobInfo getJobInfo(Path logFile, FileSystem fs, JobTracker jobTracker,
    String user) throws IOException {
  String jobid = getJobID(logFile.getName());
  JobInfo jobInfo = null;
  synchronized (jobHistoryCache) {
    jobInfo = jobHistoryCache.remove(jobid);
    if (jobInfo == null) {
      jobInfo = new JobHistory.JobInfo(jobid);
      LOG.info("Loading Job History file " + jobid + ". Cache size is "
          + jobHistoryCache.size());
      DefaultJobHistoryParser.parseJobTasks(logFile.toUri().getPath(), jobInfo, fs);
    }
    jobHistoryCache.put(jobid, jobInfo);
    int cacheSize =
        jobTracker.conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
    if (jobHistoryCache.size() > cacheSize) {
      Iterator<Map.Entry<String, JobInfo>> it =
          jobHistoryCache.entrySet().iterator();
      String removeJobId = it.next().getKey();
      it.remove();
      LOG.info("Job History file removed from cache " + removeJobId);
    }
  }
  UserGroupInformation currentUser;
  if (user == null) {
    currentUser = UserGroupInformation.getCurrentUser();
  } else {
    currentUser = UserGroupInformation.createRemoteUser(user);
  }
  // Authorize the user for view access of this job
  jobTracker.getACLsManager().checkAccess(jobid, currentUser,
      jobInfo.getJobQueue(), Operation.VIEW_JOB_DETAILS,
      jobInfo.get(Keys.USER), jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
  return jobInfo;
}
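The remove()-then-put() on jobHistoryCache, followed by evicting the iterator's first entry once the size bound is exceeded, is in effect a hand-rolled LRU policy (assuming jobHistoryCache is an insertion-ordered map such as LinkedHashMap: re-inserting refreshes an entry, so the head is always the least recently used). The same policy can be stated declaratively; a minimal sketch:

import java.util.LinkedHashMap;
import java.util.Map;

// Size-bounded LRU map: reads refresh an entry's recency (access order),
// and the eldest entry is dropped automatically once capacity is exceeded.
class LruCache<K, V> extends LinkedHashMap<K, V> {
  private final int capacity;

  LruCache(int capacity) {
    super(capacity, 0.75f, true); // true => access order, not insertion order
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > capacity;
  }
}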
/**
 * Check the access for users to view job-history pages.
 *
 * @param request
 * @param response
 * @param jobTracker
 * @param fs
 * @param logFile
 * @return the job if authorization is disabled or if the authorization checks
 *         pass. Otherwise return null.
 * @throws IOException
 * @throws InterruptedException
 * @throws ServletException
 */
static JobInfo checkAccessAndGetJobInfo(HttpServletRequest request,
    HttpServletResponse response, final JobTracker jobTracker,
    final FileSystem fs, final Path logFile)
    throws IOException, InterruptedException, ServletException {
  String jobid = getJobID(logFile.getName());
  String user = request.getRemoteUser();
  JobInfo job = null;
  if (user != null) {
    try {
      job = JSPUtil.getJobInfo(logFile, fs, jobTracker, user);
    } catch (AccessControlException e) {
      String errMsg = String.format(
          "User %s failed to view %s!<br><br>%s"
              + "<hr>"
              + "<a href=\"jobhistory.jsp\">Go back to JobHistory</a><br>"
              + "<a href=\"jobtracker.jsp\">Go back to JobTracker</a>",
          user, jobid, e.getMessage());
      JSPUtil.setErrorAndForward(errMsg, request, response);
      return null;
    }
  } else {
    // no authorization needed
    job = JSPUtil.getJobInfo(logFile, fs, jobTracker, null);
  }
  return job;
}
/**
 * Tests the JobHistory parser with different versions of job history files.
 */
public void testJobHistoryVersion() throws IOException {
  // If a new job history version comes up, the modified parser may fail for
  // the history file created by writeHistoryFile().
  for (long version = 0; version <= JobHistory.VERSION; version++) {
    JobConf conf = new JobConf();
    FileSystem fs = FileSystem.getLocal(conf);

    // cleanup
    fs.delete(TEST_DIR, true);

    Path historyPath = new Path(TEST_DIR + "/_logs/history/" + FILENAME + version);
    fs.delete(historyPath, false);

    FSDataOutputStream out = fs.create(historyPath);
    writeHistoryFile(out, version);
    out.close();

    JobInfo job = new JobHistory.JobInfo(JOB);
    DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs);

    assertTrue("Failed to parse jobhistory files of version " + version,
        job.getAllTasks().size() > 0);

    // cleanup
    fs.delete(TEST_DIR, true);
  }
}
public JobStatistics(JobConf jobConf, JobInfo jobInfo) throws ParseException {
  this._jobConf = jobConf;
  this._jobInfo = jobInfo;
  this._job = new Hashtable<Enum, String>();
  populate_Job(this._job, this._jobInfo.getValues());
  populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList,
      this._jobInfo.getAllTasks());

  // Add the job type: MAP_REDUCE, or MAP_ONLY when no reduces are configured.
  if (getLongValue(JobKeys.TOTAL_REDUCES) == 0) {
    this._job.put(JobKeys.JOBTYPE, "MAP_ONLY");
  } else {
    this._job.put(JobKeys.JOBTYPE, "MAP_REDUCE");
  }
}
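Downstream diagnostic rules can branch on the job type recorded here, for example to skip reduce-phase tests for map-only jobs. A hypothetical sketch; the getStringValue accessor is an assumption, since only getLongValue appears in this excerpt:

// Hypothetical helper: getStringValue(Enum) is assumed to mirror getLongValue(Enum).
static boolean isMapOnly(JobStatistics stats) {
  return "MAP_ONLY".equals(stats.getStringValue(JobKeys.JOBTYPE));
}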
public static void parseJobHistory(Configuration jobConf, JobInfo jobInfo,
    MRJobInfo value) {
  value.job.clear();
  populateJob(jobInfo.getValues(), value.job);
  value.mapTask.clear();
  value.reduceTask.clear();
  populateMapReduceTaskLists(value, jobInfo.getAllTasks());
}
/**
 * Read a job-history log file and construct the corresponding {@link JobInfo}.
 * Also caches the {@link JobInfo} to serve further requests quickly.
 *
 * @param logFile
 * @param fs
 * @param jobConf
 * @param acLsManager
 * @param user
 * @return JobInfo
 * @throws IOException
 */
static JobInfo getJobInfo(Path logFile, FileSystem fs, JobConf jobConf,
    ACLsManager acLsManager, String user) throws IOException {
  String jobid = getJobID(logFile.getName());
  JobInfo jobInfo = null;
  synchronized (jobHistoryCache) {
    jobInfo = jobHistoryCache.remove(jobid);
    if (jobInfo == null) {
      jobInfo = new JobHistory.JobInfo(jobid);
      LOG.info("Loading Job History file " + jobid + ". Cache size is "
          + jobHistoryCache.size());
      DefaultJobHistoryParser.parseJobTasks(logFile.toUri().getPath(), jobInfo, fs);
    }
    jobHistoryCache.put(jobid, jobInfo);
    int cacheSize =
        jobConf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
    if (jobHistoryCache.size() > cacheSize) {
      Iterator<Map.Entry<String, JobInfo>> it =
          jobHistoryCache.entrySet().iterator();
      String removeJobId = it.next().getKey();
      it.remove();
      LOG.info("Job History file removed from cache " + removeJobId);
    }
  }
  UserGroupInformation currentUser;
  if (user == null) {
    currentUser = UserGroupInformation.getCurrentUser();
  } else {
    currentUser = UserGroupInformation.createRemoteUser(user);
  }
  // Authorize the user for view access of this job
  acLsManager.checkAccess(jobid, currentUser, jobInfo.getJobQueue(),
      Operation.VIEW_JOB_DETAILS, jobInfo.get(Keys.USER),
      jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
  return jobInfo;
}
/**
 * Check the access for users to view job-history pages.
 *
 * @param request
 * @param response
 * @param jobConf
 * @param acLsManager
 * @param fs
 * @param logFile
 * @return the job if authorization is disabled or if the authorization checks
 *         pass. Otherwise return null.
 * @throws IOException
 * @throws InterruptedException
 * @throws ServletException
 */
static JobInfo checkAccessAndGetJobInfo(HttpServletRequest request,
    HttpServletResponse response, final JobConf jobConf,
    final ACLsManager acLsManager, final FileSystem fs, final Path logFile)
    throws IOException, InterruptedException, ServletException {
  String jobid = getJobID(logFile.getName());
  String user = request.getRemoteUser();
  JobInfo job = null;
  if (user != null) {
    try {
      job = JSPUtil.getJobInfo(logFile, fs, jobConf, acLsManager, user);
    } catch (AccessControlException e) {
      String trackerAddress = jobConf.get("mapred.job.tracker.http.address");
      String errMsg = String.format(
          "User %s failed to view %s!<br><br>%s"
              + "<hr>"
              + "<a href=\"jobhistory.jsp\">Go back to JobHistory</a><br>"
              + "<a href=\"http://" + trackerAddress
              + "/jobtracker.jsp\">Go back to JobTracker</a>",
          user, jobid, e.getMessage());
      JSPUtil.setErrorAndForward(errMsg, request, response);
      return null;
    }
  } else {
    // no authorization needed
    job = JSPUtil.getJobInfo(logFile, fs, jobConf, acLsManager, null);
  }
  return job;
}
public JobStatistics(JobConf jobConf, JobInfo jobInfo) throws ParseException {
  this._jobConf = jobConf;
  this._jobInfo = jobInfo;
  this._job = new Hashtable<Enum, String>();
  populate_Job(this._job, this._jobInfo.getValues());
  populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList,
      this._jobInfo.getAllTasks());
}