private void addCreatedSplit(List<InputSplit> splitList, Collection<String> locations, ArrayList<OneBlockInfo> validBlocks) { // create an input split Path[] fl = new Path[validBlocks.size()]; long[] offset = new long[validBlocks.size()]; long[] length = new long[validBlocks.size()]; for (int i = 0; i < validBlocks.size(); i++) { fl[i] = validBlocks.get(i).onepath; offset[i] = validBlocks.get(i).offset; length[i] = validBlocks.get(i).length; } // add this split to the list that is returned CombineFileSplit thissplit = new CombineFileSplit(fl, offset, length, locations.toArray(new String[0])); splitList.add(thissplit); }
/**
 * Creates a line reader over the {@code index}-th file chunk of a combined split.
 *
 * <p>The chunk covers bytes {@code [offset, offset + length)} of the file. When the chunk
 * does not begin at byte 0, the reader backs up one byte and discards everything up to and
 * including the next line terminator; {@code startOffset} is then advanced by the value
 * {@code readLine} returned.
 *
 * <p>NOTE(review): the stream-open and seek statements were truncated in the source as
 * received ({@code fileIn =;} and a dangling {@code ;} after {@code --startOffset}). They are
 * restored here as {@code fs.open(path)} and {@code fileIn.seek(startOffset)}; confirm
 * against the upstream file.
 *
 * @param split   combined split holding the file paths, offsets and lengths
 * @param context task context supplying the configuration used to resolve the file system
 * @param index   position inside {@code split} of the file chunk to read
 * @throws IOException if the file system cannot be resolved or the file cannot be
 *                     opened, repositioned or read
 */
public CombineFileLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
    throws IOException {
  this.path = split.getPath(index);
  // Resolve the file system from the path itself so non-default schemes work.
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;

  // open the file
  fileIn = fs.open(path);
  if (startOffset != 0) {
    // Back up one byte so the discard below stops at the first line boundary at or after
    // the original offset.
    skipFirstLine = true;
    --startOffset;
    fileIn.seek(startOffset);
  }
  reader = new LineReader(fileIn);
  if (skipFirstLine) {
    // skip first line and re-establish "startOffset".
    startOffset += reader.readLine(new Text(), 0,
        (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
  }
  this.pos = startOffset;
}
/**
 * Creates a split that wraps {@code cfsplit} and records the per-task sizing figures.
 *
 * <p>The array arguments are stored by reference, not copied. {@code reduces} is taken from
 * {@code reduceBytes.length} and {@code nSpec} from {@code reduceOutputBytes.length}, so
 * neither array may be {@code null}. {@code inputBytes} is accepted but not stored by this
 * constructor.
 *
 * <p>NOTE(review): the target of the {@code id} assignment was missing in the source as
 * received ({@code = id;}); it is restored as {@code this.id}. Confirm the field name.
 *
 * @param cfsplit             underlying combined file split passed to the superclass
 * @param maps                value stored in {@code maps}
 * @param id                  value stored in {@code id}
 * @param inputBytes          not used by this constructor
 * @param inputRecords        value stored in {@code inputRecords}
 * @param outputBytes         value stored in {@code outputBytes}
 * @param outputRecords       value stored in {@code outputRecords}
 * @param reduceBytes         per-reduce byte figures; its length defines {@code reduces}
 * @param reduceRecords       per-reduce record figures
 * @param reduceOutputBytes   per-reduce output byte figures; its length defines {@code nSpec}
 * @param reduceOutputRecords per-reduce output record figures
 * @throws IOException declared by the constructor; presumably raised by the superclass
 *                     constructor (verify)
 */
public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
    long inputBytes, long inputRecords, long outputBytes, long outputRecords,
    double[] reduceBytes, double[] reduceRecords,
    long[] reduceOutputBytes, long[] reduceOutputRecords) throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
}
/**
 * Creates a split that wraps {@code cfsplit} and records the per-task sizing figures and
 * resource-usage metrics.
 *
 * <p>The array arguments are stored by reference, not copied. {@code reduces} is taken from
 * {@code reduceBytes.length} and {@code nSpec} from {@code reduceOutputBytes.length}, so
 * neither array may be {@code null}. {@code inputBytes} is accepted but not stored by this
 * constructor.
 *
 * <p>NOTE(review): the target of the {@code id} assignment was missing in the source as
 * received ({@code = id;}); it is restored as {@code this.id}. Confirm the field name.
 *
 * @param cfsplit             underlying combined file split passed to the superclass
 * @param maps                value stored in {@code maps}
 * @param id                  value stored in {@code id}
 * @param inputBytes          not used by this constructor
 * @param inputRecords        value stored in {@code inputRecords}
 * @param outputBytes         value stored in {@code outputBytes}
 * @param outputRecords       value stored in {@code outputRecords}
 * @param reduceBytes         per-reduce byte figures; its length defines {@code reduces}
 * @param reduceRecords       per-reduce record figures
 * @param reduceOutputBytes   per-reduce output byte figures; its length defines {@code nSpec}
 * @param reduceOutputRecords per-reduce output record figures
 * @param metrics             value stored in {@code mapMetrics}
 * @param rMetrics            value stored in {@code reduceMetrics}
 * @throws IOException declared by the constructor; presumably raised by the superclass
 *                     constructor (verify)
 */
public LoadSplit(CombineFileSplit cfsplit, int maps, int id,
    long inputBytes, long inputRecords, long outputBytes, long outputRecords,
    double[] reduceBytes, double[] reduceRecords,
    long[] reduceOutputBytes, long[] reduceOutputRecords,
    ResourceUsageMetrics metrics, ResourceUsageMetrics[] rMetrics) throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
  this.mapMetrics = metrics;
  this.reduceMetrics = rMetrics;
}
@Test public void testRepeat() throws Exception { final Configuration conf = new Configuration(); Arrays.fill(loc, ""); Arrays.fill(start, 0L); Arrays.fill(len, BLOCK); final ByteArrayOutputStream out = fillVerif(); final FileQueue q = new FileQueue(new CombineFileSplit(paths, start, len, loc), conf); final byte[] verif = out.toByteArray(); final byte[] check = new byte[2 * NFILES * BLOCK];, 0, NFILES * BLOCK); assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK)); final byte[] verif2 = new byte[2 * NFILES * BLOCK]; System.arraycopy(verif, 0, verif2, 0, verif.length); System.arraycopy(verif, 0, verif2, verif.length, verif.length);, 0, 2 * NFILES * BLOCK); assertArrayEquals(verif2, check); }
@Test public void testUneven() throws Exception { final Configuration conf = new Configuration(); Arrays.fill(loc, ""); Arrays.fill(start, 0L); Arrays.fill(len, BLOCK); final int B2 = BLOCK / 2; for (int i = 0; i < NFILES; i += 2) { start[i] += B2; len[i] -= B2; } final FileQueue q = new FileQueue(new CombineFileSplit(paths, start, len, loc), conf); final ByteArrayOutputStream out = fillVerif(); final byte[] verif = out.toByteArray(); final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];, 0, verif.length); assertArrayEquals(verif, Arrays.copyOf(check, verif.length));, 0, verif.length); assertArrayEquals(verif, Arrays.copyOf(check, verif.length)); }
/**
 * Builds a fixed {@code LoadSplit} fixture: two files ("one", "two") with distinct offsets,
 * lengths and locations, one metrics object with cumulative CPU usage 200 (used both as the
 * map metrics and as the sole reduce metrics entry), and two-element reduce arrays.
 *
 * @return the populated fixture
 */
private LoadSplit getLoadSplit() throws Exception {
  // Underlying combined split: two files.
  CombineFileSplit combined = new CombineFileSplit(
      new Path[] {new Path("one"), new Path("two")},
      new long[] {1, 2},
      new long[] {100, 200},
      new String[] {"locOne", "loctwo"});

  // One metrics instance is shared between the map side and the reduce array.
  ResourceUsageMetrics usage = new ResourceUsageMetrics();
  usage.setCumulativeCpuUsage(200);
  ResourceUsageMetrics[] reduceUsage = {usage};

  final int maps = 2;
  final int id = 1;
  final long inputBytes = 4L;
  final long inputRecords = 5L;
  final long outputBytes = 6L;
  final long outputRecords = 7L;

  return new LoadSplit(
      combined, maps, id,
      inputBytes, inputRecords, outputBytes, outputRecords,
      new double[] {8.1d, 8.2d},
      new double[] {9.1d, 9.2d},
      new long[] {101L, 102L},
      new long[] {111L, 112L},
      usage, reduceUsage);
}
/**
 * Prepares this reader for the given split: creates the split iterator over the split's
 * paths, start offsets and lengths, then calls {@code instantiateGfxdLoner}.
 *
 * <p>The file system is resolved from the split's first path. The start and end timestamps
 * are read from the configuration and default to 0 when absent.
 *
 * @param split   must be a {@code CombineFileSplit}; anything else fails the cast
 * @param context task context supplying the configuration
 * @throws IOException if the file system lookup or iterator creation fails
 */
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
  final Configuration conf = context.getConfiguration();
  final CombineFileSplit combined = (CombineFileSplit) split;

  final FileSystem fs = combined.getPath(0).getFileSystem(conf);
  final long startTS = conf.getLong(RowInputFormat.START_TIME_MILLIS, 0L);
  final long endTS = conf.getLong(RowInputFormat.END_TIME_MILLIS, 0L);

  this.splitIterator = HDFSSplitIterator.newInstance(
      fs,
      combined.getPaths(),
      combined.getStartOffsets(),
      combined.getLengths(),
      startTS,
      endTS);

  instantiateGfxdLoner(conf);
}
/**
 * Creates an iterator over the given file ranges.
 *
 * <p>Starting from the current value of {@code currentHopIndex}, paths that no longer exist
 * on {@code fs} are skipped and a warning is logged for each. If every remaining path is
 * missing, {@code hoplog} and {@code iterator} are set to {@code null}; otherwise the first
 * existing path is opened and scanned over its offset/length range.
 *
 * @param fs        file system holding the files
 * @param paths     files making up the split
 * @param offsets   start offset for each file
 * @param lengths   byte length for each file
 * @param startTime value stored in {@code startTime}
 * @param endTime   value stored in {@code endTime}
 * @throws IOException if an existence check or opening/scanning a file fails
 */
public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets, long[] lengths,
    long startTime, long endTime) throws IOException {
  this.fs = fs;
  this.split = new CombineFileSplit(paths, offsets, lengths, null);

  // Advance past files that have already disappeared from the file system.
  for (; currentHopIndex < split.getNumPaths(); currentHopIndex++) {
    Path candidate = split.getPath(currentHopIndex);
    if (fs.exists(candidate)) {
      break;
    }
    logger.warning(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, candidate);
  }

  if (currentHopIndex != split.getNumPaths()) {
    // Found a live file: open it and start scanning its assigned range.
    this.hoplog = getHoplog(fs, split.getPath(currentHopIndex));
    iterator = hoplog.getReader().scan(
        split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
  } else {
    // Nothing left to read.
    this.hoplog = null;
    iterator = null;
  }

  this.startTime = startTime;
  this.endTime = endTime;
}
/**
 * Creates a line reader over the {@code index}-th file chunk of a combined split.
 *
 * <p>The chunk covers bytes {@code [offset, offset + length)} of the file. When the chunk
 * does not begin at byte 0, the reader backs up one byte and discards everything up to and
 * including the next line terminator; {@code startOffset} is then advanced by the value
 * {@code readLine} returned.
 *
 * <p>The file system is resolved from the path rather than from the configuration's default
 * file system, so paths whose scheme differs from the default still open correctly.
 *
 * <p>NOTE(review): the stream-open and seek statements were truncated in the source as
 * received ({@code fileIn =;} and a dangling {@code ;} after {@code --startOffset}). They are
 * restored here as {@code fs.open(path)} and {@code fileIn.seek(startOffset)}; confirm
 * against the upstream file.
 *
 * @param split   combined split holding the file paths, offsets and lengths
 * @param context task context supplying the configuration used to resolve the file system
 * @param index   position inside {@code split} of the file chunk to read
 * @throws IOException if the file system cannot be resolved or the file cannot be
 *                     opened, repositioned or read
 */
public CombineFileLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
    throws IOException {
  this.path = split.getPath(index);
  // Use the path's own file system; the default one may not own this path.
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;

  // open the file
  fileIn = fs.open(path);
  if (startOffset != 0) {
    // Back up one byte so the discard below stops at the first line boundary at or after
    // the original offset.
    skipFirstLine = true;
    --startOffset;
    fileIn.seek(startOffset);
  }
  reader = new LineReader(fileIn);
  if (skipFirstLine) {
    // skip first line and re-establish "startOffset".
    startOffset += reader.readLine(new Text(), 0,
        (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
  }
  this.pos = startOffset;
}
/**
 * Rebuilds every split so that it carries at most {@code SPLIT_MAX_NUM_LOCATIONS} locations
 * (MAPREDUCE-5186). Splits with more locations keep only the first
 * {@code SPLIT_MAX_NUM_LOCATIONS}; paths, start offsets and lengths are carried over as-is.
 *
 * @param splits splits to process; each element must be a {@code CombineFileSplit}
 * @return a new list with one rebuilt split per input split, in the same order
 * @throws IOException if a split's locations cannot be read
 */
private List<InputSplit> cleanSplits(List<InputSplit> splits) throws IOException {
  List<InputSplit> cleanedSplits = Lists.newArrayList();
  for (InputSplit raw : splits) {
    CombineFileSplit source = (CombineFileSplit) raw;

    String[] hosts = source.getLocations();
    Preconditions.checkNotNull(hosts, "CombineFileSplit.getLocations() returned null");

    // Truncate only when over the limit; otherwise reuse the array as returned.
    String[] kept = hosts.length > SPLIT_MAX_NUM_LOCATIONS
        ? Arrays.copyOf(hosts, SPLIT_MAX_NUM_LOCATIONS)
        : hosts;

    CombineFileSplit rebuilt = new CombineFileSplit(
        source.getPaths(), source.getStartOffsets(), source.getLengths(), kept);
    cleanedSplits.add(rebuilt);
  }
  return cleanedSplits;
}
/**
 * Creates a reader for the {@code pathToProcess}-th file of a combined split.
 *
 * <p>The key is initialised to the file's path with scheme and authority removed; the value
 * starts out as an empty {@code BytesWritable}. With assertions enabled, the constructor
 * also checks that the chunk starts at offset 0 and that its length equals the file's length
 * on the file system. Any exception raised while performing the length check is printed and
 * otherwise ignored, so construction is not aborted by it.
 *
 * @param fileSplit     combined split holding the file paths, offsets and lengths
 * @param context       task context supplying the configuration
 * @param pathToProcess position inside {@code fileSplit} of the file to read
 */
public BinaryFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
    Integer pathToProcess) {
  processed = false;
  fileToRead = fileSplit.getPath(pathToProcess);
  fileLength = fileSplit.getLength(pathToProcess);
  config = context.getConfiguration();

  assert 0 == fileSplit.getOffset(pathToProcess)
      : "expected offset 0 for " + fileToRead;
  try {
    // Resolve the file system that owns this path. The configuration's default file system
    // may use a different scheme, in which case the status lookup throws and the length
    // check below is silently skipped by the catch.
    FileSystem dfs = fileToRead.getFileSystem(config);
    assert dfs.getFileStatus(fileToRead).getLen() == fileLength
        : "split length " + fileLength + " does not match file length of " + fileToRead;
  } catch (Exception e) {
    // Best effort: the sanity check must not prevent the reader from being constructed.
    e.printStackTrace();
  }

  key = new Text(Path.getPathWithoutSchemeAndAuthority(fileToRead).toString());
  value = new BytesWritable();
}