@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
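// A minimal, self-contained sketch (not part of the original class) of the default mapper-count
// arithmetic used in getSplits() above: roughly one mapper per CONF_MAP_GROUP (default 10)
// snapshot files, capped at the number of files. The file count below is a made-up example.
public class MapperCountExample {
  public static void main(String[] args) {
    int snapshotFileCount = 45;                        // hypothetical number of snapshot files
    int mapGroup = 10;                                 // default used for CONF_MAP_GROUP
    int mappers = 1 + (snapshotFileCount / mapGroup);  // 1 + 4 = 5
    mappers = Math.min(mappers, snapshotFileCount);    // still 5, never more than the file count
    System.out.println("mappers=" + mappers);          // prints mappers=5
  }
}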
/**
 * Returns the location where the inputPath will be copied.
 */
private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
  Path path = null;
  switch (inputInfo.getType()) {
    case HFILE:
      Path inputPath = new Path(inputInfo.getHfile());
      String family = inputPath.getParent().getName();
      TableName table = HFileLink.getReferencedTableName(inputPath.getName());
      String region = HFileLink.getReferencedRegionName(inputPath.getName());
      String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
      path = new Path(FSUtils.getTableDir(new Path("./"), table),
          new Path(region, new Path(family, hfile)));
      break;
    case WAL:
      Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
      path = new Path(oldLogsDir, inputInfo.getWalName());
      break;
    default:
      throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
  }
  return new Path(outputArchive, path);
}
private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
    throws IOException {
  if (testFailures) {
    if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
      if (random == null) {
        random = new Random();
      }
      // FLAKY-TEST-WARN: lower is better, we can get some runs without the
      // retry, but at least we reduce the number of test failures due to
      // this test exception from the same map task.
      if (random.nextFloat() < 0.03) {
        throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
            + " time=" + System.currentTimeMillis());
      }
    } else {
      context.getCounter(Counter.COPY_FAILED).increment(1);
      throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
    }
  }
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
@Override
public void map(BytesWritable key, NullWritable value, Context context)
    throws InterruptedException, IOException {
  SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
  Path outputPath = getOutputPath(inputInfo);
  copyFile(context, inputInfo, outputPath);
}
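// A small round-trip sketch (not part of the original code, JUnit assumed as in the tests
// below) of the key format map() consumes above: SnapshotFileInfo protobufs are carried as
// raw bytes in a BytesWritable key and parsed back with parseFrom(). The "example-hfile"
// name is a made-up value.
@Test
public void testSnapshotFileInfoKeyRoundTrip() throws Exception {
  SnapshotFileInfo info = SnapshotFileInfo.newBuilder()
      .setType(SnapshotFileInfo.Type.HFILE)
      .setHfile("example-hfile")
      .build();
  BytesWritable key = new BytesWritable(info.toByteArray());
  SnapshotFileInfo parsed = SnapshotFileInfo.parseFrom(key.copyBytes());
  assertEquals("example-hfile", parsed.getHfile());
  assertEquals(SnapshotFileInfo.Type.HFILE, parsed.getType());
}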
public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
  this.files = new ArrayList<Pair<BytesWritable, Long>>(snapshotFiles.size());
  for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
    this.files.add(new Pair<BytesWritable, Long>(
        new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
    this.length += fileInfo.getSecond();
  }
}
/**
 * Verify the result of the getBalancedSplits() method.
 * The result is a list of groups of files, used as the input lists for the "export" mappers.
 * All the groups should have a similar amount of data.
 *
 * The input list is a pair of file path and length.
 * getBalancedSplits() sorts it by length and assigns a file to each group,
 * going back and forth through the groups.
 */
@Test
public void testBalanceSplit() throws Exception {
  // Create a list of files
  List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
  for (long i = 0; i <= 20; i++) {
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
      .setType(SnapshotFileInfo.Type.HFILE)
      .setHfile("file-" + i)
      .build();
    files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, i));
  }

  // Create 5 groups (total size 210)
  // group 0: 20, 11, 10, 1, 0 (total size: 42)
  // group 1: 19, 12,  9, 2    (total size: 42)
  // group 2: 18, 13,  8, 3    (total size: 42)
  // group 3: 17, 14,  7, 4    (total size: 42)
  // group 4: 16, 15,  6, 5    (total size: 42)
  List<List<Pair<SnapshotFileInfo, Long>>> splits = ExportSnapshot.getBalancedSplits(files, 5);
  assertEquals(5, splits.size());

  String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", "file-0"};
  verifyBalanceSplit(splits.get(0), split0, 42);
  String[] split1 = new String[] {"file-19", "file-12", "file-9", "file-2"};
  verifyBalanceSplit(splits.get(1), split1, 42);
  String[] split2 = new String[] {"file-18", "file-13", "file-8", "file-3"};
  verifyBalanceSplit(splits.get(2), split2, 42);
  String[] split3 = new String[] {"file-17", "file-14", "file-7", "file-4"};
  verifyBalanceSplit(splits.get(3), split3, 42);
  String[] split4 = new String[] {"file-16", "file-15", "file-6", "file-5"};
  verifyBalanceSplit(splits.get(4), split4, 42);
}
private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split,
    final String[] expected, final long expectedSize) {
  assertEquals(expected.length, split.size());
  long totalSize = 0;
  for (int i = 0; i < expected.length; ++i) {
    Pair<SnapshotFileInfo, Long> fileInfo = split.get(i);
    assertEquals(expected[i], fileInfo.getFirst().getHfile());
    totalSize += fileInfo.getSecond();
  }
  assertEquals(expectedSize, totalSize);
}
/**
 * Create the input files, with the paths to copy, for the MR job.
 * Each input file contains n files to copy, and each input file has a similar amount of data
 * to copy. The number of input files created is based on the number of mappers provided as
 * an argument and the number of files to copy.
 */
private static Path[] createInputFiles(final Configuration conf, final Path inputFolderPath,
    final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, int mappers)
    throws IOException, InterruptedException {
  FileSystem fs = inputFolderPath.getFileSystem(conf);
  LOG.debug("Input folder location: " + inputFolderPath);

  List<List<SnapshotFileInfo>> splits = getBalancedSplits(snapshotFiles, mappers);
  Path[] inputFiles = new Path[splits.size()];

  BytesWritable key = new BytesWritable();
  for (int i = 0; i < inputFiles.length; i++) {
    List<SnapshotFileInfo> files = splits.get(i);
    inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
      BytesWritable.class, NullWritable.class);
    LOG.debug("Input split: " + i);
    try {
      for (SnapshotFileInfo file: files) {
        byte[] pbFileInfo = file.toByteArray();
        key.set(pbFileInfo, 0, pbFileInfo.length);
        writer.append(key, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
  return inputFiles;
}
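// A hedged sketch (not from the original source) of reading back one of the export-*.seq
// inputs produced above, e.g. for debugging: keys are BytesWritable-wrapped SnapshotFileInfo
// protobufs with NullWritable values. SequenceFile.Reader.file(...) is the non-deprecated
// Hadoop 2 reader API; LOG is assumed to be the enclosing class logger.
private static void dumpInputFile(final Configuration conf, final Path inputFile)
    throws IOException {
  SequenceFile.Reader reader =
      new SequenceFile.Reader(conf, SequenceFile.Reader.file(inputFile));
  try {
    BytesWritable key = new BytesWritable();
    while (reader.next(key, NullWritable.get())) {
      SnapshotFileInfo file = SnapshotFileInfo.parseFrom(key.copyBytes());
      LOG.debug("input file entry: " + file);
    }
  } finally {
    reader.close();
  }
}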
/**
 * Verify the result of the getBalancedSplits() method.
 * The result is a list of groups of files, used as the input lists for the "export" mappers.
 * All the groups should have a similar amount of data.
 *
 * The input list is a pair of file path and length.
 * getBalancedSplits() sorts it by length and assigns a file to each group,
 * going back and forth through the groups.
 */
@Test
public void testBalanceSplit() throws Exception {
  // Create a list of files
  List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
  for (long i = 0; i <= 20; i++) {
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
      .setType(SnapshotFileInfo.Type.HFILE)
      .setHfile("file-" + i)
      .build();
    files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, i));
  }

  // Create 5 groups (total size 210)
  // group 0: 20, 11, 10, 1, 0 (total size: 42)
  // group 1: 19, 12,  9, 2    (total size: 42)
  // group 2: 18, 13,  8, 3    (total size: 42)
  // group 3: 17, 14,  7, 4    (total size: 42)
  // group 4: 16, 15,  6, 5    (total size: 42)
  List<List<SnapshotFileInfo>> splits = ExportSnapshot.getBalancedSplits(files, 5);
  assertEquals(5, splits.size());

  String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", "file-0"};
  verifyBalanceSplit(splits.get(0), split0);
  String[] split1 = new String[] {"file-19", "file-12", "file-9", "file-2"};
  verifyBalanceSplit(splits.get(1), split1);
  String[] split2 = new String[] {"file-18", "file-13", "file-8", "file-3"};
  verifyBalanceSplit(splits.get(2), split2);
  String[] split3 = new String[] {"file-17", "file-14", "file-7", "file-4"};
  verifyBalanceSplit(splits.get(3), split3);
  String[] split4 = new String[] {"file-16", "file-15", "file-6", "file-5"};
  verifyBalanceSplit(splits.get(4), split4);
}
/**
 * Given a list of file paths and sizes, create around ngroups groups in as balanced a way as
 * possible. The groups created will have similar amounts of bytes.
 * <p>
 * The algorithm used is pretty straightforward: the file list is sorted by size,
 * and then each group fetches the biggest file available, iterating through the groups
 * and alternating the direction.
 */
static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
    final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
  // Sort files by size, from small to big
  Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
    public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
      long r = a.getSecond() - b.getSecond();
      return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
    }
  });

  // create balanced groups
  List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
    new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
  long[] sizeGroups = new long[ngroups];
  int hi = files.size() - 1;
  int lo = 0;

  List<Pair<SnapshotFileInfo, Long>> group;
  int dir = 1;
  int g = 0;

  while (hi >= lo) {
    if (g == fileGroups.size()) {
      group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
      fileGroups.add(group);
    } else {
      group = fileGroups.get(g);
    }

    Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

    // add the hi one
    sizeGroups[g] += fileInfo.getSecond();
    group.add(fileInfo);

    // change direction when at the end or the beginning
    g += dir;
    if (g == ngroups) {
      dir = -1;
      g = ngroups - 1;
    } else if (g < 0) {
      dir = 1;
      g = 0;
    }
  }

  if (LOG.isDebugEnabled()) {
    for (int i = 0; i < sizeGroups.length; ++i) {
      LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
    }
  }

  return fileGroups;
}
/**
 * Given a list of file paths and sizes, create around ngroups groups in as balanced a way as
 * possible. The groups created will have similar amounts of bytes.
 * <p>
 * The algorithm used is pretty straightforward: the file list is sorted by size,
 * and then each group fetches the biggest file available, iterating through the groups
 * and alternating the direction.
 */
static List<List<SnapshotFileInfo>> getBalancedSplits(
    final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
  // Sort files by size, from small to big
  Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
    public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
      long r = a.getSecond() - b.getSecond();
      return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
    }
  });

  // create balanced groups
  List<List<SnapshotFileInfo>> fileGroups = new LinkedList<List<SnapshotFileInfo>>();
  long[] sizeGroups = new long[ngroups];
  int hi = files.size() - 1;
  int lo = 0;

  List<SnapshotFileInfo> group;
  int dir = 1;
  int g = 0;

  while (hi >= lo) {
    if (g == fileGroups.size()) {
      group = new LinkedList<SnapshotFileInfo>();
      fileGroups.add(group);
    } else {
      group = fileGroups.get(g);
    }

    Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

    // add the hi one
    sizeGroups[g] += fileInfo.getSecond();
    group.add(fileInfo.getFirst());

    // change direction when at the end or the beginning
    g += dir;
    if (g == ngroups) {
      dir = -1;
      g = ngroups - 1;
    } else if (g < 0) {
      dir = 1;
      g = 0;
    }
  }

  if (LOG.isDebugEnabled()) {
    for (int i = 0; i < sizeGroups.length; ++i) {
      LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
    }
  }

  return fileGroups;
}
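// A self-contained sketch (not the original method) that mirrors the zig-zag assignment used
// by getBalancedSplits() above on plain sizes, to show why the groups end up balanced: sizes
// 0..20 split into 5 groups all sum to 42, matching the expectations in testBalanceSplit().
public class BalancedSplitSketch {
  public static void main(String[] args) {
    long[] sizes = new long[21];
    for (int i = 0; i < sizes.length; i++) sizes[i] = i;   // already sorted ascending
    int ngroups = 5;
    long[] groupSize = new long[ngroups];
    int hi = sizes.length - 1;
    int g = 0;
    int dir = 1;
    while (hi >= 0) {
      groupSize[g] += sizes[hi--];   // always take the biggest remaining file
      g += dir;                      // bounce between group 0 and group ngroups-1
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }
    for (int i = 0; i < ngroups; i++) {
      System.out.println("group " + i + " total=" + groupSize[i]);   // each prints 42
    }
  }
}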
/**
 * Run Map-Reduce Job to perform the files copy.
 */
private void runCopyJob(final Path inputRoot, final Path outputRoot,
    final List<Pair<SnapshotFileInfo, Long>> snapshotFiles, final boolean verifyChecksum,
    final String filesUser, final String filesGroup, final int filesMode,
    final int mappers, final int bandwidthMB)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = getConf();
  if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
  if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
  conf.setInt(CONF_FILES_MODE, filesMode);
  conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
  conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
  conf.set(CONF_INPUT_ROOT, inputRoot.toString());
  conf.setInt("mapreduce.job.maps", mappers);
  conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);

  Job job = new Job(conf);
  job.setJobName("ExportSnapshot");
  job.setJarByClass(ExportSnapshot.class);
  TableMapReduceUtil.addDependencyJars(job);
  job.setMapperClass(ExportMapper.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapSpeculativeExecution(false);
  job.setNumReduceTasks(0);

  // Create MR Input
  Path inputFolderPath = getInputFolderPath(conf);
  for (Path path: createInputFiles(conf, inputFolderPath, snapshotFiles, mappers)) {
    LOG.debug("Add Input Path=" + path);
    SequenceFileInputFormat.addInputPath(job, path);
  }

  try {
    // Acquire the delegation Tokens
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { inputRoot, outputRoot }, conf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
      // when it will be available on all the supported versions.
      throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
    }
  } finally {
    // Remove MR Input
    try {
      inputFolderPath.getFileSystem(conf).delete(inputFolderPath, true);
    } catch (IOException e) {
      LOG.warn("Unable to remove MR input folder: " + inputFolderPath, e);
    }
  }
}
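// A hedged driver sketch (not part of the original source) showing one way to kick off the
// whole export, which internally ends up in runCopyJob() above. It assumes ExportSnapshot is
// runnable as a Hadoop Tool, as in the HBase releases this code appears to come from; the
// snapshot name, destination URI, mapper count and bandwidth limit below are made-up examples.
public class ExportSnapshotDriverExample {
  public static void main(String[] args) throws Exception {
    int ret = org.apache.hadoop.util.ToolRunner.run(
        org.apache.hadoop.hbase.HBaseConfiguration.create(),
        new org.apache.hadoop.hbase.snapshot.ExportSnapshot(),
        new String[] {
          "-snapshot", "MySnapshot",                        // hypothetical snapshot name
          "-copy-to", "hdfs://backup-cluster:8020/hbase",   // hypothetical destination root
          "-mappers", "16",
          "-bandwidth", "200"                               // MB/s limit per mapper
        });
    System.exit(ret);
  }
}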
private void verifyBalanceSplit(final List<SnapshotFileInfo> split, final String[] expected) {
  assertEquals(expected.length, split.size());
  for (int i = 0; i < expected.length; ++i) {
    assertEquals(expected[i], split.get(i).getHfile());
  }
}