Java 类org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils 实例源码

项目:hadoop    文件:HistoryFileManager.java   
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
  // Drops the serial-number index entry corresponding to this done
  // directory. Both the serial and timestamp components must be derivable
  // from the path; otherwise the index is left untouched.
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.remove(serial, timestamp);
}
项目:hadoop    文件:JobHistoryEventHandler.java   
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
  // Copies a job history file from the staging filesystem to the done
  // filesystem and stamps it with the intermediate-file permissions.
  // check if path exists, in case of retries it may not exist
  if (stagingDirFS.exists(fromPath)) {
    LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
    // TODO temporarily removing the existing dst
    if (doneDirFS.exists(toPath)) {
      doneDirFS.delete(toPath, true);
    }
    boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
        false, getConfig());

    if (copied) {
      LOG.info("Copied to done location: " + toPath);
    } else {
      // A failed copy is an error condition, not routine information;
      // the subsequent setPermission will surface the missing file.
      LOG.error("copy failed");
    }
    doneDirFS.setPermission(toPath, new FsPermission(
        JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
  }
}
项目:hadoop    文件:JobHistoryCopyService.java   
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  // Locates and opens the history file written by the previous application
  // attempt (current attempt id minus one) under the configured staging
  // directory.
  String jobIdString =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String stagingDirPrefix =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobIdString);
  Path stagingDirPath =
      FileContext.getFileContext(conf).makeQualified(
          new Path(stagingDirPrefix));
  FileContext fc = FileContext.getFileContext(stagingDirPath.toUri(), conf);
  // read the previous history file
  Path historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
          stagingDirPath, jobIdString,
          applicationAttemptId.getAttemptId() - 1));
  LOG.info("History file is at " + historyFile);
  return fc.open(historyFile);
}
项目:hadoop    文件:TestJobHistoryEventHandler.java   
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test an unqualified (scheme-less) intermediate done dir: the user name
  // is appended directly to the configured path.
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Always release the file handle, even if writeXml throws.
    os.close();
  }
  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
          "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
项目:hadoop    文件:HistoryFileManager.java   
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
  // Registers a done-directory path in the serial-number index, keyed by
  // the serial component (directory name) and the timestamp component.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + serialDirPath + " to serial index");
  }
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: " + serialDirPath
        + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.add(serial, timestamp);
}
项目:hadoop    文件:HistoryFileManager.java   
private void addDirectoryToJobListCache(Path path) throws IOException {
  // Scans the given directory for history files and registers each one in
  // the job list cache together with its matching conf and summary file
  // paths (which live alongside the history file).
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + path + " to job list cache.");
  }
  for (FileStatus status : scanDirectoryForHistoryFiles(path, doneDirFc)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding in history for " + status.getPath());
    }
    Path historyFile = status.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyFile.getName());
    Path parent = historyFile.getParent();
    Path confFile = new Path(parent,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parent,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    jobListCache.addIfAbsent(createHistoryFileInfo(historyFile, confFile,
        summaryFile, indexInfo, true));
  }
}
项目:hadoop    文件:HistoryFileManager.java   
/**
 * Scans the intermediate directory to find user directories. Scans these for
 * history files if the modification time for the directory has changed. Once
 * it finds history files it starts the process of moving them to the done
 * directory.
 *
 * @throws IOException
 *           if there was an error while scanning
 */
void scanIntermediateDirectory() throws IOException {
  // TODO it would be great to limit how often this happens, except in the
  // case where we are looking for a particular job.
  List<FileStatus> userDirs = JobHistoryUtils.localGlobber(
      intermediateDoneDirFc, intermediateDoneDirPath, "");
  LOG.debug("Scanning intermediate dirs");
  for (FileStatus userDirStatus : userDirs) {
    String userName = userDirStatus.getPath().getName();
    // Fetch the per-user scan state, creating it atomically on first sight
    // of this user so concurrent scanners share one entry.
    UserLogDir userLogDir = userDirModificationTimeMap.get(userName);
    if (userLogDir == null) {
      UserLogDir fresh = new UserLogDir();
      UserLogDir existing =
          userDirModificationTimeMap.putIfAbsent(userName, fresh);
      userLogDir = (existing == null) ? fresh : existing;
    }
    userLogDir.scanIfNeeded(userDirStatus);
  }
}
项目:hadoop    文件:HistoryFileManager.java   
/**
 * Searches the job history file FileStatus list for the specified JobId.
 *
 * @param fileStatusList
 *          fileStatus list of Job History Files.
 * @param jobId
 *          The JobId to find.
 * @return A FileInfo object for the jobId, null if not found.
 * @throws IOException
 */
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
    JobId jobId) throws IOException {
  for (FileStatus status : fileStatusList) {
    Path historyFile = status.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyFile.getName());
    if (!indexInfo.getJobId().equals(jobId)) {
      continue; // not the job we are looking for
    }
    // The conf and summary files sit next to the history file itself.
    Path parent = historyFile.getParent();
    Path confFile = new Path(parent,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parent,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    return createHistoryFileInfo(historyFile, confFile, summaryFile,
        indexInfo, true);
  }
  return null;
}
项目:hadoop    文件:HistoryFileManager.java   
/**
 * Scans old directories known by the idToDateString map for the specified
 * jobId. If the number of directories is higher than the supported size of
 * the idToDateString cache, the jobId will not be found.
 *
 * @param jobId
 *          the jobId.
 * @return the file info for the job, or null when it is not found.
 * @throws IOException
 */
private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
  String serialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
      jobId, serialNumberFormat);
  Set<String> timestamps = serialNumberIndex.get(serialNumber);
  if (timestamps == null) {
    return null;
  }
  // Each timestamp names one candidate done directory; return on first hit.
  for (String timestamp : timestamps) {
    Path logDir = canonicalHistoryLogPath(jobId, timestamp);
    HistoryFileInfo info = getJobFileInfo(
        scanDirectoryForHistoryFiles(logDir, doneDirFc), jobId);
    if (info != null) {
      return info;
    }
  }
  return null;
}
项目:hadoop    文件:HistoryFileManager.java   
private void makeDoneSubdir(Path path) throws IOException {
  // Ensures the given done-dir subdirectory exists, remembering it in
  // existingDoneSubdirs so later calls can skip the filesystem round trip.
  try {
    doneDirFc.getFileStatus(path);
    existingDoneSubdirs.add(path);
  } catch (FileNotFoundException fnfE) {
    // Directory does not exist yet: create it with the history done-dir
    // permissions and verify the permissions actually took effect.
    try {
      FsPermission fsp = new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
      doneDirFc.mkdir(path, fsp, true);
      // The created directory's permissions may differ from the requested
      // ones (presumably due to umask — TODO confirm), so re-check and
      // correct explicitly when they do not match.
      FileStatus fsStatus = doneDirFc.getFileStatus(path);
      LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
          + ", Expected: " + fsp.toShort());
      if (fsStatus.getPermission().toShort() != fsp.toShort()) {
        LOG.info("Explicitly setting permissions to : " + fsp.toShort()
            + ", " + fsp);
        doneDirFc.setPermission(path, fsp);
      }
      existingDoneSubdirs.add(path);
    } catch (FileAlreadyExistsException faeE) { // Nothing to do.
      // Another process created the directory between our check and mkdir.
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:JobHistoryEventHandler.java   
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
  // Copies a job history file from the staging filesystem to the done
  // filesystem and stamps it with the intermediate-file permissions.
  // check if path exists, in case of retries it may not exist
  if (stagingDirFS.exists(fromPath)) {
    LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
    // TODO temporarily removing the existing dst
    if (doneDirFS.exists(toPath)) {
      doneDirFS.delete(toPath, true);
    }
    boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
        false, getConfig());

    if (copied) {
      LOG.info("Copied to done location: " + toPath);
    } else {
      // A failed copy is an error condition, not routine information;
      // the subsequent setPermission will surface the missing file.
      LOG.error("copy failed");
    }
    doneDirFS.setPermission(toPath, new FsPermission(
        JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
  }
}
项目:aliyun-oss-hadoop-fs    文件:JobHistoryCopyService.java   
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  // Locates and opens the history file written by the previous application
  // attempt (current attempt id minus one) under the configured staging
  // directory.
  String jobIdString =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String stagingDirPrefix =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobIdString);
  Path stagingDirPath =
      FileContext.getFileContext(conf).makeQualified(
          new Path(stagingDirPrefix));
  FileContext fc = FileContext.getFileContext(stagingDirPath.toUri(), conf);
  // read the previous history file
  Path historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
          stagingDirPath, jobIdString,
          applicationAttemptId.getAttemptId() - 1));
  LOG.info("History file is at " + historyFile);
  return fc.open(historyFile);
}
项目:aliyun-oss-hadoop-fs    文件:TestJobHistoryEventHandler.java   
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test an unqualified (scheme-less) intermediate done dir: the user name
  // is appended directly to the configured path.
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Always release the file handle, even if writeXml throws.
    os.close();
  }
  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
          "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
  // Drops the serial-number index entry corresponding to this done
  // directory. Both the serial and timestamp components must be derivable
  // from the path; otherwise the index is left untouched.
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.remove(serial, timestamp);
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
  // Registers a done-directory path in the serial-number index, keyed by
  // the serial component (directory name) and the timestamp component.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + serialDirPath + " to serial index");
  }
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: " + serialDirPath
        + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.add(serial, timestamp);
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
private void addDirectoryToJobListCache(Path path) throws IOException {
  // Scans the given directory for history files and registers each one in
  // the job list cache together with its matching conf and summary file
  // paths (which live alongside the history file).
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + path + " to job list cache.");
  }
  for (FileStatus status : scanDirectoryForHistoryFiles(path, doneDirFc)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding in history for " + status.getPath());
    }
    Path historyFile = status.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyFile.getName());
    Path parent = historyFile.getParent();
    Path confFile = new Path(parent,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parent,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    jobListCache.addIfAbsent(new HistoryFileInfo(historyFile, confFile,
        summaryFile, indexInfo, true));
  }
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
/**
 * Scans the intermediate directory to find user directories. Scans these for
 * history files if the modification time for the directory has changed. Once
 * it finds history files it starts the process of moving them to the done
 * directory.
 *
 * @throws IOException
 *           if there was an error while scanning
 */
void scanIntermediateDirectory() throws IOException {
  // TODO it would be great to limit how often this happens, except in the
  // case where we are looking for a particular job.
  List<FileStatus> userDirs = JobHistoryUtils.localGlobber(
      intermediateDoneDirFc, intermediateDoneDirPath, "");
  LOG.debug("Scanning intermediate dirs");
  for (FileStatus userDirStatus : userDirs) {
    String userName = userDirStatus.getPath().getName();
    // Fetch the per-user scan state, creating it atomically on first sight
    // of this user so concurrent scanners share one entry.
    UserLogDir userLogDir = userDirModificationTimeMap.get(userName);
    if (userLogDir == null) {
      UserLogDir fresh = new UserLogDir();
      UserLogDir existing =
          userDirModificationTimeMap.putIfAbsent(userName, fresh);
      userLogDir = (existing == null) ? fresh : existing;
    }
    userLogDir.scanIfNeeded(userDirStatus);
  }
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
/**
 * Searches the job history file FileStatus list for the specified JobId.
 *
 * @param fileStatusList
 *          fileStatus list of Job History Files.
 * @param jobId
 *          The JobId to find.
 * @return A FileInfo object for the jobId, null if not found.
 * @throws IOException
 */
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
    JobId jobId) throws IOException {
  for (FileStatus status : fileStatusList) {
    Path historyFile = status.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyFile.getName());
    if (!indexInfo.getJobId().equals(jobId)) {
      continue; // not the job we are looking for
    }
    // The conf and summary files sit next to the history file itself.
    Path parent = historyFile.getParent();
    Path confFile = new Path(parent,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parent,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    return new HistoryFileInfo(historyFile, confFile, summaryFile,
        indexInfo, true);
  }
  return null;
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
/**
 * Scans old directories known by the idToDateString map for the specified
 * jobId. If the number of directories is higher than the supported size of
 * the idToDateString cache, the jobId will not be found.
 *
 * @param jobId
 *          the jobId.
 * @return the file info for the job, or null when it is not found.
 * @throws IOException
 */
private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
  String serialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
      jobId, serialNumberFormat);
  Set<String> timestamps = serialNumberIndex.get(serialNumber);
  if (timestamps == null) {
    return null;
  }
  // Each timestamp names one candidate done directory; return on first hit.
  for (String timestamp : timestamps) {
    Path logDir = canonicalHistoryLogPath(jobId, timestamp);
    HistoryFileInfo info = getJobFileInfo(
        scanDirectoryForHistoryFiles(logDir, doneDirFc), jobId);
    if (info != null) {
      return info;
    }
  }
  return null;
}
项目:aliyun-oss-hadoop-fs    文件:HistoryFileManager.java   
private void makeDoneSubdir(Path path) throws IOException {
  // Ensures the given done-dir subdirectory exists, remembering it in
  // existingDoneSubdirs so later calls can skip the filesystem round trip.
  try {
    doneDirFc.getFileStatus(path);
    existingDoneSubdirs.add(path);
  } catch (FileNotFoundException fnfE) {
    // Directory does not exist yet: create it with the history done-dir
    // permissions and verify the permissions actually took effect.
    try {
      FsPermission fsp = new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
      doneDirFc.mkdir(path, fsp, true);
      // The created directory's permissions may differ from the requested
      // ones (presumably due to umask — TODO confirm), so re-check and
      // correct explicitly when they do not match.
      FileStatus fsStatus = doneDirFc.getFileStatus(path);
      LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
          + ", Expected: " + fsp.toShort());
      if (fsStatus.getPermission().toShort() != fsp.toShort()) {
        LOG.info("Explicitly setting permissions to : " + fsp.toShort()
            + ", " + fsp);
        doneDirFc.setPermission(path, fsp);
      }
      existingDoneSubdirs.add(path);
    } catch (FileAlreadyExistsException faeE) { // Nothing to do.
      // Another process created the directory between our check and mkdir.
    }
  }
}
项目:big-c    文件:JobHistoryEventHandler.java   
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
  // Copies a job history file from the staging filesystem to the done
  // filesystem and stamps it with the intermediate-file permissions.
  // check if path exists, in case of retries it may not exist
  if (stagingDirFS.exists(fromPath)) {
    LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
    // TODO temporarily removing the existing dst
    if (doneDirFS.exists(toPath)) {
      doneDirFS.delete(toPath, true);
    }
    boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
        false, getConfig());

    if (copied) {
      LOG.info("Copied to done location: " + toPath);
    } else {
      // A failed copy is an error condition, not routine information;
      // the subsequent setPermission will surface the missing file.
      LOG.error("copy failed");
    }
    doneDirFS.setPermission(toPath, new FsPermission(
        JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
  }
}
项目:big-c    文件:JobHistoryCopyService.java   
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  // Locates and opens the history file written by the previous application
  // attempt (current attempt id minus one) under the configured staging
  // directory.
  String jobIdString =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String stagingDirPrefix =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobIdString);
  Path stagingDirPath =
      FileContext.getFileContext(conf).makeQualified(
          new Path(stagingDirPrefix));
  FileContext fc = FileContext.getFileContext(stagingDirPath.toUri(), conf);
  // read the previous history file
  Path historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
          stagingDirPath, jobIdString,
          applicationAttemptId.getAttemptId() - 1));
  LOG.info("History file is at " + historyFile);
  return fc.open(historyFile);
}
项目:big-c    文件:TestJobHistoryEventHandler.java   
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test an unqualified (scheme-less) intermediate done dir: the user name
  // is appended directly to the configured path.
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Always release the file handle, even if writeXml throws.
    os.close();
  }
  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
          "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
项目:big-c    文件:HistoryFileManager.java   
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
  // Drops the serial-number index entry corresponding to this done
  // directory. Both the serial and timestamp components must be derivable
  // from the path; otherwise the index is left untouched.
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.remove(serial, timestamp);
}
项目:big-c    文件:HistoryFileManager.java   
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
  // Registers a done-directory path in the serial-number index, keyed by
  // the serial component (directory name) and the timestamp component.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + serialDirPath + " to serial index");
  }
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: " + serialDirPath
        + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.add(serial, timestamp);
}
项目:big-c    文件:HistoryFileManager.java   
private void addDirectoryToJobListCache(Path path) throws IOException {
  // Scans the given directory for history files and registers each one in
  // the job list cache together with its matching conf and summary file
  // paths (which live alongside the history file).
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + path + " to job list cache.");
  }
  for (FileStatus status : scanDirectoryForHistoryFiles(path, doneDirFc)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding in history for " + status.getPath());
    }
    Path historyFile = status.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyFile.getName());
    Path parent = historyFile.getParent();
    Path confFile = new Path(parent,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parent,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    jobListCache.addIfAbsent(new HistoryFileInfo(historyFile, confFile,
        summaryFile, indexInfo, true));
  }
}
项目:big-c    文件:HistoryFileManager.java   
/**
 * Scans the intermediate directory to find user directories. Scans these for
 * history files if the modification time for the directory has changed. Once
 * it finds history files it starts the process of moving them to the done
 * directory.
 *
 * @throws IOException
 *           if there was an error while scanning
 */
void scanIntermediateDirectory() throws IOException {
  // TODO it would be great to limit how often this happens, except in the
  // case where we are looking for a particular job.
  List<FileStatus> userDirs = JobHistoryUtils.localGlobber(
      intermediateDoneDirFc, intermediateDoneDirPath, "");
  LOG.debug("Scanning intermediate dirs");
  for (FileStatus userDirStatus : userDirs) {
    String userName = userDirStatus.getPath().getName();
    // Fetch the per-user scan state, creating it atomically on first sight
    // of this user so concurrent scanners share one entry.
    UserLogDir userLogDir = userDirModificationTimeMap.get(userName);
    if (userLogDir == null) {
      UserLogDir fresh = new UserLogDir();
      UserLogDir existing =
          userDirModificationTimeMap.putIfAbsent(userName, fresh);
      userLogDir = (existing == null) ? fresh : existing;
    }
    userLogDir.scanIfNeeded(userDirStatus);
  }
}
项目:big-c    文件:HistoryFileManager.java   
/**
 * Searches the job history file FileStatus list for the specified JobId.
 *
 * @param fileStatusList
 *          fileStatus list of Job History Files.
 * @param jobId
 *          The JobId to find.
 * @return A FileInfo object for the jobId, null if not found.
 * @throws IOException
 */
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
    JobId jobId) throws IOException {
  for (FileStatus status : fileStatusList) {
    Path historyFile = status.getPath();
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(historyFile.getName());
    if (!indexInfo.getJobId().equals(jobId)) {
      continue; // not the job we are looking for
    }
    // The conf and summary files sit next to the history file itself.
    Path parent = historyFile.getParent();
    Path confFile = new Path(parent,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parent,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    return new HistoryFileInfo(historyFile, confFile, summaryFile,
        indexInfo, true);
  }
  return null;
}
项目:big-c    文件:HistoryFileManager.java   
/**
 * Scans old directories known by the idToDateString map for the specified
 * jobId. If the number of directories is higher than the supported size of
 * the idToDateString cache, the jobId will not be found.
 *
 * @param jobId
 *          the jobId.
 * @return the file info for the job, or null when it is not found.
 * @throws IOException
 */
private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
  String serialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
      jobId, serialNumberFormat);
  Set<String> timestamps = serialNumberIndex.get(serialNumber);
  if (timestamps == null) {
    return null;
  }
  // Each timestamp names one candidate done directory; return on first hit.
  for (String timestamp : timestamps) {
    Path logDir = canonicalHistoryLogPath(jobId, timestamp);
    HistoryFileInfo info = getJobFileInfo(
        scanDirectoryForHistoryFiles(logDir, doneDirFc), jobId);
    if (info != null) {
      return info;
    }
  }
  return null;
}
项目:big-c    文件:HistoryFileManager.java   
private void makeDoneSubdir(Path path) throws IOException {
  // Ensures the given done-dir subdirectory exists, remembering it in
  // existingDoneSubdirs so later calls can skip the filesystem round trip.
  try {
    doneDirFc.getFileStatus(path);
    existingDoneSubdirs.add(path);
  } catch (FileNotFoundException fnfE) {
    // Directory does not exist yet: create it with the history done-dir
    // permissions and verify the permissions actually took effect.
    try {
      FsPermission fsp = new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
      doneDirFc.mkdir(path, fsp, true);
      // The created directory's permissions may differ from the requested
      // ones (presumably due to umask — TODO confirm), so re-check and
      // correct explicitly when they do not match.
      FileStatus fsStatus = doneDirFc.getFileStatus(path);
      LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
          + ", Expected: " + fsp.toShort());
      if (fsStatus.getPermission().toShort() != fsp.toShort()) {
        LOG.info("Explicitly setting permissions to : " + fsp.toShort()
            + ", " + fsp);
        doneDirFc.setPermission(path, fsp);
      }
      existingDoneSubdirs.add(path);
    } catch (FileAlreadyExistsException faeE) { // Nothing to do.
      // Another process created the directory between our check and mkdir.
    }
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:JobHistoryEventHandler.java   
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
  // Copies a job history file from the staging filesystem to the done
  // filesystem and stamps it with the intermediate-file permissions.
  // check if path exists, in case of retries it may not exist
  if (stagingDirFS.exists(fromPath)) {
    LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
    // TODO temporarily removing the existing dst
    if (doneDirFS.exists(toPath)) {
      doneDirFS.delete(toPath, true);
    }
    boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
        false, getConfig());

    if (copied) {
      LOG.info("Copied to done location: " + toPath);
    } else {
      // A failed copy is an error condition, not routine information;
      // the subsequent setPermission will surface the missing file.
      LOG.error("copy failed");
    }
    doneDirFS.setPermission(toPath, new FsPermission(
        JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:JobHistoryCopyService.java   
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  // Locates and opens the history file written by the previous application
  // attempt (current attempt id minus one) under the configured staging
  // directory.
  String jobIdString =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String stagingDirPrefix =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobIdString);
  Path stagingDirPath =
      FileContext.getFileContext(conf).makeQualified(
          new Path(stagingDirPrefix));
  FileContext fc = FileContext.getFileContext(stagingDirPath.toUri(), conf);
  // read the previous history file
  Path historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
          stagingDirPath, jobIdString,
          applicationAttemptId.getAttemptId() - 1));
  LOG.info("History file is at " + historyFile);
  return fc.open(historyFile);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestJobHistoryEventHandler.java   
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test an unqualified (scheme-less) intermediate done dir: the user name
  // is appended directly to the configured path.
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Always release the file handle, even if writeXml throws.
    os.close();
  }
  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
          "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:HistoryFileManager.java   
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
  // Drops the serial-number index entry corresponding to this done
  // directory. Both the serial and timestamp components must be derivable
  // from the path; otherwise the index is left untouched.
  final String serial = serialDirPath.getName();
  final String timestamp =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestamp == null) {
    LOG.warn("Could not find timestamp portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  if (serial == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.remove(serial, timestamp);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:HistoryFileManager.java   
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + serialDirPath + " to serial index");
  }
  // Guard clauses instead of nested if/else: bail out when either path
  // component cannot be extracted, otherwise register the pair.
  final String serialComponent = serialDirPath.getName();
  final String timestampComponent =
      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
  if (timestampComponent == null) {
    LOG.warn("Could not find timestamp portion from path: " + serialDirPath
        + ". Continuing with next");
    return;
  }
  if (serialComponent == null) {
    LOG.warn("Could not find serial portion from path: "
        + serialDirPath.toString() + ". Continuing with next");
    return;
  }
  serialNumberIndex.add(serialComponent, timestampComponent);
}
Project: hadoop-2.6.0-cdh5.4.3    File: HistoryFileManager.java   
private void addDirectoryToJobListCache(Path path) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding " + path + " to job list cache.");
  }
  // Each history file found in the directory gets a cache entry that also
  // records the sibling conf and summary file locations.
  for (FileStatus status : scanDirectoryForHistoryFiles(path, doneDirFc)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding in history for " + status.getPath());
    }
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(status.getPath().getName());
    Path parentDir = status.getPath().getParent();
    Path confFile = new Path(parentDir,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parentDir,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    jobListCache.addIfAbsent(new HistoryFileInfo(status.getPath(), confFile,
        summaryFile, indexInfo, true));
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: HistoryFileManager.java   
/**
 * Lists the per-user subdirectories of the intermediate done directory and
 * rescans any whose modification time has changed; a rescan starts the move
 * of completed history files to the done directory.
 *
 * @throws IOException
 *           if there was a error while scanning
 */
void scanIntermediateDirectory() throws IOException {
  // TODO it would be great to limit how often this happens, except in the
  // case where we are looking for a particular job.
  List<FileStatus> userDirs = JobHistoryUtils.localGlobber(
      intermediateDoneDirFc, intermediateDoneDirPath, "");
  LOG.debug("Scanning intermediate dirs");
  for (FileStatus userDir : userDirs) {
    String userName = userDir.getPath().getName();
    UserLogDir logDir = userDirModificationTimeMap.get(userName);
    if (logDir == null) {
      // Racy first sight of this user: putIfAbsent decides the winner so
      // all threads end up scanning the same UserLogDir instance.
      UserLogDir fresh = new UserLogDir();
      UserLogDir existing =
          userDirModificationTimeMap.putIfAbsent(userName, fresh);
      logDir = (existing != null) ? existing : fresh;
    }
    logDir.scanIfNeeded(userDir);
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: HistoryFileManager.java   
/**
 * Searches a list of job history file statuses for the entry belonging to
 * the given job and builds its FileInfo.
 *
 * @param fileStatusList
 *          fileStatus list of Job History Files.
 * @param jobId
 *          The JobId to find.
 * @return A FileInfo object for the jobId, null if not found.
 * @throws IOException
 */
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
    JobId jobId) throws IOException {
  for (FileStatus status : fileStatusList) {
    JobIndexInfo indexInfo =
        FileNameIndexUtils.getIndexInfo(status.getPath().getName());
    if (!indexInfo.getJobId().equals(jobId)) {
      continue;  // not the job we are after
    }
    // Conf and summary files live beside the history file in the same dir.
    Path parentDir = status.getPath().getParent();
    Path confFile = new Path(parentDir,
        JobHistoryUtils.getIntermediateConfFileName(indexInfo.getJobId()));
    Path summaryFile = new Path(parentDir,
        JobHistoryUtils.getIntermediateSummaryFileName(indexInfo.getJobId()));
    return new HistoryFileInfo(status.getPath(), confFile, summaryFile,
        indexInfo, true);
  }
  return null;
}
Project: hadoop-2.6.0-cdh5.4.3    File: HistoryFileManager.java   
/**
 * Scans old directories known by the idToDateString map for the specified
 * jobId. If the number of directories is higher than the supported size of
 * the idToDateString cache, the jobId will not be found.
 *
 * @param jobId
 *          the jobId.
 * @return the HistoryFileInfo for the job, or null if it was not found.
 * @throws IOException
 */
private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
  String serialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
      jobId, serialNumberFormat);
  Set<String> timestampParts = serialNumberIndex.get(serialNumber);
  if (timestampParts == null) {
    // Index has no (or evicted) entry for this serial number.
    return null;
  }
  // Each timestamp maps to one candidate done-directory; stop at first hit.
  for (String timestampPart : timestampParts) {
    Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
    HistoryFileInfo match = getJobFileInfo(
        scanDirectoryForHistoryFiles(logDir, doneDirFc), jobId);
    if (match != null) {
      return match;
    }
  }
  return null;
}
Project: hadoop-2.6.0-cdh5.4.3    File: HistoryFileManager.java   
private void makeDoneSubdir(Path path) throws IOException {
  // Ensure the done-directory subdir exists with the expected history
  // permissions, remembering it in existingDoneSubdirs once confirmed.
  try {
    doneDirFc.getFileStatus(path);
    existingDoneSubdirs.add(path);
  } catch (FileNotFoundException fnfE) {
    try {
      FsPermission expected = new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
      doneDirFc.mkdir(path, expected, true);
      FileStatus created = doneDirFc.getFileStatus(path);
      LOG.info("Perms after creating " + created.getPermission().toShort()
          + ", Expected: " + expected.toShort());
      // The umask may have stripped bits during mkdir; force them back.
      if (created.getPermission().toShort() != expected.toShort()) {
        LOG.info("Explicitly setting permissions to : " + expected.toShort()
            + ", " + expected);
        doneDirFc.setPermission(path, expected);
      }
      existingDoneSubdirs.add(path);
    } catch (FileAlreadyExistsException ignored) {
      // Someone else created it between our getFileStatus and mkdir;
      // intentionally nothing to do.
    }
  }
}