public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir,
    int splitlen, JavaSparkContext sc) throws IOException {

  List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
  List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

  JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
  JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
  JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

  zips.foreach(splits -> {
    Path path = splits._1.getPath();
    FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
    FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
    writeInterleavedSplits(fqreader, fqreader2, new Configuration(),
        splitDir + "/" + path.getParent().getName() + "_" + splits._1.getStart() + ".fq");
  });
}

private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen,
    JavaSparkContext sc) throws IOException {

  Path fqpath = new Path(fqPath);
  String fqname = fqpath.getName();
  String[] ns = fqname.split("\\.");
  // TODO: Handle also compressed files
  List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

  JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
  splitRDD.foreach(split -> {
    FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
    writeFastqFile(fqreader, new Configuration(),
        splitDir + "/split_" + split.getStart() + "." + ns[1]);
  });
}

public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir,
    int splitlen, JavaSparkContext sc) throws IOException {

  String[] ns = fst.getPath().getName().split("\\.");
  // TODO: Handle also compressed files
  List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
  List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

  JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
  JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
  JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

  zips.foreach(splits -> {
    Path path = splits._1.getPath();
    FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
    FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
    writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir,
        path.getParent().getName() + "_" + splits._1.getStart() + ".fq");
  });
}

/**
 * Initializes the reading resources and related parameters; this work could
 * also be moved into an initialize() method.
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
  // Read the configured start and end tags.
  startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
  endTag = context.get(END_TAG_KEY).getBytes("UTF-8");

  FileSplit fileSplit = (FileSplit) inputSplit;
  // Determine the start and end positions of this split.
  start = fileSplit.getStart();
  end = start + fileSplit.getLength();
  Path file = fileSplit.getPath();
  FileSystem fs = file.getFileSystem(context);

  // Open an HDFS input stream for the split and seek to the split's start position.
  fsin = fs.open(fileSplit.getPath());
  fsin.seek(start);
}

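// Hedged usage sketch for the constructor above, not taken from the original
// source: it assumes START_TAG_KEY and END_TAG_KEY are publicly accessible
// String constants on XMLRecordReader, and the tag values and input path are
// illustrative placeholders.
private static XMLRecordReader openXmlReader() throws IOException {
  Configuration conf = new Configuration();
  conf.set(XMLRecordReader.START_TAG_KEY, "<record>");  // hypothetical start tag
  conf.set(XMLRecordReader.END_TAG_KEY, "</record>");   // hypothetical end tag
  Path xml = new Path("/data/input.xml");               // hypothetical input path
  long len = xml.getFileSystem(conf).getFileStatus(xml).getLen();
  FileSplit split = new FileSplit(xml, 0, len, null);   // one split over the whole file
  return new XMLRecordReader(split, conf);
}
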
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks)
    throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<>(nSplits);

  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}

@Override
protected void setup(Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);

  InputSplit is = context.getInputSplit();
  FileSplit fs = (FileSplit) is;
  Path splitPath = fs.getPath();

  if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
    this.isNew = true;
  } else if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
    this.isNew = false;
  } else {
    throw new IOException("File " + splitPath + " is not under new path "
        + conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
        + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
  }
}

@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem fs = FileSystem.getLocal(conf);
    FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
        new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
    JobSplitWriter.createSplitFiles(submitDir, conf, fs, new FileSplit[] { split });
    JobSplit.TaskSplitMetaInfo[] infos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf, submitDir);
    assertEquals("unexpected number of splits", 1, infos.length);
    assertEquals("unexpected number of split locations", 4, infos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}

@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem fs = FileSystem.getLocal(conf);
    org.apache.hadoop.mapred.FileSplit split =
        new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
            new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
    JobSplitWriter.createSplitFiles(submitDir, conf, fs,
        new org.apache.hadoop.mapred.InputSplit[] { split });
    JobSplit.TaskSplitMetaInfo[] infos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf, submitDir);
    assertEquals("unexpected number of splits", 1, infos.length);
    assertEquals("unexpected number of split locations", 4, infos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}

@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  if (job == lastContext) {
    return lastResult;
  }
  long t1, t2, t3;
  t1 = System.currentTimeMillis();
  lastContext = job;
  lastResult = super.getSplits(job);
  t2 = System.currentTimeMillis();
  System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
  if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
    TeraScheduler scheduler = new TeraScheduler(
        lastResult.toArray(new FileSplit[0]), job.getConfiguration());
    lastResult = scheduler.getNewFileSplits();
    t3 = System.currentTimeMillis();
    System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
  }
  return lastResult;
}

public TeraScheduler(FileSplit[] realSplits, Configuration conf) throws IOException {
  this.realSplits = realSplits;
  this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
  Map<String, Host> hostTable = new HashMap<String, Host>();
  splits = new Split[realSplits.length];
  for (FileSplit realSplit : realSplits) {
    Split split = new Split(realSplit.getPath().toString());
    splits[remainingSplits++] = split;
    for (String hostname : realSplit.getLocations()) {
      Host host = hostTable.get(hostname);
      if (host == null) {
        host = new Host(hostname);
        hostTable.put(hostname, host);
        hosts.add(host);
      }
      host.splits.add(split);
      split.locations.add(host);
    }
  }
}

/**
 * Solve the schedule and modify the FileSplit array to reflect the new
 * schedule. Placed splits are moved to the front and unplaceable splits
 * to the end.
 * @return a new list of FileSplits that are modified to have the
 *         best host as the only host.
 * @throws IOException
 */
public List<InputSplit> getNewFileSplits() throws IOException {
  solve();
  FileSplit[] result = new FileSplit[realSplits.length];
  int left = 0;
  int right = realSplits.length - 1;
  for (int i = 0; i < splits.length; ++i) {
    if (splits[i].isAssigned) {
      // copy the split and fix up the locations
      String[] newLocations = { splits[i].locations.get(0).hostname };
      realSplits[i] = new FileSplit(realSplits[i].getPath(), realSplits[i].getStart(),
          realSplits[i].getLength(), newLocations);
      result[left++] = realSplits[i];
    } else {
      result[right--] = realSplits[i];
    }
  }
  List<InputSplit> ret = new ArrayList<InputSplit>();
  for (FileSplit fs : result) {
    ret.add(fs);
  }
  return ret;
}

private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks)
    throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);

  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  DistCpUtils.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}

private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
  long lastEnd = 0;

  // Verify that each split's start matches the previous split's end and
  // that nothing is missing in between.
  for (InputSplit split : splits) {
    FileSplit fileSplit = (FileSplit) split;
    long start = fileSplit.getStart();
    Assert.assertEquals(lastEnd, start);
    lastEnd = start + fileSplit.getLength();
  }

  // Verify there is nothing more to read from the input file.
  SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
      SequenceFile.Reader.file(listFile));
  try {
    reader.seek(lastEnd);
    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
  } finally {
    IOUtils.closeStream(reader);
  }
}

@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
      DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {
      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(sequenceFile, splitStartPosition, splitSize, (String[]) null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(sequenceFile, splitStartPosition, bytesRemaining, (String[]) null));
  }

  return splits;
}

public SingleFastqRecordReader(Configuration conf, FileSplit split) throws IOException {
  file = split.getPath();
  start = split.getStart();
  end = start + split.getLength();

  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream fileIn = fs.open(file);

  CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
  CompressionCodec codec = codecFactory.getCodec(file);

  if (codec == null) { // no codec. Uncompressed file.
    positionAtFirstRecord(fileIn);
    inputStream = fileIn;
  } else { // compressed file
    if (start != 0) {
      throw new RuntimeException("Start position for compressed file is not 0! (found "
          + start + ")");
    }
    inputStream = codec.createInputStream(fileIn);
    end = Long.MAX_VALUE; // read until the end of the file
  }

  lineReader = new LineReader(inputStream);
}

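// Hedged usage sketch for the reader above, not from the original source: the
// FASTQ path is a placeholder and the whole file is wrapped in a single split.
// For compressed inputs the split must start at 0, as the constructor enforces.
private static SingleFastqRecordReader openFastqReader() throws IOException {
  Configuration conf = new Configuration();
  Path fastq = new Path("/data/sample.fq");  // hypothetical input path
  long len = fastq.getFileSystem(conf).getFileStatus(fastq).getLen();
  FileSplit split = new FileSplit(fastq, 0, len, null);
  return new SingleFastqRecordReader(conf, split);
}
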
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  // What follows is very questionable, but serves as a quick test:
  // the file is read from HDFS and copied to a temporary location.
  FileSplit split = (FileSplit) inputSplit;
  Configuration job = context.getConfiguration();
  Path file = split.getPath();
  FileSystem fs = file.getFileSystem(job);
  java.nio.file.Path tmpFile = Files.createTempFile("tmp", ".zip"); // consider using job and task IDs?
  FSDataInputStream fileIn = fs.open(file);
  FileOutputStream fileOut = new FileOutputStream(tmpFile.toFile());
  LOG.info("Copying from {} to {}", file, tmpFile);
  IOUtils.copyBytes(fileIn, fileOut, 100000, true);

  // Having copied the file out of HDFS onto the local FS in a temp folder,
  // we prepare it (sorts files).
  java.nio.file.Path tmpSpace = Files.createTempDirectory("tmp-"
      + context.getTaskAttemptID().getJobID().getId() + ":"
      + context.getTaskAttemptID().getId());
  reader = new DwCAReader(tmpFile.toAbsolutePath().toString(), tmpSpace.toAbsolutePath().toString());
  nextKeyValue();
}

/**
 * {@inheritDoc}
 */
protected void map(final Object key, final OrcStruct value, final Context context)
    throws IOException, InterruptedException {
  if (value != null && value.toString() != null && value.toString().isEmpty()) {
    return;
  }

  // Mapper sends data with parent directory path as keys to retain directory structure
  final FileSplit fileSplit = (FileSplit) context.getInputSplit();
  final Path filePath = fileSplit.getPath();
  final String parentFilePath = String.format("%s/", filePath.getParent().toString());
  log.debug("Parent file path {}", parentFilePath);

  if (!fileSizesMap.containsKey(filePath.toString())) {
    if (fileSystem == null) {
      final URI uri = URI.create(filePath.toString());
      fileSystem = FileSystem.get(uri, configuration);
    }
    final FileStatus[] listStatuses = fileSystem.listStatus(filePath);
    for (FileStatus fileStatus : listStatuses) {
      if (!fileStatus.isDirectory()) {
        fileSizesMap.put(fileStatus.getPath().toString(), fileStatus.getLen());
        log.info("Entry added to fileSizes Map {} {}", fileStatus.getPath().toString(),
            fileStatus.getLen());
      }
    }
  }

  final Text parentFilePathKey = new Text(parentFilePath);
  final Text filePathKey = new Text(filePath.toString());
  final OrcValue orcValue = new OrcValue();
  orcValue.value = value;

  final Long fileSize = fileSizesMap.get(filePath.toString());
  if (fileSize < threshold) {
    context.write(parentFilePathKey, orcValue);
  } else {
    context.write(filePathKey, orcValue);
  }
}

/**
 * Returns a split for each store files directory using the block location
 * of each file as locality reference.
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);

  Text key = new Text();
  for (FileStatus file : files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    LineReader reader = new LineReader(fs.open(path));
    long pos = 0;
    int n;
    try {
      while ((n = reader.readLine(key)) > 0) {
        String[] hosts = getStoreDirHosts(fs, path);
        splits.add(new FileSplit(path, pos, n, hosts));
        pos += n;
      }
    } finally {
      reader.close();
    }
  }

  return splits;
}

private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
    final FileSystem fs, final LocatedFileStatus keyFileStatus)
    throws IOException, InterruptedException {
  SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
  // what is missing.
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
      new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
    InputSplit is = new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(),
        new String[] {});
    rr.initialize(is, context);
    while (rr.nextKeyValue()) {
      rr.getCurrentKey();
      BytesWritable bw = rr.getCurrentValue();
      if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
        byte[] key = new byte[rr.getCurrentKey().getLength()];
        System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
            rr.getCurrentKey().getLength());
        result.add(key);
      }
    }
  }
  return result;
}

@Test
public void testPartialXML2() throws Exception {
  File f = createFile(xml3);

  // Create FileSplit
  Path p = new Path(f.toURI().toString());
  WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);

  // Initialize the RecordReader
  AggregatingRecordReader reader = new AggregatingRecordReader();
  reader.initialize(split, ctx);
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "A", "B", "");
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "C", "D", "");
  assertTrue(reader.nextKeyValue());
  try {
    testXML(reader.getCurrentValue(), "E", "", "");
    fail("Fragment returned, and it somehow passed XML parsing.");
  } catch (SAXParseException e) {
    // ignore
  }
  assertTrue(!reader.nextKeyValue());
}

@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  if (job == lastContext) {
    return lastResult;
  }
  long t1, t2, t3;
  t1 = System.currentTimeMillis();
  lastContext = job;
  lastResult = super.getSplits(job);
  t2 = System.currentTimeMillis();
  System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
  if (job.getConfiguration().getBoolean(TeraSortConfigKeys.USE_TERA_SCHEDULER.key(),
      TeraSortConfigKeys.DEFAULT_USE_TERA_SCHEDULER)) {
    TeraScheduler scheduler = new TeraScheduler(
        lastResult.toArray(new FileSplit[0]), job.getConfiguration());
    lastResult = scheduler.getNewFileSplits();
    t3 = System.currentTimeMillis();
    System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
  }
  return lastResult;
}

@Override
public void readFields(DataInput in) throws IOException {
  Path file = new Path(in.readUTF());
  long start = in.readLong();
  long length = in.readLong();
  String[] hosts = null;
  if (in.readBoolean()) {
    int numHosts = in.readInt();
    hosts = new String[numHosts];
    for (int i = 0; i < numHosts; i++) {
      hosts[i] = in.readUTF();
    }
  }
  fileSplit = new FileSplit(file, start, length, hosts);
  partition = in.readInt();
}

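// Hedged sketch of a matching write() counterpart, not taken from the original
// class: it mirrors the field order read above (path, start, length, optional
// host list, partition). getLocations() may throw IOException, which write()
// already declares.
@Override
public void write(DataOutput out) throws IOException {
  out.writeUTF(fileSplit.getPath().toString());
  out.writeLong(fileSplit.getStart());
  out.writeLong(fileSplit.getLength());
  String[] hosts = fileSplit.getLocations();
  out.writeBoolean(hosts != null);
  if (hosts != null) {
    out.writeInt(hosts.length);
    for (String host : hosts) {
      out.writeUTF(host);
    }
  }
  out.writeInt(partition);
}
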
public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
  conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
  File f = createFile(xml3);

  // Create FileSplit
  Path p = new Path(f.toURI().toString());
  WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);

  // Initialize the RecordReader
  AggregatingRecordReader reader = new AggregatingRecordReader();
  reader.initialize(split, ctx);
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "A", "B", "");
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "C", "D", "");
  assertTrue(!reader.nextKeyValue());
}

@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  initConfig(context);
  initDocType();
  initDelimConf();
  setFile(((FileSplit) inSplit).getPath());
  fs = file.getFileSystem(context.getConfiguration());
  FileStatus status = fs.getFileStatus(file);
  if (status.isDirectory()) {
    iterator = new FileIterator((FileSplit) inSplit, context);
    inSplit = iterator.next();
  }
  initStream(inSplit);
}

@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  initConfig(context);
  allowEmptyMeta = conf.getBoolean(CONF_INPUT_ARCHIVE_METADATA_OPTIONAL, false);
  setFile(((FileSplit) inSplit).getPath());
  fs = file.getFileSystem(context.getConfiguration());
  FileStatus status = fs.getFileStatus(file);
  if (status.isDirectory()) {
    iterator = new FileIterator((FileSplit) inSplit, context);
    inSplit = iterator.next();
  }
  initStream(inSplit);
}

@Test
public void testIncorrectArgs() throws Exception {
  File f = createFile(xml1);

  // Create FileSplit
  Path p = new Path(f.toURI().toString());
  WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
  AggregatingRecordReader reader = new AggregatingRecordReader();
  try {
    // Clear the values for BEGIN and STOP TOKEN
    conf.set(AggregatingRecordReader.START_TOKEN, null);
    conf.set(AggregatingRecordReader.END_TOKEN, null);
    reader.initialize(split, ctx);
    // If we got here, then the code didn't throw an exception
    fail();
  } catch (Exception e) {
    // Do nothing, we succeeded
    f = null;
  }
  reader.close();
}

protected void initStreamReader(InputSplit inSplit) throws IOException, InterruptedException {
  start = 0;
  end = inSplit.getLength();
  overflow = false;

  fInputStream = openFile(inSplit, true);
  if (fInputStream == null) {
    return;
  }

  try {
    xmlSR = f.createXMLStreamReader(fInputStream, encoding);
  } catch (XMLStreamException e) {
    LOG.error(e.getMessage(), e);
  }

  if (useAutomaticId) {
    idGen = new IdGenerator(file.toUri().getPath() + "-" + ((FileSplit) inSplit).getStart());
  }
}

public FSDataInputStream openFile(InputSplit inSplit, boolean configCol) throws IOException {
  while (true) {
    setFile(((FileSplit) inSplit).getPath());
    if (configCol) {
      configFileNameAsCollection(conf, file);
    }
    try {
      return fs.open(file);
    } catch (IllegalArgumentException e) {
      LOG.error("Input file skipped, reason: " + e.getMessage());
      if (iterator != null && iterator.hasNext()) {
        inSplit = iterator.next();
      } else {
        return null;
      }
    }
  }
}

public void readFields(DataInput in) throws IOException {
  // splits
  int splitSize = in.readInt();
  splits = new ArrayList<FileSplit>();
  for (int i = 0; i < splitSize; i++) {
    Path path = new Path(Text.readString(in));
    long start = in.readLong();
    long len = in.readLong();
    FileSplit split = new FileSplit(path, start, len, null);
    splits.add(split);
  }
  // length
  length = in.readLong();
  // locations
  locations = new HashSet<String>();
}

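// Hedged sketch of a matching write() counterpart, not from the original class:
// it serializes the same fields readFields() consumes above (split count, then
// path/start/length per split, then total length). Locations are not written
// because readFields() rebuilds them as an empty set.
public void write(DataOutput out) throws IOException {
  // splits
  out.writeInt(splits.size());
  for (FileSplit split : splits) {
    Text.writeString(out, split.getPath().toString());
    out.writeLong(split.getStart());
    out.writeLong(split.getLength());
  }
  // length
  out.writeLong(length);
}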