private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { String serialPart = serialDirPath.getName(); String timeStampPart = JobHistoryUtils .getTimestampPartFromPath(serialDirPath.toString()); if (timeStampPart == null) { LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() + ". Continuing with next"); return; } if (serialPart == null) { LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next"); return; } serialNumberIndex.remove(serialPart, timeStampPart); }
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException { // check if path exists, in case of retries it may not exist if (stagingDirFS.exists(fromPath)) { LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString()); // TODO temporarily removing the existing dst if (doneDirFS.exists(toPath)) { doneDirFS.delete(toPath, true); } boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath, false, getConfig()); if (copied) LOG.info("Copied to done location: " + toPath); else LOG.info("copy failed"); doneDirFS.setPermission(toPath, new FsPermission( JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS)); } }
public static FSDataInputStream getPreviousJobHistoryFileStream( Configuration conf, ApplicationAttemptId applicationAttemptId) throws IOException { FSDataInputStream in = null; Path historyFile = null; String jobId = TypeConverter.fromYarn(applicationAttemptId.getApplicationId()) .toString(); String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId); Path histDirPath = FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir)); FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf); // read the previous history file historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath, jobId, (applicationAttemptId.getAttemptId() - 1))); LOG.info("History file is at " + historyFile); in = fc.open(historyFile); return in; }
@Test public void testGetHistoryIntermediateDoneDirForUser() throws IOException { // Test relative path Configuration conf = new Configuration(); conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/mapred/history/done_intermediate"); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); Assert.assertEquals("/mapred/history/done_intermediate/" + System.getProperty("user.name"), pathStr); // Test fully qualified path // Create default configuration pointing to the minicluster conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, dfsCluster.getURI().toString()); FileOutputStream os = new FileOutputStream(coreSitePath); conf.writeXml(os); os.close(); // Simulate execution under a non-default namenode conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); Assert.assertEquals(dfsCluster.getURI().toString() + "/mapred/history/done_intermediate/" + System.getProperty("user.name"), pathStr); }
private void addDirectoryToSerialNumberIndex(Path serialDirPath) { if (LOG.isDebugEnabled()) { LOG.debug("Adding " + serialDirPath + " to serial index"); } String serialPart = serialDirPath.getName(); String timestampPart = JobHistoryUtils .getTimestampPartFromPath(serialDirPath.toString()); if (timestampPart == null) { LOG.warn("Could not find timestamp portion from path: " + serialDirPath + ". Continuing with next"); return; } if (serialPart == null) { LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next"); } else { serialNumberIndex.add(serialPart, timestampPart); } }
private void addDirectoryToJobListCache(Path path) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Adding " + path + " to job list cache."); } List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, doneDirFc); for (FileStatus fs : historyFileList) { if (LOG.isDebugEnabled()) { LOG.debug("Adding in history for " + fs.getPath()); } JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() .getName()); String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs .getPath().getParent(), confFileName), new Path(fs.getPath() .getParent(), summaryFileName), jobIndexInfo, true); jobListCache.addIfAbsent(fileInfo); } }
/** * Scans the intermediate directory to find user directories. Scans these for * history files if the modification time for the directory has changed. Once * it finds history files it starts the process of moving them to the done * directory. * * @throws IOException * if there was a error while scanning */ void scanIntermediateDirectory() throws IOException { // TODO it would be great to limit how often this happens, except in the // case where we are looking for a particular job. List<FileStatus> userDirList = JobHistoryUtils.localGlobber( intermediateDoneDirFc, intermediateDoneDirPath, ""); LOG.debug("Scanning intermediate dirs"); for (FileStatus userDir : userDirList) { String name = userDir.getPath().getName(); UserLogDir dir = userDirModificationTimeMap.get(name); if(dir == null) { dir = new UserLogDir(); UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); if(old != null) { dir = old; } } dir.scanIfNeeded(userDir); } }
/** * Searches the job history file FileStatus list for the specified JobId. * * @param fileStatusList * fileStatus list of Job History Files. * @param jobId * The JobId to find. * @return A FileInfo object for the jobId, null if not found. * @throws IOException */ private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException { for (FileStatus fs : fileStatusList) { JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() .getName()); if (jobIndexInfo.getJobId().equals(jobId)) { String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path( fs.getPath().getParent(), confFileName), new Path(fs.getPath() .getParent(), summaryFileName), jobIndexInfo, true); return fileInfo; } } return null; }
/** * Scans old directories known by the idToDateString map for the specified * jobId. If the number of directories is higher than the supported size of * the idToDateString cache, the jobId will not be found. * * @param jobId * the jobId. * @return * @throws IOException */ private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( jobId, serialNumberFormat); Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); if (dateStringSet == null) { return null; } for (String timestampPart : dateStringSet) { Path logDir = canonicalHistoryLogPath(jobId, timestampPart); List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc); HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); if (fileInfo != null) { return fileInfo; } } return null; }
private void makeDoneSubdir(Path path) throws IOException { try { doneDirFc.getFileStatus(path); existingDoneSubdirs.add(path); } catch (FileNotFoundException fnfE) { try { FsPermission fsp = new FsPermission( JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); doneDirFc.mkdir(path, fsp, true); FileStatus fsStatus = doneDirFc.getFileStatus(path); LOG.info("Perms after creating " + fsStatus.getPermission().toShort() + ", Expected: " + fsp.toShort()); if (fsStatus.getPermission().toShort() != fsp.toShort()) { LOG.info("Explicitly setting permissions to : " + fsp.toShort() + ", " + fsp); doneDirFc.setPermission(path, fsp); } existingDoneSubdirs.add(path); } catch (FileAlreadyExistsException faeE) { // Nothing to do. } } }
private void addDirectoryToJobListCache(Path path) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Adding " + path + " to job list cache."); } List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, doneDirFc); for (FileStatus fs : historyFileList) { if (LOG.isDebugEnabled()) { LOG.debug("Adding in history for " + fs.getPath()); } JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() .getName()); String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs .getPath().getParent(), confFileName), new Path(fs.getPath() .getParent(), summaryFileName), jobIndexInfo, true); jobListCache.addIfAbsent(fileInfo); } }
/** * Searches the job history file FileStatus list for the specified JobId. * * @param fileStatusList * fileStatus list of Job History Files. * @param jobId * The JobId to find. * @return A FileInfo object for the jobId, null if not found. * @throws IOException */ private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException { for (FileStatus fs : fileStatusList) { JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() .getName()); if (jobIndexInfo.getJobId().equals(jobId)) { String confFileName = JobHistoryUtils .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( fs.getPath().getParent(), confFileName), new Path(fs.getPath() .getParent(), summaryFileName), jobIndexInfo, true); return fileInfo; } } return null; }