Example source code for the Java class org.apache.hadoop.mapreduce.lib.input.FileSplit

Project: ViraPipe    File: InterleaveMulti.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
      writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
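
For context, here is a minimal standalone sketch of the pattern the ViraPipe snippets rely on: NLineInputFormat.getSplitsForFile turns a single FASTQ file into line-aligned FileSplit objects whose path, start offset, and length are then handed to a record reader. The input path and lines-per-split value below are hypothetical, not taken from ViraPipe.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class SplitInspector {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical input file; replace with a real path on your cluster.
    Path input = new Path("/data/reads_1.fq");
    FileSystem fs = input.getFileSystem(conf);
    FileStatus status = fs.getFileStatus(input);

    // FASTQ records span four lines, so a multiple of 4 keeps records intact.
    int linesPerSplit = 4000000;
    List<FileSplit> splits =
        NLineInputFormat.getSplitsForFile(status, conf, linesPerSplit);

    for (FileSplit split : splits) {
      System.out.println(split.getPath() + " start=" + split.getStart()
          + " length=" + split.getLength());
    }
  }
}
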
Project: ViraPipe    File: InterleaveMulti.java
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
  Path fqpath = new Path(fqPath);
  String fqname = fqpath.getName();
  String[] ns = fqname.split("\\.");
  //TODO: Handle also compressed files
  List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

  JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);

  splitRDD.foreach( split ->  {

    FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
    writeFastqFile(fqreader, new Configuration(), splitDir + "/split_" + split.getStart() + "." + ns[1]);

   });
}
Project: ViraPipe    File: Decompress.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);

      writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
Project: ViraPipe    File: DecompressInterleave.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    String[] ns = fst.getPath().getName().split("\\.");
    //TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
      writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir, path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
Project: Wikipedia-Index    File: XmlInputFormat.java
/**
 * Initializes the input resources and related parameters; this could also be done in the initialize() method.
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
    /**
     * Get the start and end tags passed in
     */
    startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
    endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
    FileSplit fileSplit = (FileSplit) inputSplit;
    /**
     * Get the start and end positions of the split
     */
    start = fileSplit.getStart();
    end = start + fileSplit.getLength();
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(context);
    /**
     * Open an HDFS file input stream for the split
     */
    fsin = fs.open(fileSplit.getPath());
    /**
     * Seek to the start of the split
     */
    fsin.seek(start);
}
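
Most record readers in this listing follow the same open-and-seek pattern as the constructor above: cast the InputSplit to FileSplit, open the underlying file, and position the stream at the split's start offset. Below is a self-contained sketch of that pattern; the class and field names are illustrative only, not taken from any of the projects listed here.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitBoundedReader {
  private long start;
  private long end;
  private FSDataInputStream in;

  public void open(InputSplit inputSplit, Configuration conf) throws IOException {
    FileSplit split = (FileSplit) inputSplit;
    start = split.getStart();
    end = start + split.getLength();

    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(conf);
    in = fs.open(file);
    in.seek(start);           // position at the split boundary
  }

  public boolean inSplit() throws IOException {
    return in.getPos() < end; // stop once the reader crosses the split's end
  }

  public void close() throws IOException {
    in.close();
  }
}
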
Project: circus-train    File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<>(nSplits);

  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: aliyun-maxcompute-data-collectors    File: MergeMapperBase.java
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);

  InputSplit is = context.getInputSplit();
  FileSplit fs = (FileSplit) is;
  Path splitPath = fs.getPath();

  if (splitPath.toString().startsWith(
      conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
    this.isNew = true;
  } else if (splitPath.toString().startsWith(
      conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
    this.isNew = false;
  } else {
    throw new IOException("File " + splitPath + " is not under new path "
        + conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
        + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
  }
}
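
Several mappers in this listing (MergeMapperBase above, OrcCompactionMapper further down) rely on the same trick: cast context.getInputSplit() to FileSplit to learn which file the current task is reading. A minimal sketch of that pattern follows, with hypothetical mapper and key/value types.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitAwareMapper extends Mapper<LongWritable, Text, Text, Text> {
  private Path splitPath;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // Only valid when the input format produces FileSplit (e.g. TextInputFormat).
    FileSplit split = (FileSplit) context.getInputSplit();
    splitPath = split.getPath();
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Emit the source file name alongside each record.
    context.write(new Text(splitPath.getName()), value);
  }
}
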
Project: hadoop    File: TestJobSplitWriter.java
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem fs = FileSystem.getLocal(conf);
    FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
        new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
    JobSplitWriter.createSplitFiles(submitDir, conf, fs,
        new FileSplit[] { split });
    JobSplit.TaskSplitMetaInfo[] infos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
            submitDir);
    assertEquals("unexpected number of splits", 1, infos.length);
    assertEquals("unexpected number of split locations",
        4, infos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}
Project: hadoop    File: TestJobSplitWriter.java
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem fs = FileSystem.getLocal(conf);
    org.apache.hadoop.mapred.FileSplit split =
        new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
            new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
    JobSplitWriter.createSplitFiles(submitDir, conf, fs,
        new org.apache.hadoop.mapred.InputSplit[] { split });
    JobSplit.TaskSplitMetaInfo[] infos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
            submitDir);
    assertEquals("unexpected number of splits", 1, infos.length);
    assertEquals("unexpected number of split locations",
        4, infos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}
Project: hadoop    File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  if (job == lastContext) {
    return lastResult;
  }
  long t1, t2, t3;
  t1 = System.currentTimeMillis();
  lastContext = job;
  lastResult = super.getSplits(job);
  t2 = System.currentTimeMillis();
  System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
  if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
    TeraScheduler scheduler = new TeraScheduler(
      lastResult.toArray(new FileSplit[0]), job.getConfiguration());
    lastResult = scheduler.getNewFileSplits();
    t3 = System.currentTimeMillis(); 
    System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
  }
  return lastResult;
}
Project: hadoop    File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
                     Configuration conf) throws IOException {
  this.realSplits = realSplits;
  this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
  Map<String, Host> hostTable = new HashMap<String, Host>();
  splits = new Split[realSplits.length];
  for(FileSplit realSplit: realSplits) {
    Split split = new Split(realSplit.getPath().toString());
    splits[remainingSplits++] = split;
    for(String hostname: realSplit.getLocations()) {
      Host host = hostTable.get(hostname);
      if (host == null) {
        host = new Host(hostname);
        hostTable.put(hostname, host);
        hosts.add(host);
      }
      host.splits.add(split);
      split.locations.add(host);
    }
  }
}
Project: hadoop    File: TeraScheduler.java
/**
 * Solve the schedule and modify the FileSplit array to reflect the new
 * schedule. It will move placed splits to the front and unplaceable splits
 * to the end.
 * @return a new list of FileSplits that are modified to have the
 *    best host as the only host.
 * @throws IOException
 */
public List<InputSplit> getNewFileSplits() throws IOException {
  solve();
  FileSplit[] result = new FileSplit[realSplits.length];
  int left = 0;
  int right = realSplits.length - 1;
  for(int i=0; i < splits.length; ++i) {
    if (splits[i].isAssigned) {
      // copy the split and fix up the locations
      String[] newLocations = {splits[i].locations.get(0).hostname};
      realSplits[i] = new FileSplit(realSplits[i].getPath(),
          realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
      result[left++] = realSplits[i];
    } else {
      result[right--] = realSplits[i];
    }
  }
  List<InputSplit> ret = new ArrayList<InputSplit>();
  for (FileSplit fs : result) {
    ret.add(fs);
  }
  return ret;
}
Project: hadoop    File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
        throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);

  for (int i=0; i< nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()),
        null));
  }
  DistCpUtils.publish(jobContext.getConfiguration(),
                      CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: hadoop    File: TestUniformSizeInputFormat.java
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
  long lastEnd = 0;

  //Verify if each split's start is matching with the previous end and
  //we are not missing anything
  for (InputSplit split : splits) {
    FileSplit fileSplit = (FileSplit) split;
    long start = fileSplit.getStart();
    Assert.assertEquals(lastEnd, start);
    lastEnd = start + fileSplit.getLength();
  }

  //Verify there is nothing more to read from the input file
  SequenceFile.Reader reader
          = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
                  SequenceFile.Reader.file(listFile));

  try {
    reader.seek(lastEnd);
    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
  } finally {
    IOUtils.closeStream(reader);
  }
}
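
The checkSplits helper above verifies that the splits produced for a single listing file are contiguous: each split starts exactly where the previous one ended. The same invariant can be checked for any FileInputFormat-derived format; here is a small local sketch, assuming one plain-text input file (the path and split size are hypothetical).

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitCoverageCheck {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // Hypothetical local input file; force small splits so it is cut into pieces.
    FileInputFormat.addInputPath(job, new Path("file:///tmp/listing.txt"));
    FileInputFormat.setMaxInputSplitSize(job, 1024 * 1024);

    List<InputSplit> splits = new TextInputFormat().getSplits(job);
    long expectedStart = 0;
    for (InputSplit s : splits) {
      FileSplit fileSplit = (FileSplit) s;
      if (fileSplit.getStart() != expectedStart) {
        throw new IllegalStateException("Gap before offset " + fileSplit.getStart());
      }
      expectedStart = fileSplit.getStart() + fileSplit.getLength();
    }
    System.out.println("Splits cover " + expectedStart + " bytes contiguously.");
  }
}
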
Project: hadoop    File: GenerateDistCacheData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
                            DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {

      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[])null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
  }

  return splits;
}
Project: SparkSeq    File: SingleFastqInputFormat.java
public SingleFastqRecordReader(Configuration conf, FileSplit split) throws IOException {
    file = split.getPath();
    start = split.getStart();
    end = start + split.getLength();

    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream fileIn = fs.open(file);

    CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
    CompressionCodec codec        = codecFactory.getCodec(file);

    if (codec == null) { // no codec.  Uncompressed file.
        positionAtFirstRecord(fileIn);
        inputStream = fileIn;
    } else {
        // compressed file
        if (start != 0) {
            throw new RuntimeException("Start position for compressed file is not 0! (found " + start + ")");
        }

        inputStream = codec.createInputStream(fileIn);
        end = Long.MAX_VALUE; // read until the end of the file
    }

    lineReader = new LineReader(inputStream);
}
Project: pipelines    File: DwCAInputFormat.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
  throws IOException, InterruptedException {

  // what follows is very questionable but a quick test
  // the file is read from HDFS and copied to a temporary location
  FileSplit split = (FileSplit)inputSplit;
  Configuration job = context.getConfiguration();
  Path file = split.getPath();
  FileSystem fs = file.getFileSystem(job);
  java.nio.file.Path tmpFile = Files.createTempFile("tmp", ".zip"); // consider using job and task IDs?
  FSDataInputStream fileIn = fs.open(file);
  FileOutputStream fileOut = new FileOutputStream(tmpFile.toFile());
  LOG.info("Copying from {} to {}", file, tmpFile);
  IOUtils.copyBytes(fileIn, fileOut, 100000, true);

  // having copied the file out of HDFS onto the local FS in a temp folder, we prepare it (sorts files)
  java.nio.file.Path tmpSpace = Files.createTempDirectory("tmp-" + context.getTaskAttemptID().getJobID().getId() +
                                                          ":" + context.getTaskAttemptID().getId());

  reader = new DwCAReader(tmpFile.toAbsolutePath().toString(), tmpSpace.toAbsolutePath().toString());
  nextKeyValue();
}
Project: dataSqueeze    File: OrcCompactionMapper.java
/**
 * {@inheritDoc}
 */
protected void map(final Object key, final OrcStruct value, final Context context) throws IOException, InterruptedException {
    if (value != null && value.toString() != null && value.toString().isEmpty()) {
        return;
    }

    // Mapper sends data with parent directory path as keys to retain directory structure
    final FileSplit fileSplit = (FileSplit) context.getInputSplit();
    final Path filePath = fileSplit.getPath();
    final String parentFilePath = String.format("%s/", filePath.getParent().toString());
    log.debug("Parent file path {}", parentFilePath);

    if (!fileSizesMap.containsKey(filePath.toString())) {
        if (fileSystem == null){
            final URI uri = URI.create(filePath.toString());
            fileSystem = FileSystem.get(uri, configuration);
        }
        final FileStatus[] listStatuses = fileSystem.listStatus(filePath);
        for (FileStatus fileStatus : listStatuses) {
            if (!fileStatus.isDirectory()) {
                fileSizesMap.put(fileStatus.getPath().toString(), fileStatus.getLen());
                log.info("Entry added to fileSizes Map {} {}", fileStatus.getPath().toString(), fileStatus.getLen());
            }
        }
    }

    final Text parentFilePathKey = new Text(parentFilePath);
    final Text filePathKey = new Text(filePath.toString());
    final OrcValue orcValue = new OrcValue();
    orcValue.value = value;


    final Long fileSize = fileSizesMap.get(filePath.toString());

    if (fileSize < threshold) {
        context.write(parentFilePathKey, orcValue);
    } else {
        context.write(filePathKey, orcValue);
    }
}
Project: ditb    File: CompactionTool.java
/**
 * Returns a split for each store files directory using the block location
 * of each file as locality reference.
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);

  Text key = new Text();
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    LineReader reader = new LineReader(fs.open(path));
    long pos = 0;
    int n;
    try {
      while ((n = reader.readLine(key)) > 0) {
        String[] hosts = getStoreDirHosts(fs, path);
        splits.add(new FileSplit(path, pos, n, hosts));
        pos += n;
      }
    } finally {
      reader.close();
    }
  }

  return splits;
}
Project: ditb    File: IntegrationTestBigLinkedList.java
private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
    final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
    InterruptedException {
  SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
  // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
  // what is missing.
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
      new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
    InputSplit is =
      new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
    rr.initialize(is, context);
    while (rr.nextKeyValue()) {
      rr.getCurrentKey();
      BytesWritable bw = rr.getCurrentValue();
      if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
        byte[] key = new byte[rr.getCurrentKey().getLength()];
        System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
            .getLength());
        result.add(key);
      }
    }
  }
  return result;
}
Project: accumulo-wikisearch    File: AggregatingRecordReaderTest.java
@Test
public void testPartialXML2() throws Exception {
  File f = createFile(xml3);

  // Create FileSplit
  Path p = new Path(f.toURI().toString());
  WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);

  // Initialize the RecordReader
  AggregatingRecordReader reader = new AggregatingRecordReader();
  reader.initialize(split, ctx);
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "A", "B", "");
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "C", "D", "");
  assertTrue(reader.nextKeyValue());
  try {
    testXML(reader.getCurrentValue(), "E", "", "");
    fail("Fragment returned, and it somehow passed XML parsing.");
  } catch (SAXParseException e) {
    // ignore
  }
  assertTrue(!reader.nextKeyValue());
}
Project: big-c    File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  if (job == lastContext) {
    return lastResult;
  }
  long t1, t2, t3;
  t1 = System.currentTimeMillis();
  lastContext = job;
  lastResult = super.getSplits(job);
  t2 = System.currentTimeMillis();
  System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
  if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
    TeraScheduler scheduler = new TeraScheduler(
      lastResult.toArray(new FileSplit[0]), job.getConfiguration());
    lastResult = scheduler.getNewFileSplits();
    t3 = System.currentTimeMillis(); 
    System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
  }
  return lastResult;
}
Project: aliyun-oss-hadoop-fs    File: TestJobSplitWriter.java
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem fs = FileSystem.getLocal(conf);
    org.apache.hadoop.mapred.FileSplit split =
        new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
            new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
    JobSplitWriter.createSplitFiles(submitDir, conf, fs,
        new org.apache.hadoop.mapred.InputSplit[] { split });
    JobSplit.TaskSplitMetaInfo[] infos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
            submitDir);
    assertEquals("unexpected number of splits", 1, infos.length);
    assertEquals("unexpected number of split locations",
        4, infos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}
Project: big-c    File: TestUniformSizeInputFormat.java
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
  long lastEnd = 0;

  //Verify if each split's start is matching with the previous end and
  //we are not missing anything
  for (InputSplit split : splits) {
    FileSplit fileSplit = (FileSplit) split;
    long start = fileSplit.getStart();
    Assert.assertEquals(lastEnd, start);
    lastEnd = start + fileSplit.getLength();
  }

  //Verify there is nothing more to read from the input file
  SequenceFile.Reader reader
          = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
                  SequenceFile.Reader.file(listFile));

  try {
    reader.seek(lastEnd);
    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
  } finally {
    IOUtils.closeStream(reader);
  }
}
Project: aliyun-oss-hadoop-fs    File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  if (job == lastContext) {
    return lastResult;
  }
  long t1, t2, t3;
  t1 = System.currentTimeMillis();
  lastContext = job;
  lastResult = super.getSplits(job);
  t2 = System.currentTimeMillis();
  System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
  if (job.getConfiguration().getBoolean(TeraSortConfigKeys.USE_TERA_SCHEDULER.key(),
                                        TeraSortConfigKeys.DEFAULT_USE_TERA_SCHEDULER)) {
    TeraScheduler scheduler = new TeraScheduler(
      lastResult.toArray(new FileSplit[0]), job.getConfiguration());
    lastResult = scheduler.getNewFileSplits();
    t3 = System.currentTimeMillis(); 
    System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
  }
  return lastResult;
}
Project: accumulo-wikisearch    File: WikipediaInputFormat.java
@Override
public void readFields(DataInput in) throws IOException {
  Path file = new Path(in.readUTF());
  long start = in.readLong();
  long length = in.readLong();
  String [] hosts = null;
  if(in.readBoolean())
  {
    int numHosts = in.readInt();
    hosts = new String[numHosts];
    for(int i = 0; i < numHosts; i++)
      hosts[i] = in.readUTF();
  }
  fileSplit = new FileSplit(file, start, length, hosts);
  partition = in.readInt();
}
Project: aliyun-oss-hadoop-fs    File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
                     Configuration conf) throws IOException {
  this.realSplits = realSplits;
  this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
  Map<String, Host> hostTable = new HashMap<String, Host>();
  splits = new Split[realSplits.length];
  for(FileSplit realSplit: realSplits) {
    Split split = new Split(realSplit.getPath().toString());
    splits[remainingSplits++] = split;
    for(String hostname: realSplit.getLocations()) {
      Host host = hostTable.get(hostname);
      if (host == null) {
        host = new Host(hostname);
        hostTable.put(hostname, host);
        hosts.add(host);
      }
      host.splits.add(split);
      split.locations.add(host);
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
        throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);

  for (int i=0; i< nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()),
        null));
  }
  DistCpUtils.publish(jobContext.getConfiguration(),
                      CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: accumulo-wikisearch    File: AggregatingRecordReaderTest.java
public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
  conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
  File f = createFile(xml3);

  // Create FileSplit
  Path p = new Path(f.toURI().toString());
  WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);

  // Initialize the RecordReader
  AggregatingRecordReader reader = new AggregatingRecordReader();
  reader.initialize(split, ctx);
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "A", "B", "");
  assertTrue(reader.nextKeyValue());
  testXML(reader.getCurrentValue(), "C", "D", "");
  assertTrue(!reader.nextKeyValue());
}
Project: big-c    File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext,
                                      List<DynamicInputChunk> chunks)
        throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);

  for (int i=0; i< nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()),
        null));
  }
  DistCpUtils.publish(jobContext.getConfiguration(),
                      CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: fst-bench    File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  if (job == lastContext) {
    return lastResult;
  }
  long t1, t2, t3;
  t1 = System.currentTimeMillis();
  lastContext = job;
  lastResult = super.getSplits(job);
  t2 = System.currentTimeMillis();
  System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
  if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
    TeraScheduler scheduler = new TeraScheduler(
      lastResult.toArray(new FileSplit[0]), job.getConfiguration());
    lastResult = scheduler.getNewFileSplits();
    t3 = System.currentTimeMillis(); 
    System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
  }
  return lastResult;
}
Project: fst-bench    File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
                     Configuration conf) throws IOException {
  this.realSplits = realSplits;
  this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
  Map<String, Host> hostTable = new HashMap<String, Host>();
  splits = new Split[realSplits.length];
  for(FileSplit realSplit: realSplits) {
    Split split = new Split(realSplit.getPath().toString());
    splits[remainingSplits++] = split;
    for(String hostname: realSplit.getLocations()) {
      Host host = hostTable.get(hostname);
      if (host == null) {
        host = new Host(hostname);
        hostTable.put(hostname, host);
        hosts.add(host);
      }
      host.splits.add(split);
      split.locations.add(host);
    }
  }
}
Project: marklogic-contentpump    File: CompressedDelimitedTextReader.java
@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
    initConfig(context);
    initDocType();
    initDelimConf();

    setFile(((FileSplit) inSplit).getPath());
    fs = file.getFileSystem(context.getConfiguration());
    FileStatus status = fs.getFileStatus(file);
    if (status.isDirectory()) {
        iterator = new FileIterator((FileSplit)inSplit, context);
        inSplit = iterator.next();
    }

    initStream(inSplit);
}
Project: marklogic-contentpump    File: ArchiveRecordReader.java
@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
    initConfig(context);
    allowEmptyMeta = conf.getBoolean(
        CONF_INPUT_ARCHIVE_METADATA_OPTIONAL, false);

    setFile(((FileSplit) inSplit).getPath());
    fs = file.getFileSystem(context.getConfiguration());
    FileStatus status = fs.getFileStatus(file);
    if(status.isDirectory()) {
        iterator = new FileIterator((FileSplit)inSplit, context);
        inSplit = iterator.next();
    }
    initStream(inSplit);
}
Project: accumulo-wikisearch    File: AggregatingRecordReaderTest.java
@Test
public void testIncorrectArgs() throws Exception {
  File f = createFile(xml1);

  // Create FileSplit
  Path p = new Path(f.toURI().toString());
  WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
  AggregatingRecordReader reader = new AggregatingRecordReader();
  try {
    // Clear the values for BEGIN and STOP TOKEN
    conf.set(AggregatingRecordReader.START_TOKEN, null);
    conf.set(AggregatingRecordReader.END_TOKEN, null);
    reader.initialize(split, ctx);
    // If we got here, then the code didn't throw an exception
    fail();
  } catch (Exception e) {
    // Do nothing, we succeeded
    f = null;
  }
  reader.close();
}
Project: marklogic-contentpump    File: AggregateXMLReader.java
protected void initStreamReader(InputSplit inSplit) throws IOException,
    InterruptedException {
    start = 0;
    end = inSplit.getLength();
    overflow = false;
    fInputStream = openFile(inSplit, true);
    if (fInputStream == null) {
        return;
    }

    try {
        xmlSR = f.createXMLStreamReader(fInputStream, encoding);
    } catch (XMLStreamException e) {
        LOG.error(e.getMessage(), e);
    }

    if (useAutomaticId) {
        idGen = new IdGenerator(file.toUri().getPath() + "-"
            + ((FileSplit) inSplit).getStart());
    }
}
Project: hadoop-wiki-index    File: XmlInputFormat.java
/**
 * Initializes the input resources and related parameters; this could also be done in the initialize() method.
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
    /**
     * Get the start and end tags passed in
     */
    startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
    endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
    FileSplit fileSplit = (FileSplit) inputSplit;
    /**
     * Get the start and end positions of the split
     */
    start = fileSplit.getStart();
    end = start + fileSplit.getLength();
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(context);
    /**
     * Open an HDFS file input stream for the split
     */
    fsin = fs.open(fileSplit.getPath());
    /**
     * Seek to the start of the split
     */
    fsin.seek(start);
}
Project: marklogic-contentpump    File: ImportRecordReader.java
public FSDataInputStream openFile(InputSplit inSplit,
        boolean configCol) throws IOException {
    while (true) {
        setFile(((FileSplit) inSplit).getPath());
        if (configCol) {
            configFileNameAsCollection(conf, file);
        }
        try {
            return fs.open(file);
        } catch (IllegalArgumentException e){
            LOG.error("Input file skipped, reason: " + e.getMessage());
            if (iterator != null &&
                    iterator.hasNext()) {
                inSplit = iterator.next();
            } else {
                return null;
            }
        }
    }
}
Project: big-c    File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
                     Configuration conf) throws IOException {
  this.realSplits = realSplits;
  this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
  Map<String, Host> hostTable = new HashMap<String, Host>();
  splits = new Split[realSplits.length];
  for(FileSplit realSplit: realSplits) {
    Split split = new Split(realSplit.getPath().toString());
    splits[remainingSplits++] = split;
    for(String hostname: realSplit.getLocations()) {
      Host host = hostTable.get(hostname);
      if (host == null) {
        host = new Host(hostname);
        hostTable.put(hostname, host);
        hosts.add(host);
      }
      host.splits.add(split);
      split.locations.add(host);
    }
  }
}
Project: marklogic-contentpump    File: CombineDocumentSplit.java
public void readFields(DataInput in) throws IOException {
    // splits
    int splitSize = in.readInt();
    splits = new ArrayList<FileSplit>();
    for (int i = 0; i < splitSize; i++) {
        Path path = new Path(Text.readString(in));
        long start = in.readLong();
        long len = in.readLong();
        FileSplit split = new FileSplit(path, start, len, null);
        splits.add(split);
    }
    // length
    length = in.readLong();
    // locations
    locations = new HashSet<String>();
}
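
Both readFields implementations above (WikipediaInputFormat and CombineDocumentSplit) rebuild FileSplit instances field by field from the stream. It is worth noting that org.apache.hadoop.mapreduce.lib.input.FileSplit is itself a Writable, so a split can also be round-tripped directly with its own write/readFields. A minimal sketch with a hypothetical path:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitRoundTrip {
  public static void main(String[] args) throws IOException {
    FileSplit original = new FileSplit(
        new Path("/data/part-00000"), 0L, 4096L, new String[] { "host1" });

    // Serialize using FileSplit's own Writable implementation.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Deserialize into a fresh instance.
    FileSplit copy = new FileSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy.getPath() + " " + copy.getStart() + " " + copy.getLength());
    // Note: host locations are not written by write(); the framework stores
    // split locations separately in the split meta info.
  }
}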