Java 类org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo 实例源码

项目:ditb    文件:ExportSnapshot.java   
/**
 * Creates the input splits for the export job: one split per balanced group of snapshot files.
 * <p>
 * If {@code CONF_NUM_SPLITS} is unset (or 0) and there is at least one file to copy, the
 * number of mappers is computed as {@code 1 + files / CONF_MAP_GROUP} (the group size defaults
 * to 10), capped at the number of files, and stored back into the job configuration under
 * {@code CONF_NUM_SPLITS} and {@code MR_NUM_MAPS}.
 *
 * @param context the job context whose configuration names the snapshot directory
 * @return one {@link InputSplit} for each group produced by {@code getBalancedSplits}
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    // No explicit mapper count: derive one from the number of files, never more than one
    // mapper per file.
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  // Parameterized (non-raw) list so the element type is checked at compile time.
  List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
项目:pbase    文件:ExportSnapshot.java   
/**
 * Computes the destination, inside the output archive, for a file referenced by the snapshot.
 * <p>
 * For an HFILE entry the destination is built from the table, region and hfile names decoded
 * from the link name, plus the family taken from the parent directory name. For a WAL entry
 * it is the WAL name under the {@code HConstants.HREGION_OLDLOGDIR_NAME} directory of
 * {@code outputRoot}. In both cases the result is resolved against {@code outputArchive}.
 *
 * @param inputInfo descriptor of the file to copy
 * @return the path the file will be copied to
 * @throws IOException if the descriptor has a type other than HFILE or WAL
 */
private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
  switch (inputInfo.getType()) {
    case HFILE: {
      final Path linkPath = new Path(inputInfo.getHfile());
      final String familyName = linkPath.getParent().getName();
      final String linkName = linkPath.getName();
      final TableName tableName = HFileLink.getReferencedTableName(linkName);
      final String regionName = HFileLink.getReferencedRegionName(linkName);
      final String hfileName = HFileLink.getReferencedHFileName(linkName);
      final Path tableDir = FSUtils.getTableDir(new Path("./"), tableName);
      final Path regionRelative = new Path(regionName, new Path(familyName, hfileName));
      return new Path(outputArchive, new Path(tableDir, regionRelative));
    }
    case WAL: {
      final Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
      return new Path(outputArchive, new Path(oldLogsDir, inputInfo.getWalName()));
    }
    default:
      throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
  }
}
项目:pbase    文件:ExportSnapshot.java   
/**
 * Test hook: simulates a copy failure when {@code testFailures} is enabled.
 * <p>
 * With {@code CONF_TEST_RETRY} set, a failure is raised only for a small random fraction of
 * the calls. Otherwise the {@code COPY_FAILED} counter is incremented and the call always fails.
 *
 * @param context   mapper context, used for the configuration and the counters
 * @param inputInfo descriptor of the file being copied, reported in the error message
 * @throws IOException the injected failure
 */
private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
    throws IOException {
  if (!testFailures) {
    return;
  }

  final boolean retryMode = context.getConfiguration().getBoolean(CONF_TEST_RETRY, false);
  if (!retryMode) {
    context.getCounter(Counter.COPY_FAILED).increment(1);
    throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
  }

  if (random == null) {
    random = new Random();
  }

  // FLAKY-TEST-WARN: keep the probability low. Some runs will see no retry at all,
  // but it limits the test failures caused by this exception being thrown
  // repeatedly from the same map task.
  final boolean failThisAttempt = random.nextFloat() < 0.03;
  if (failThisAttempt) {
    throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
                          + " time=" + System.currentTimeMillis());
  }
}
项目:pbase    文件:ExportSnapshot.java   
/**
 * Creates the input splits for the export job: one split per balanced group of snapshot files.
 * <p>
 * If {@code CONF_NUM_SPLITS} is unset (or 0) and there is at least one file to copy, the
 * number of mappers is computed as {@code 1 + files / CONF_MAP_GROUP} (the group size defaults
 * to 10), capped at the number of files, and stored back into the job configuration under
 * {@code CONF_NUM_SPLITS} and {@code MR_NUM_MAPS}.
 *
 * @param context the job context whose configuration names the snapshot directory
 * @return one {@link InputSplit} for each group produced by {@code getBalancedSplits}
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    // No explicit mapper count: derive one from the number of files, never more than one
    // mapper per file.
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  // Parameterized (non-raw) list so the element type is checked at compile time.
  List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
项目:PyroDB    文件:ExportSnapshot.java   
/**
 * Computes the destination, inside the output archive, for a file referenced by the snapshot.
 * <p>
 * For an HFILE entry the destination is built from the table, region and hfile names decoded
 * from the link name, plus the family taken from the parent directory name. For a WAL entry
 * it is the WAL name under the {@code HConstants.HREGION_OLDLOGDIR_NAME} directory of
 * {@code outputRoot}. In both cases the result is resolved against {@code outputArchive}.
 *
 * @param inputInfo descriptor of the file to copy
 * @return the path the file will be copied to
 * @throws IOException if the descriptor has a type other than HFILE or WAL
 */
private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
  switch (inputInfo.getType()) {
    case HFILE: {
      final Path linkPath = new Path(inputInfo.getHfile());
      final String familyName = linkPath.getParent().getName();
      final String linkName = linkPath.getName();
      final TableName tableName = HFileLink.getReferencedTableName(linkName);
      final String regionName = HFileLink.getReferencedRegionName(linkName);
      final String hfileName = HFileLink.getReferencedHFileName(linkName);
      final Path tableDir = FSUtils.getTableDir(new Path("./"), tableName);
      final Path regionRelative = new Path(regionName, new Path(familyName, hfileName));
      return new Path(outputArchive, new Path(tableDir, regionRelative));
    }
    case WAL: {
      final Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
      return new Path(outputArchive, new Path(oldLogsDir, inputInfo.getWalName()));
    }
    default:
      throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
  }
}
项目:PyroDB    文件:ExportSnapshot.java   
/**
 * Test hook: simulates a copy failure when {@code testFailures} is enabled.
 * <p>
 * With {@code CONF_TEST_RETRY} set, a failure is raised only for a small random fraction of
 * the calls. Otherwise the {@code COPY_FAILED} counter is incremented and the call always fails.
 *
 * @param context   mapper context, used for the configuration and the counters
 * @param inputInfo descriptor of the file being copied, reported in the error message
 * @throws IOException the injected failure
 */
private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
    throws IOException {
  if (!testFailures) {
    return;
  }

  final boolean retryMode = context.getConfiguration().getBoolean(CONF_TEST_RETRY, false);
  if (!retryMode) {
    context.getCounter(Counter.COPY_FAILED).increment(1);
    throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
  }

  if (random == null) {
    random = new Random();
  }

  // FLAKY-TEST-WARN: keep the probability low. Some runs will see no retry at all,
  // but it limits the test failures caused by this exception being thrown
  // repeatedly from the same map task.
  final boolean failThisAttempt = random.nextFloat() < 0.03;
  if (failThisAttempt) {
    throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
                          + " time=" + System.currentTimeMillis());
  }
}
项目:ditb    文件:ExportSnapshot.java   
/**
 * Copies one snapshot file: the key carries a serialized {@link SnapshotFileInfo}, which is
 * decoded, mapped to its output location and handed to {@code copyFile}.
 */
@Override
public void map(BytesWritable key, NullWritable value, Context context)
    throws InterruptedException, IOException {
  final byte[] serializedInfo = key.copyBytes();
  final SnapshotFileInfo fileToCopy = SnapshotFileInfo.parseFrom(serializedInfo);
  copyFile(context, fileToCopy, getOutputPath(fileToCopy));
}
项目:ditb    文件:ExportSnapshot.java   
/**
 * Builds a split from the given snapshot files.
 * <p>
 * Each {@link SnapshotFileInfo} is kept in serialized form (wrapped in a
 * {@link BytesWritable}) together with its associated size, and the sizes are summed
 * into {@code length}.
 *
 * @param snapshotFiles pairs of file descriptor and file size assigned to this split
 */
public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
  // Parameterized (non-raw) list so the element type is checked at compile time.
  this.files = new ArrayList<Pair<BytesWritable, Long>>(snapshotFiles.size());
  for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
    this.files.add(new Pair<BytesWritable, Long>(
      new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
    this.length += fileInfo.getSecond();
  }
}
项目:ditb    文件:TestExportSnapshot.java   
/**
 * Verifies the result of the getBalancedSplits() method.
 * The result is a list of file groups, used as input lists for the "export" mappers.
 * All the groups should hold a similar amount of data.
 *
 * The input list holds pairs of file descriptor and length.
 * getBalancedSplits() sorts it by length and then hands one file at a time to each group,
 * largest first, going back and forth through the groups.
 */
@Test
public void testBalanceSplit() throws Exception {
  // Build "file-0" .. "file-20"; the length of each file equals its index.
  List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
  for (long size = 0; size <= 20; size++) {
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
      .setType(SnapshotFileInfo.Type.HFILE)
      .setHfile("file-" + size)
      .build();
    files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
  }

  // Expected content of the 5 groups (total size 210, 42 per group):
  //    group 0: 20, 11, 10,  1,  0
  //    group 1: 19, 12,  9,  2
  //    group 2: 18, 13,  8,  3
  //    group 3: 17, 14,  7,  4
  //    group 4: 16, 15,  6,  5
  final String[][] expectedGroups = new String[][] {
    {"file-20", "file-11", "file-10", "file-1", "file-0"},
    {"file-19", "file-12", "file-9",  "file-2"},
    {"file-18", "file-13", "file-8",  "file-3"},
    {"file-17", "file-14", "file-7",  "file-4"},
    {"file-16", "file-15", "file-6",  "file-5"},
  };

  List<List<Pair<SnapshotFileInfo, Long>>> splits = ExportSnapshot.getBalancedSplits(files, 5);
  assertEquals(5, splits.size());

  for (int group = 0; group < expectedGroups.length; group++) {
    verifyBalanceSplit(splits.get(group), expectedGroups[group], 42);
  }
}
项目:ditb    文件:TestExportSnapshot.java   
/**
 * Asserts that a group holds exactly the expected hfile names, in order, and that the
 * lengths of its files add up to the expected total.
 */
private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split,
    final String[] expected, final long expectedSize) {
  assertEquals(expected.length, split.size());
  long accumulatedSize = 0;
  int position = 0;
  // The sizes were just asserted equal, so walking the list covers every expected entry.
  for (Pair<SnapshotFileInfo, Long> entry : split) {
    assertEquals(expected[position], entry.getFirst().getHfile());
    accumulatedSize += entry.getSecond();
    position++;
  }
  assertEquals(expectedSize, accumulatedSize);
}
项目:pbase    文件:ExportSnapshot.java   
/**
 * Copies one snapshot file: the key carries a serialized {@link SnapshotFileInfo}, which is
 * decoded, mapped to its output location and handed to {@code copyFile}.
 */
@Override
public void map(BytesWritable key, NullWritable value, Context context)
    throws InterruptedException, IOException {
  final byte[] serializedInfo = key.copyBytes();
  final SnapshotFileInfo fileToCopy = SnapshotFileInfo.parseFrom(serializedInfo);
  copyFile(context, fileToCopy, getOutputPath(fileToCopy));
}
项目:pbase    文件:ExportSnapshot.java   
/**
 * Builds a split from the given snapshot files.
 * <p>
 * Each {@link SnapshotFileInfo} is kept in serialized form (wrapped in a
 * {@link BytesWritable}) together with its associated size, and the sizes are summed
 * into {@code length}.
 *
 * @param snapshotFiles pairs of file descriptor and file size assigned to this split
 */
public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
  // Parameterized (non-raw) list so the element type is checked at compile time.
  this.files = new ArrayList<Pair<BytesWritable, Long>>(snapshotFiles.size());
  for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
    this.files.add(new Pair<BytesWritable, Long>(
      new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
    this.length += fileInfo.getSecond();
  }
}
项目:pbase    文件:TestExportSnapshot.java   
/**
 * Verifies the result of the getBalancedSplits() method.
 * The result is a list of file groups, used as input lists for the "export" mappers.
 * All the groups should hold a similar amount of data.
 *
 * The input list holds pairs of file descriptor and length.
 * getBalancedSplits() sorts it by length and then hands one file at a time to each group,
 * largest first, going back and forth through the groups.
 */
@Test
public void testBalanceSplit() throws Exception {
  // Build "file-0" .. "file-20"; the length of each file equals its index.
  List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
  for (long size = 0; size <= 20; size++) {
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
      .setType(SnapshotFileInfo.Type.HFILE)
      .setHfile("file-" + size)
      .build();
    files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
  }

  // Expected content of the 5 groups (total size 210, 42 per group):
  //    group 0: 20, 11, 10,  1,  0
  //    group 1: 19, 12,  9,  2
  //    group 2: 18, 13,  8,  3
  //    group 3: 17, 14,  7,  4
  //    group 4: 16, 15,  6,  5
  final String[][] expectedGroups = new String[][] {
    {"file-20", "file-11", "file-10", "file-1", "file-0"},
    {"file-19", "file-12", "file-9",  "file-2"},
    {"file-18", "file-13", "file-8",  "file-3"},
    {"file-17", "file-14", "file-7",  "file-4"},
    {"file-16", "file-15", "file-6",  "file-5"},
  };

  List<List<Pair<SnapshotFileInfo, Long>>> splits = ExportSnapshot.getBalancedSplits(files, 5);
  assertEquals(5, splits.size());

  for (int group = 0; group < expectedGroups.length; group++) {
    verifyBalanceSplit(splits.get(group), expectedGroups[group], 42);
  }
}
项目:pbase    文件:TestExportSnapshot.java   
/**
 * Asserts that a group holds exactly the expected hfile names, in order, and that the
 * lengths of its files add up to the expected total.
 */
private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split,
    final String[] expected, final long expectedSize) {
  assertEquals(expected.length, split.size());
  long accumulatedSize = 0;
  int position = 0;
  // The sizes were just asserted equal, so walking the list covers every expected entry.
  for (Pair<SnapshotFileInfo, Long> entry : split) {
    assertEquals(expected[position], entry.getFirst().getHfile());
    accumulatedSize += entry.getSecond();
    position++;
  }
  assertEquals(expectedSize, accumulatedSize);
}
项目:PyroDB    文件:ExportSnapshot.java   
/**
 * Copies one snapshot file: the key carries a serialized {@link SnapshotFileInfo}, which is
 * decoded, mapped to its output location and handed to {@code copyFile}.
 */
@Override
public void map(BytesWritable key, NullWritable value, Context context)
    throws InterruptedException, IOException {
  final byte[] serializedInfo = key.copyBytes();
  final SnapshotFileInfo fileToCopy = SnapshotFileInfo.parseFrom(serializedInfo);
  copyFile(context, fileToCopy, getOutputPath(fileToCopy));
}
项目:PyroDB    文件:ExportSnapshot.java   
/**
 * Writes the sequence files used as input of the MR job.
 * <p>
 * The snapshot files are first partitioned with {@code getBalancedSplits}; one sequence file
 * named {@code export-<index>.seq} is then written per group into the input folder. Every
 * record of a sequence file has the serialized {@link SnapshotFileInfo} as key and a
 * {@link NullWritable} as value.
 *
 * @param conf            configuration used to resolve the file system and create the writers
 * @param inputFolderPath folder in which the sequence files are created
 * @param snapshotFiles   files to copy, each paired with its size
 * @param mappers         number of groups requested from {@code getBalancedSplits}
 * @return the paths of the sequence files written, one per group
 */
private static Path[] createInputFiles(final Configuration conf, final Path inputFolderPath,
    final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, int mappers)
    throws IOException, InterruptedException {
  final FileSystem inputFs = inputFolderPath.getFileSystem(conf);
  LOG.debug("Input folder location: " + inputFolderPath);

  final List<List<SnapshotFileInfo>> balancedGroups = getBalancedSplits(snapshotFiles, mappers);
  final Path[] sequenceFiles = new Path[balancedGroups.size()];

  // A single key instance is refilled for every record that gets appended.
  final BytesWritable recordKey = new BytesWritable();
  int groupIndex = 0;
  for (List<SnapshotFileInfo> groupFiles : balancedGroups) {
    final Path sequenceFile = new Path(inputFolderPath, String.format("export-%d.seq", groupIndex));
    sequenceFiles[groupIndex] = sequenceFile;
    final SequenceFile.Writer writer = SequenceFile.createWriter(inputFs, conf, sequenceFile,
      BytesWritable.class, NullWritable.class);
    LOG.debug("Input split: " + groupIndex);
    try {
      for (SnapshotFileInfo fileInfo : groupFiles) {
        final byte[] serialized = fileInfo.toByteArray();
        recordKey.set(serialized, 0, serialized.length);
        writer.append(recordKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
    groupIndex++;
  }

  return sequenceFiles;
}
项目:PyroDB    文件:TestExportSnapshot.java   
/**
 * Verifies the result of the getBalancedSplits() method.
 * The result is a list of file groups, used as input lists for the "export" mappers.
 * All the groups should hold a similar amount of data.
 *
 * The input list holds pairs of file descriptor and length.
 * getBalancedSplits() sorts it by length and then hands one file at a time to each group,
 * largest first, going back and forth through the groups.
 */
@Test
public void testBalanceSplit() throws Exception {
  // Build "file-0" .. "file-20"; the length of each file equals its index.
  List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
  for (long size = 0; size <= 20; size++) {
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
      .setType(SnapshotFileInfo.Type.HFILE)
      .setHfile("file-" + size)
      .build();
    files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
  }

  // Expected content of the 5 groups (total size 210, 42 per group):
  //    group 0: 20, 11, 10,  1,  0
  //    group 1: 19, 12,  9,  2
  //    group 2: 18, 13,  8,  3
  //    group 3: 17, 14,  7,  4
  //    group 4: 16, 15,  6,  5
  final String[][] expectedGroups = new String[][] {
    {"file-20", "file-11", "file-10", "file-1", "file-0"},
    {"file-19", "file-12", "file-9",  "file-2"},
    {"file-18", "file-13", "file-8",  "file-3"},
    {"file-17", "file-14", "file-7",  "file-4"},
    {"file-16", "file-15", "file-6",  "file-5"},
  };

  List<List<SnapshotFileInfo>> splits = ExportSnapshot.getBalancedSplits(files, 5);
  assertEquals(5, splits.size());

  for (int group = 0; group < expectedGroups.length; group++) {
    verifyBalanceSplit(splits.get(group), expectedGroups[group]);
  }
}
项目:ditb    文件:ExportSnapshot.java   
/**
 * Given a list of files and their sizes, creates up to {@code ngroups} groups holding
 * similar amounts of bytes.
 * <p>
 * The algorithm is straightforward: the file list is sorted by size, then the files are
 * handed out from the biggest to the smallest, walking through the groups and reversing
 * direction each time the first or last group is reached.
 * <p>
 * Note that {@code files} is sorted in place. Groups are created on demand, so fewer than
 * {@code ngroups} groups are returned when there are fewer files than groups, and an empty
 * list is returned for an empty input. {@code ngroups} must be at least 1 when
 * {@code files} is not empty.
 *
 * @param files   files to distribute, each paired with its size; reordered by this call
 * @param ngroups maximum number of groups to create
 * @return the groups, each listing its files in the order they were assigned
 */
static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
    final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
  // Sort files by size, from small to big. Long.compareTo is used rather than a
  // subtraction, which could overflow and report the wrong order.
  Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
    @Override
    public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
      return a.getSecond().compareTo(b.getSecond());
    }
  });

  // Create balanced groups. Array-backed lists keep the get(g) lookups below, and the
  // indexed reads done by callers, constant-time.
  List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
    new ArrayList<List<Pair<SnapshotFileInfo, Long>>>();
  long[] sizeGroups = new long[ngroups];

  List<Pair<SnapshotFileInfo, Long>> group;
  int dir = 1;
  int g = 0;

  // Walk the sorted list from the biggest file down to the smallest.
  for (int hi = files.size() - 1; hi >= 0; hi--) {
    if (g == fileGroups.size()) {
      group = new ArrayList<Pair<SnapshotFileInfo, Long>>();
      fileGroups.add(group);
    } else {
      group = fileGroups.get(g);
    }

    Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi);

    // add the biggest remaining file to the current group
    sizeGroups[g] += fileInfo.getSecond();
    group.add(fileInfo);

    // change direction when at the end or the beginning; the boundary group
    // receives two files in a row
    g += dir;
    if (g == ngroups) {
      dir = -1;
      g = ngroups - 1;
    } else if (g < 0) {
      dir = 1;
      g = 0;
    }
  }

  if (LOG.isDebugEnabled()) {
    for (int i = 0; i < sizeGroups.length; ++i) {
      LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
    }
  }

  return fileGroups;
}
项目:pbase    文件:ExportSnapshot.java   
/**
 * Given a list of files and their sizes, creates up to {@code ngroups} groups holding
 * similar amounts of bytes.
 * <p>
 * The algorithm is straightforward: the file list is sorted by size, then the files are
 * handed out from the biggest to the smallest, walking through the groups and reversing
 * direction each time the first or last group is reached.
 * <p>
 * Note that {@code files} is sorted in place. Groups are created on demand, so fewer than
 * {@code ngroups} groups are returned when there are fewer files than groups, and an empty
 * list is returned for an empty input. {@code ngroups} must be at least 1 when
 * {@code files} is not empty.
 *
 * @param files   files to distribute, each paired with its size; reordered by this call
 * @param ngroups maximum number of groups to create
 * @return the groups, each listing its files in the order they were assigned
 */
static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
    final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
  // Sort files by size, from small to big. Long.compareTo is used rather than a
  // subtraction, which could overflow and report the wrong order.
  Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
    @Override
    public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
      return a.getSecond().compareTo(b.getSecond());
    }
  });

  // Create balanced groups. Array-backed lists keep the get(g) lookups below, and the
  // indexed reads done by callers, constant-time.
  List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
    new ArrayList<List<Pair<SnapshotFileInfo, Long>>>();
  long[] sizeGroups = new long[ngroups];

  List<Pair<SnapshotFileInfo, Long>> group;
  int dir = 1;
  int g = 0;

  // Walk the sorted list from the biggest file down to the smallest.
  for (int hi = files.size() - 1; hi >= 0; hi--) {
    if (g == fileGroups.size()) {
      group = new ArrayList<Pair<SnapshotFileInfo, Long>>();
      fileGroups.add(group);
    } else {
      group = fileGroups.get(g);
    }

    Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi);

    // add the biggest remaining file to the current group
    sizeGroups[g] += fileInfo.getSecond();
    group.add(fileInfo);

    // change direction when at the end or the beginning; the boundary group
    // receives two files in a row
    g += dir;
    if (g == ngroups) {
      dir = -1;
      g = ngroups - 1;
    } else if (g < 0) {
      dir = 1;
      g = 0;
    }
  }

  if (LOG.isDebugEnabled()) {
    for (int i = 0; i < sizeGroups.length; ++i) {
      LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
    }
  }

  return fileGroups;
}
项目:PyroDB    文件:ExportSnapshot.java   
/**
 * Given a list of files and their sizes, creates up to {@code ngroups} groups holding
 * similar amounts of bytes.
 * <p>
 * The algorithm is straightforward: the file list is sorted by size, then the files are
 * handed out from the biggest to the smallest, walking through the groups and reversing
 * direction each time the first or last group is reached.
 * <p>
 * Note that {@code files} is sorted in place. Groups are created on demand, so fewer than
 * {@code ngroups} groups are returned when there are fewer files than groups, and an empty
 * list is returned for an empty input. {@code ngroups} must be at least 1 when
 * {@code files} is not empty.
 *
 * @param files   files to distribute, each paired with its size; reordered by this call
 * @param ngroups maximum number of groups to create
 * @return the groups, each listing its file descriptors in the order they were assigned
 */
static List<List<SnapshotFileInfo>> getBalancedSplits(
    final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
  // Sort files by size, from small to big. Long.compareTo is used rather than a
  // subtraction, which could overflow and report the wrong order.
  Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
    @Override
    public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
      return a.getSecond().compareTo(b.getSecond());
    }
  });

  // create balanced groups
  List<List<SnapshotFileInfo>> fileGroups = new LinkedList<List<SnapshotFileInfo>>();
  long[] sizeGroups = new long[ngroups];

  List<SnapshotFileInfo> group;
  int dir = 1;
  int g = 0;

  // Walk the sorted list from the biggest file down to the smallest.
  for (int hi = files.size() - 1; hi >= 0; hi--) {
    if (g == fileGroups.size()) {
      group = new LinkedList<SnapshotFileInfo>();
      fileGroups.add(group);
    } else {
      group = fileGroups.get(g);
    }

    Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi);

    // add the biggest remaining file to the current group
    sizeGroups[g] += fileInfo.getSecond();
    group.add(fileInfo.getFirst());

    // change direction when at the end or the beginning; the boundary group
    // receives two files in a row
    g += dir;
    if (g == ngroups) {
      dir = -1;
      g = ngroups - 1;
    } else if (g < 0) {
      dir = 1;
      g = 0;
    }
  }

  if (LOG.isDebugEnabled()) {
    for (int i = 0; i < sizeGroups.length; ++i) {
      LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
    }
  }

  return fileGroups;
}
项目:PyroDB    文件:ExportSnapshot.java   
/**
 * Runs the Map-Reduce job that copies the snapshot files.
 * <p>
 * The copy options are stored in the configuration returned by {@code getConf()}, a
 * map-only job using {@code ExportMapper} is set up, its input sequence files are written
 * with {@code createInputFiles}, and the job is run to completion. The MR input folder is
 * removed afterwards, whether the job succeeded or not, including when writing the input
 * files fails partway.
 *
 * @param inputRoot      root the files are copied from
 * @param outputRoot     root the files are copied to
 * @param snapshotFiles  files to copy, each paired with its size
 * @param verifyChecksum value stored under {@code CONF_CHECKSUM_VERIFY}
 * @param filesUser      value stored under {@code CONF_FILES_USER}; ignored when null
 * @param filesGroup     value stored under {@code CONF_FILES_GROUP}; ignored when null
 * @param filesMode      value stored under {@code CONF_FILES_MODE}
 * @param mappers        number of map tasks requested and of input files created
 * @param bandwidthMB    value stored under {@code CONF_BANDWIDTH_MB}
 * @throws ExportSnapshotException if the job completes unsuccessfully
 */
private void runCopyJob(final Path inputRoot, final Path outputRoot,
    final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, final boolean verifyChecksum,
    final String filesUser, final String filesGroup, final int filesMode,
    final int mappers, final int bandwidthMB)
        throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = getConf();
  if (filesGroup != null) {
    conf.set(CONF_FILES_GROUP, filesGroup);
  }
  if (filesUser != null) {
    conf.set(CONF_FILES_USER, filesUser);
  }
  conf.setInt(CONF_FILES_MODE, filesMode);
  conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
  conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
  conf.set(CONF_INPUT_ROOT, inputRoot.toString());
  conf.setInt("mapreduce.job.maps", mappers);
  conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);

  Job job = new Job(conf);
  job.setJobName("ExportSnapshot");
  job.setJarByClass(ExportSnapshot.class);
  TableMapReduceUtil.addDependencyJars(job);
  job.setMapperClass(ExportMapper.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapSpeculativeExecution(false);
  job.setNumReduceTasks(0);

  Path inputFolderPath = getInputFolderPath(conf);
  try {
    // Create MR Input. This happens inside the try block so that input files left behind
    // by a failed creation are removed by the cleanup below as well.
    for (Path path: createInputFiles(conf, inputFolderPath, snapshotFiles, mappers)) {
      LOG.debug("Add Input Path=" + path);
      SequenceFileInputFormat.addInputPath(job, path);
    }

    // Acquire the delegation Tokens
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { inputRoot, outputRoot }, conf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
      // when it will be available on all the supported versions.
      throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
    }
  } finally {
    // Remove MR Input; a failure here is only logged so that it does not mask the
    // outcome of the job.
    try {
      inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true);
    } catch (IOException e) {
      LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e);
    }
  }
}
项目:PyroDB    文件:TestExportSnapshot.java   
/**
 * Asserts that a group holds exactly the expected hfile names, in order.
 */
private void verifyBalanceSplit(final List<SnapshotFileInfo> split, final String[] expected) {
  assertEquals(expected.length, split.size());
  int position = 0;
  // The sizes were just asserted equal, so walking the list covers every expected entry.
  for (SnapshotFileInfo fileInfo : split) {
    assertEquals(expected[position], fileInfo.getHfile());
    position++;
  }
}