Example source code for the Java class org.apache.hadoop.mapred.lib.CombineFileSplit
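
All snippets on this page exercise the old-API class org.apache.hadoop.mapred.lib.CombineFileSplit, which packs byte ranges from several files into a single input split. As a minimal, self-contained sketch of the class itself (the paths, offsets, lengths, and host names are made-up illustration values, not taken from any project below):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombineFileSplitDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    // Two byte ranges, one from each file, packed into a single split.
    Path[] paths   = { new Path("/data/part-00000"), new Path("/data/part-00001") };
    long[] starts  = { 0L, 0L };            // offset where each range begins
    long[] lengths = { 1024L, 2048L };      // bytes taken from each file
    String[] hosts = { "host1", "host2" };  // preferred locations

    CombineFileSplit split = new CombineFileSplit(job, paths, starts, lengths, hosts);
    System.out.println(split.getNumPaths());  // 2
    System.out.println(split.getLength());    // 3072, the sum of the per-file lengths
    System.out.println(split.getPath(0) + " @ " + split.getOffset(0)
        + " + " + split.getLength(0));        // /data/part-00000 @ 0 + 1024
  }
}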

Project: gemfirexd-oss    File: FragmenterJUnitTest.java
private void verifyFragments(InputSplit[] fs, List<Fragment> fragments) throws Exception {
  log("Total fragments [expected, actual]: " + fs.length + ", " + fragments.size());
  assertEquals(fs.length, fragments.size());

  for (int i = 0; i < fs.length; i++) {
    CombineFileSplit split = (CombineFileSplit) fs[i];
    Fragment frag = fragments.get(i);

    log("Number of hosts hosting the fragment [expected, actual]: " + fs[i].getLocations().length + ",  " +  frag.getReplicas().length);
    assertEquals(fs[i].getLocations().length, frag.getReplicas().length);

    log("Fragment source name [expected, actual]: " + split.getPath(0).toString() +  ",  " + frag.getSourceName());
    assertEquals(split.getPath(0).toString(), "/" + frag.getSourceName());

    for (int j = 0; j < frag.getReplicas().length; j++) {
      log("Fragment host [expected, actual]: " + fs[i].getLocations()[j] + ",  " + frag.getReplicas()[j]);
      assertEquals(fs[i].getLocations()[j], frag.getReplicas()[j]);

      log(" User data [expected, actual]: " + null + ",  " + frag.getUserData());
      assertEquals(null, frag.getUserData());
    }
  }
}
Project: gemfirexd-oss    File: RowInputFormat.java
private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
    throws IOException {
  if (hoplogs == null || hoplogs.isEmpty()) {
    return new InputSplit[0];
  }

  HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
  List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
  InputSplit[] splits = new InputSplit[mr2Splits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Split =
        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;

    CombineFileSplit split = new CombineFileSplit(job, mr2Split.getPaths(),
        mr2Split.getStartOffsets(), mr2Split.getLengths(),
        mr2Split.getLocations());
    splits[i] = getSplit(split);
    i++;
  }

  return splits;
}
Project: gemfirexd-oss    File: GemFireXDFragmenter.java
@Override
public List<Fragment> getFragments() throws IOException {
  InputSplit[] splits;
  // try {
  splits = getSplits();
  // } finally {
  //   this.gfxdManager.resetLonerSystemInUse();
  // }

  for (InputSplit split : splits) {
    CombineFileSplit cSplit = (CombineFileSplit) split;

    if (cSplit.getLength() > 0L) {
      // Strip the leading '/' so the fragment source name is relative.
      String filepath = cSplit.getPath(0).toUri().getPath();
      filepath = filepath.substring(1);
      if (this.gfxdManager.getLogger().isDebugEnabled()) {
        this.gfxdManager.getLogger().debug("fragment-filepath " + filepath);
      }
      byte[] data = this.gfxdManager.populateUserData(cSplit);
      this.fragments.add(new Fragment(filepath, cSplit.getLocations(), data));
    }
  }
  return this.fragments;
}
Project: gemfirexd-oss    File: GFInputFormat.java
/**
 * Creates an input split for every block occupied by hoplogs of the input
 * regions.
 *
 * @param job the job configuration
 * @param hoplogs the hoplog files to split
 * @return an array of file-based input splits
 * @throws IOException
 */
private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
    throws IOException {
  if (hoplogs == null || hoplogs.isEmpty()) {
    return new InputSplit[0];
  }

  HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
  List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
  InputSplit[] splits = new InputSplit[mr2Splits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Split =
        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;

    CombineFileSplit split = new CombineFileSplit(job, mr2Split.getPaths(),
        mr2Split.getStartOffsets(), mr2Split.getLengths(),
        mr2Split.getLocations());
    splits[i] = split;
    i++;
  }

  return splits;
}
Project: emr-dynamodb-connector    File: ImportRecordReaderFactory.java
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
    InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
  // CombineFileSplit indicates the new export format which includes a manifest file
  if (inputSplit instanceof CombineFileSplit) {
    int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
    if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
      throw new IOException("Unknown version: " + job.get(DynamoDBConstants
          .EXPORT_FORMAT_VERSION));
    }
    return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
  } else if (inputSplit instanceof FileSplit) {
    // FileSplit indicates the old data pipeline format which doesn't include a manifest file
    Path path = ((FileSplit) inputSplit).getPath();
    return new ImportRecordReader(job, path);
  } else {
    throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
        + " " + inputSplit.getClass());
  }
}
Project: spatedb    File: FileSplitUtil.java
/**
 * Combines a number of file splits into one CombineFileSplit. If the number
 * of splits to be combined is one, that split is returned as-is without
 * creating a CombineFileSplit.
 * @param conf the job configuration
 * @param splits the file splits to draw from
 * @param startIndex index of the first split to combine
 * @param count number of consecutive splits to combine
 * @return a single InputSplit covering the requested range
 * @throws IOException
 */
public static InputSplit combineFileSplits(JobConf conf,
    List<FileSplit> splits, int startIndex, int count) throws IOException {
  if (count == 1) {
    return splits.get(startIndex);
  } else {
    Path[] paths = new Path[count];
    long[] starts = new long[count];
    long[] lengths = new long[count];
    Vector<String> vlocations = new Vector<String>();
    while (count > 0) {
      paths[count - 1] = splits.get(startIndex).getPath();
      starts[count - 1] = splits.get(startIndex).getStart();
      lengths[count - 1] = splits.get(startIndex).getLength();
      vlocations.addAll(Arrays.asList(splits.get(startIndex).getLocations()));
      count--;
      startIndex++;
    }
    String[] locations = prioritizeLocations(vlocations);
    return new CombineFileSplit(conf, paths, starts, lengths, locations);
  }
}
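
A hypothetical call site for the helper above, as a hedged sketch (the paths, sizes, and host names are illustrative, and FileSplitUtil plus the mapred classes are assumed to be on the classpath):

JobConf conf = new JobConf();
List<FileSplit> raw = Arrays.asList(
    new FileSplit(new Path("/in/a"), 0L, 64L, new String[] { "h1" }),
    new FileSplit(new Path("/in/b"), 0L, 64L, new String[] { "h2" }));
// Packs both ranges into one CombineFileSplit spanning two files.
InputSplit combined = FileSplitUtil.combineFileSplits(conf, raw, 0, raw.size());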
Project: spatedb    File: FileSplitUtil.java
/**
 * Combines two file splits into a CombineFileSplit.
 * @param conf the job configuration
 * @param split1 the first file split
 * @param split2 the second file split
 * @return a CombineFileSplit covering both input splits
 * @throws IOException
 */
public static InputSplit combineFileSplits(JobConf conf,
    FileSplit split1, FileSplit split2) throws IOException {
  Path[] paths = new Path[2];
  long[] starts = new long[2];
  long[] lengths = new long[2];
  Vector<String> vlocations = new Vector<String>();
  paths[0] = split1.getPath();
  starts[0] = split1.getStart();
  lengths[0] = split1.getLength();
  vlocations.addAll(Arrays.asList(split1.getLocations()));
  paths[1] = split2.getPath();
  starts[1] = split2.getStart();
  lengths[1] = split2.getLength();
  vlocations.addAll(Arrays.asList(split2.getLocations()));
  String[] locations = prioritizeLocations(vlocations);
  return new CombineFileSplit(conf, paths, starts, lengths, locations);
}
Project: gemfirexd-oss    File: GemFireXDManager.java
/**
 * Make sure we do not generate a lot of data here, as it will be duplicated
 * per split and sent to the HAWQ master and later to the datanodes.
 *
 * The sequence in which data is written to out must match the sequence in
 * which it is read in {@link #readUserData()}.
 *
 * <p>
 * Only called from the Fragmenter.
 *
 * @param cSplit the combine file split to serialize
 * @return the serialized user data bytes
 */
public byte[] populateUserData(CombineFileSplit cSplit) throws IOException {
  // Construct user data
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  DataOutput out = new DataOutputStream(baos);

  // TODO Uncomment below statement (and its corresponding code in
  // readUserData()) when loner system is started from fragmenter as well as
  // from accessor.
  // 1. restart loner
  // out.write(RESTART_LONER_SYSTEM_CODE);
  // out.writeBoolean(this.restartLoner);

  // 2. home dir
  out.write(HOME_DIR_CODE);
  out.writeUTF(this.homeDir);

  // 3. schema.table
  out.write(SCHEMA_TABLE_NAME_CODE);
  out.writeUTF(this.schemaTableName);

  out.write(SPLIT_CODE);
  cSplit.write(out);

  // Serialize it and return
  return baos.toByteArray();
}
Project: gemfirexd-oss    File: GemFireXDManager.java
/**
 * This is only called from the Accessor. The sequence in which the switch
 * cases appear must match the sequence in which data is written to out in
 * {@link #populateUserData(CombineFileSplit)}.
 *
 * @throws IOException
 */
public void readUserData() throws IOException {
  byte[] data = this.inputData.getFragmentMetadata();
  if (data != null && data.length > 0) {
    boolean done = false;
    ByteArrayDataInput in = new ByteArrayDataInput();
    in.initialize(data, null);
    while (!done) {
      try {
        switch (in.readByte()) {
        case HOME_DIR_CODE:
          this.homeDir = in.readUTF();
          this.logger.debug("Accessor received home dir: " + this.homeDir);
          break;
        case SCHEMA_TABLE_NAME_CODE:
          this.schemaTableName = in.readUTF();
          this.logger.debug("Accessor received schemaTable name: "
              + this.schemaTableName);
          break;
        case SPLIT_CODE:
          this.split = new CombineFileSplit();
          this.split.readFields(in);
          this.logger.debug("Accessor split read, total length: " + this.split.getLength());
          done = true;
          break;
        default:
          this.logger.error("Internal error: Invalid data from fragmenter.");
          done = true;
          break;
        }
      } catch (EOFException eofe) {
        this.logger.error("Internal error: Invalid data from fragmenter.");
        break; // from while().
      }
    }
  }
}
Project: gemfirexd-oss    File: AbstractGFRecordReader.java
/**
 * Initializes the record reader using the file split and job configuration.
 *
 * @param split the combine file split to read
 * @param conf the job configuration
 * @throws IOException
 */
public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
  Path[] path = split.getPaths();
  long[] start = split.getStartOffsets();
  long[] len = split.getLengths();

  FileSystem fs = split.getPath(0).getFileSystem(conf);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0L, 0L);
}
Project: gemfirexd-oss    File: GFInputFormat.java
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
    InputSplit split, JobConf job, Reporter reporter) throws IOException {

  CombineFileSplit cSplit = (CombineFileSplit) split;
  AbstractGFRecordReader reader = new AbstractGFRecordReader();
  reader.initialize(cSplit, job);
  return reader;
}
Project: emr-dynamodb-connector    File: ImportCombineFileRecordReader.java
public ImportCombineFileRecordReader(CombineFileSplit combineFileSplit, JobConf job,
    Reporter reporter) throws IOException {
  this.combineFileSplit = combineFileSplit;
  this.job = job;
  this.reporter = reporter;

  processedPathCount = 0;
  currentRecordReader = getRecordReader(combineFileSplit.getPath(processedPathCount));
}
Project: spatedb    File: BinaryRecordReader.java
@SuppressWarnings("unchecked")
public BinaryRecordReader(Configuration conf, CombineFileSplit split) throws IOException {
  this.conf = conf;
  this.split = split;
  internalReaders = new RecordReader[(int) split.getNumPaths()];
  // Initialize all record readers
  for (int i = 0; i < split.getNumPaths(); i++) {
    this.internalReaders[i] = createRecordReader(this.conf, this.split, i);
  }
}
Project: hadoop    File: TestCombineSequenceFileInputFormat.java
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(conf);

  Reporter reporter = Reporter.NULL;

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = "+seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);

  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  // create a file with various lengths
  createFiles(length, numFiles, random);

  // create a combine split for the files
  InputFormat<IntWritable, BytesWritable> format =
    new CombineSequenceFileInputFormat<IntWritable, BytesWritable>();
  IntWritable key = new IntWritable();
  BytesWritable value = new BytesWritable();
  for (int i = 0; i < 3; i++) {
    int numSplits =
      random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20))+1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got =        " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one splits!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be CombineFileSplit",
      CombineFileSplit.class, split.getClass());

    // check each split
    BitSet bits = new BitSet(length);
    RecordReader<IntWritable, BytesWritable> reader =
      format.getRecordReader(split, job, reporter);
    try {
      while (reader.next(key, value)) {
        assertFalse("Key in multiple partitions.", bits.get(key.get()));
        bits.set(key.get());
      }
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
Project: hadoop    File: TestCombineTextInputFormat.java
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(defaultConf);

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = "+seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);
  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  createFiles(length, numFiles, random);

  // create a combined split for the files
  CombineTextInputFormat format = new CombineTextInputFormat();
  LongWritable key = new LongWritable();
  Text value = new Text();
  for (int i = 0; i < 3; i++) {
    int numSplits = random.nextInt(length/20)+1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got =        " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one splits!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be CombineFileSplit",
      CombineFileSplit.class, split.getClass());

    // check the split
    BitSet bits = new BitSet(length);
    LOG.debug("split= " + split);
    RecordReader<LongWritable, Text> reader =
      format.getRecordReader(split, job, voidReporter);
    try {
      int count = 0;
      while (reader.next(key, value)) {
        int v = Integer.parseInt(value.toString());
        LOG.debug("read " + v);
        if (bits.get(v)) {
          LOG.warn("conflict with " + v +
                   " at position "+reader.getPos());
        }
        assertFalse("Key in multiple partitions.", bits.get(v));
        bits.set(v);
        count++;
      }
      LOG.info("splits="+split+" count=" + count);
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
Project: gemfirexd-oss    File: EventInputFormatTest.java
public void testEventInputFormat() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();

  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);

  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);

  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }

  assertEquals(20, count);

  TestUtil.shutDown();
}
Project: gemfirexd-oss    File: EventInputFormatTest.java
public void testNoSecureHdfsCheck() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();


  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "'  batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");

  stopNetServer();
  FabricServiceManager.currentFabricServiceInstance().stop(new Properties());

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);

  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  conf.set("hadoop.security.authentication", "kerberos");

  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }

  assertEquals(20, count);

  TestUtil.shutDown();
}
Project: gemfirexd-oss    File: TestDataHelper.java
private static InputSplit getCombineSplit(Path path, long offset, long length, String[] locations) {
  Path[] paths = {path};
  long[] offsets = {offset};
  long[] lengths = {length};
  return new CombineFileSplit(null, paths, offsets, lengths, locations);
}
Project: gemfirexd-oss    File: TestGemFireXDManager.java
@Override
public byte[] populateUserData(CombineFileSplit fileSplit) throws IOException {
  return null;
}
Project: gemfirexd-oss    File: GFXDHiveInputFormat.java
@Override
protected InputSplit getSplit(CombineFileSplit split) {
  return new GFXDHiveSplit(split);
}
Project: gemfirexd-oss    File: GFXDHiveSplit.java
public GFXDHiveSplit(CombineFileSplit fileSplit) {
  this.combineFileSplit = fileSplit;
}
Project: gemfirexd-oss    File: GFXDHiveSplit.java
public void readFields(DataInput in) throws IOException {
  this.combineFileSplit  = new CombineFileSplit();
  this.combineFileSplit.readFields(in);
}
Project: gemfirexd-oss    File: MapRedRowRecordReader.java
protected Path[] getSplitPaths(InputSplit split){
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getPaths();
}
Project: gemfirexd-oss    File: MapRedRowRecordReader.java
protected long[] getStartOffsets(InputSplit split){
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getStartOffsets();
}
Project: gemfirexd-oss    File: MapRedRowRecordReader.java
protected long[] getLengths(InputSplit split){
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getLengths();
}
Project: gemfirexd-oss    File: RowInputFormat.java
protected InputSplit getSplit(CombineFileSplit split) {
  return split;
}
Project: gemfirexd-oss    File: RowInputFormat.java
protected Path[] getSplitPaths(InputSplit split){
  CombineFileSplit combineSplit = (CombineFileSplit) split;
  return combineSplit.getPaths();
}
Project: emr-dynamodb-connector    File: ImportInputFormat.java
/**
 * This method retrieves the URLs of all S3 files and generates input splits by combining
 * multiple S3 URLs into one split.
 *
 * @return a list of input splits. The length of this list may not be exactly the same as
 * <code>numSplits</code>. For example, if numSplits is larger than MAX_NUM_SPLITS or the number
 * of S3 files, then numSplits is ignored. Furthermore, not all input splits contain the same
 * number of S3 files. For example, with five S3 files {s1, s2, s3, s4, s5} and numSplits = 3,
 * this method returns a list of three input splits: {s1, s2}, {s3, s4} and {s5}.
 */
private List<InputSplit> readEntries(JsonReader reader, JobConf job) throws IOException {
  List<Path> paths = new ArrayList<Path>();
  Gson gson = DynamoDBUtil.getGson();

  reader.beginArray();
  while (reader.hasNext()) {
    ExportManifestEntry entry = gson.fromJson(reader, ExportManifestEntry.class);
    paths.add(new Path(entry.url));
  }
  reader.endArray();
  log.info("Number of S3 files: " + paths.size());

  if (paths.size() == 0) {
    return Collections.emptyList();
  }

  int filesPerSplit = (int) Math.ceil((double) (paths.size()) / Math.min(MAX_NUM_SPLITS, paths
      .size()));
  int numSplits = (int) Math.ceil((double) (paths.size()) / filesPerSplit);

  long[] fileMaxLengths = new long[filesPerSplit];
  Arrays.fill(fileMaxLengths, Long.MAX_VALUE / filesPerSplit);

  long[] fileStarts = new long[filesPerSplit];
  Arrays.fill(fileStarts, 0);

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  for (int i = 0; i < numSplits; i++) {
    int start = filesPerSplit * i;
    int end = filesPerSplit * (i + 1);
    if (i == (numSplits - 1)) {
      end = paths.size();
    }
    Path[] pathsInOneSplit = paths.subList(start, end).toArray(new Path[end - start]);
    CombineFileSplit combineFileSplit = new CombineFileSplit(job, pathsInOneSplit, fileStarts,
        fileMaxLengths, new String[0]);
    splits.add(combineFileSplit);
  }

  return splits;
}