Java class org.apache.hadoop.mapreduce.InputSplit: example source code

Project: angel    File: BalanceInputFormat.java
private void addCreatedSplit(List<InputSplit> splitList,
  Collection<String> locations,
  ArrayList<OneBlockInfo> validBlocks) {
  // create an input split
  Path[] fl = new Path[validBlocks.size()];
  long[] offset = new long[validBlocks.size()];
  long[] length = new long[validBlocks.size()];
  for (int i = 0; i < validBlocks.size(); i++) {
    fl[i] = validBlocks.get(i).onepath;
    offset[i] = validBlocks.get(i).offset;
    length[i] = validBlocks.get(i).length;
  }
  // add this split to the list that is returned
  CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
    length, locations.toArray(new String[0]));
  splitList.add(thissplit);
}
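
Note: the three arrays above line up index-by-index, so a consumer can walk the per-file chunks packed into the combined split. A minimal sketch using the standard CombineFileSplit accessors (the helper name and the logging are illustrative, not part of the angel project; assumes java.io.IOException, java.util.Arrays and org.apache.hadoop.mapreduce.lib.input.CombineFileSplit are imported):

// Sketch only: enumerate the per-file chunks carried by one combined split.
private static void dumpCombineSplit(CombineFileSplit split) throws IOException {
  for (int i = 0; i < split.getNumPaths(); i++) {
    System.out.printf("chunk %d: %s offset=%d length=%d%n",
        i, split.getPath(i), split.getOffset(i), split.getLength(i));
  }
  System.out.println("preferred hosts: " + Arrays.toString(split.getLocations()));
}
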
Project: Wikipedia-Index    File: XmlInputFormat.java
/**
 * Initializes the reading resources and related parameters; this setup could also be done in the initialize() method.
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
    /**
     * Get the start and end tags passed in through the configuration.
     */
    startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
    endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
    FileSplit fileSplit = (FileSplit) inputSplit;
    /**
     * Get the start and end positions of this split.
     */
    start = fileSplit.getStart();
    end = start + fileSplit.getLength();
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(context);
    /**
     * Open an HDFS file input stream for this split.
     */
    fsin = fs.open(fileSplit.getPath());
    /**
     * Seek to the start position of the split.
     */
    fsin.seek(start);
}
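
Note: for context, a minimal job-setup sketch showing how the two tag keys reach this reader. Only the START_TAG_KEY and END_TAG_KEY constants come from the snippet above (assuming they are publicly accessible); the tag values, job name, and input path are illustrative placeholders, and the usual org.apache.hadoop.mapreduce imports are assumed:

Configuration conf = new Configuration();
conf.set(XmlInputFormat.START_TAG_KEY, "<page>");   // illustrative tags for a Wikipedia dump
conf.set(XmlInputFormat.END_TAG_KEY, "</page>");
Job job = Job.getInstance(conf, "wiki-xml-parse");  // hypothetical job name
job.setInputFormatClass(XmlInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/data/enwiki-pages.xml"));  // hypothetical path
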
Project: circus-train    File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<>(nSplits);

  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: easyhbase    File: WdTableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
  List<InputSplit> allSplits = new ArrayList<InputSplit>();
  Scan originalScan = getScan();

  Scan[] scans = rowKeyDistributor.getDistributedScans(originalScan);

  for (Scan scan : scans) {
    // Internally super.getSplits(...) uses the scan object stored in a private variable,
    // so to re-use the superclass code we temporarily swap in each of the distributed scans.
    setScan(scan);
    List<InputSplit> splits = super.getSplits(context);
    allSplits.addAll(splits);
  }

  // Setting original scan back
  setScan(originalScan);

  return allSplits;
}
Project: aliyun-maxcompute-data-collectors    File: NetezzaDataDrivenDBInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  int numMappers = ConfigurationHelper.getJobNumMaps(job);

  String boundaryQuery = getDBConf().getInputBoundingQuery();
  // Resort to the base class if:
  //   - dataslice-aligned import is not requested,
  //   - this is not a table extract,
  //   - a boundary query has been supplied, or
  //   - only one mapper is requested.
  if (!getConf().getBoolean(
      NetezzaManager.NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false)
      || getDBConf().getInputTableName() == null
      || numMappers == 1
      || (boundaryQuery != null && !boundaryQuery.isEmpty())) {
    return super.getSplits(job);
  }

  // Generate a splitter that splits only on datasliceid. It is an
  // integer split. We will just use the lower bounding query to specify
  // the restriction of dataslice and set the upper bound to a constant

  NetezzaDBDataSliceSplitter splitter = new NetezzaDBDataSliceSplitter();

  return splitter.split(getConf(), null, null);
}
Project: hadoop    File: TestFileInputFormat.java
@Test
public void testSplitLocationInfo() throws Exception {
  Configuration conf = getConfiguration();
  conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
      "test:///a1/a2");
  Job job = Job.getInstance(conf);
  TextInputFormat fileInputFormat = new TextInputFormat();
  List<InputSplit> splits = fileInputFormat.getSplits(job);
  String[] locations = splits.get(0).getLocations();
  Assert.assertEquals(2, locations.length);
  SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
  Assert.assertEquals(2, locationInfo.length);
  SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
      locationInfo[0] : locationInfo[1];
  SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
      locationInfo[0] : locationInfo[1];
  Assert.assertTrue(localhostInfo.isOnDisk());
  Assert.assertTrue(localhostInfo.isInMemory());
  Assert.assertTrue(otherhostInfo.isOnDisk());
  Assert.assertFalse(otherhostInfo.isInMemory());
}
Project: aliyun-maxcompute-data-collectors    File: ExportInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // Set the max split size based on the number of map tasks we want.
  long numTasks = getNumMapTasks(job);
  long numFileBytes = getJobSize(job);
  long maxSplitSize = numFileBytes / numTasks;

  setMaxSplitSize(maxSplitSize);

  LOG.debug("Target numMapTasks=" + numTasks);
  LOG.debug("Total input bytes=" + numFileBytes);
  LOG.debug("maxSplitSize=" + maxSplitSize);

  List<InputSplit> splits =  super.getSplits(job);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Generated splits:");
    for (InputSplit split : splits) {
      LOG.debug("  " + split);
    }
  }
  return splits;
}
Project: aliyun-maxcompute-data-collectors    File: NetezzaDBDataSliceSplitter.java
@Override
public List<InputSplit> split(Configuration conf, ResultSet results,
    String colName) {
  // For each map we will add a split such that
  // datasliceid % the number of mappers equals the mapper index.
  // The predicate goes only into the lower-bound where clause;
  // for the upper bound we specify a constant clause that always
  // evaluates to true.

  int numSplits = ConfigurationHelper.getConfNumMaps(conf);
  List<InputSplit> splitList = new ArrayList<InputSplit>(numSplits);
  for (int i = 0; i < numSplits; ++i) {
    StringBuilder lowerBoundClause = new StringBuilder(128);
    lowerBoundClause.append(" datasliceid % ").append(numSplits)
        .append(" = ").append(i);
    splitList.add(new DataDrivenDBInputSplit(lowerBoundClause.toString(),
        "1 = 1"));
  }
  return splitList;
}
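
Note: an illustration (not part of the splitter) of the bound pairs the loop above generates for four map tasks:

// Illustration only: the clause pairs produced when numSplits = 4.
int numSplits = 4;
for (int i = 0; i < numSplits; ++i) {
  String lower = " datasliceid % " + numSplits + " = " + i;  // e.g. " datasliceid % 4 = 0"
  String upper = "1 = 1";                                    // constant clause, always true
  System.out.println("split " + i + ": lower=[" + lower + "] upper=[" + upper + "]");
}
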
Project: hadoop    File: DistSum.java
/** Partitions the summation into parts and then returns them as splits. */
@Override
public List<InputSplit> getSplits(JobContext context) {
  //read sigma from conf
  final Configuration conf = context.getConfiguration();
  final Summation sigma = SummationWritable.read(DistSum.class, conf); 
  final int nParts = conf.getInt(N_PARTS, 0);

  //create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nParts);
  final Summation[] parts = sigma.partition(nParts);
  for(int i = 0; i < parts.length; ++i) {
    splits.add(new SummationSplit(parts[i]));
    //LOG.info("parts[" + i + "] = " + parts[i]);
  }
  return splits;
}
Project: hadoop    File: CompositeInputSplit.java
/**
 * {@inheritDoc}
 * @throws IOException If the child InputSplit cannot be read, typically
 *                     for failing access checks.
 */
@SuppressWarnings("unchecked")  // Generic array assignment
public void readFields(DataInput in) throws IOException {
  int card = WritableUtils.readVInt(in);
  if (splits == null || splits.length != card) {
    splits = new InputSplit[card];
  }
  Class<? extends InputSplit>[] cls = new Class[card];
  try {
    for (int i = 0; i < card; ++i) {
      cls[i] =
        Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
    }
    for (int i = 0; i < card; ++i) {
      splits[i] = ReflectionUtils.newInstance(cls[i], null);
      SerializationFactory factory = new SerializationFactory(conf);
      Deserializer deserializer = factory.getDeserializer(cls[i]);
      deserializer.open((DataInputStream)in);
      splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
    }
  } catch (ClassNotFoundException e) {
    throw new IOException("Failed split init", e);
  }
}
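
Note: for reference, a hedged sketch of the symmetric write path, reconstructed from the read logic above rather than copied from Hadoop: the cardinality goes first, then the child class names, then each child split serialized in order.

@SuppressWarnings("unchecked")  // raw Serializer, mirroring readFields()
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, splits.length);
  for (InputSplit split : splits) {
    Text.writeString(out, split.getClass().getName());
  }
  for (InputSplit split : splits) {
    SerializationFactory factory = new SerializationFactory(conf);
    Serializer serializer = factory.getSerializer(split.getClass());
    serializer.open((DataOutputStream) out);
    serializer.serialize(split);
  }
}
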
Project: hadoop    File: TeraScheduler.java
/**
 * Solve the schedule and modify the FileSplit array to reflect the new
 * schedule. It will move placed splits to the front and unplaceable splits
 * to the end.
 * @return a new list of FileSplits that are modified to have the
 *    best host as the only host.
 * @throws IOException
 */
public List<InputSplit> getNewFileSplits() throws IOException {
  solve();
  FileSplit[] result = new FileSplit[realSplits.length];
  int left = 0;
  int right = realSplits.length - 1;
  for(int i=0; i < splits.length; ++i) {
    if (splits[i].isAssigned) {
      // copy the split and fix up the locations
      String[] newLocations = {splits[i].locations.get(0).hostname};
      realSplits[i] = new FileSplit(realSplits[i].getPath(),
          realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
      result[left++] = realSplits[i];
    } else {
      result[right--] = realSplits[i];
    }
  }
  List<InputSplit> ret = new ArrayList<InputSplit>();
  for (FileSplit fs : result) {
    ret.add(fs);
  }
  return ret;
}
Project: ditb    File: IntegrationTestBulkLoad.java
@Override
public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
  TaskAttemptContext context)
      throws IOException, InterruptedException {
  int taskId = context.getTaskAttemptID().getTaskID().getId();
  int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
  int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
  int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

  taskId = taskId + iteration * numMapTasks;
  numMapTasks = numMapTasks * numIterations;

  long chainId = Math.abs(new Random().nextLong());
  chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
  LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

  return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
}
Project: aliyun-maxcompute-data-collectors    File: OdpsExportMapper.java
public void map(LongWritable key, Record val, Context context) 
    throws IOException, InterruptedException{
  try {
    odpsImpl.parse(val);
    context.write(odpsImpl, NullWritable.get());
  } catch (Exception e) {
    LOG.error("Exception raised during data export");
    LOG.error("Exception: ", e);

    LOG.error("On input: " + val);
    LOG.error("At position " + key);
    InputSplit is = context.getInputSplit();
    LOG.error("");
    LOG.error("Currently processing split:");
    LOG.error(is);
    LOG.error("");
    LOG.error("This issue might not necessarily be caused by current input");
    LOG.error("due to the batching nature of export.");
    LOG.error("");
    throw new IOException("Can't export data, please check failed map task logs", e);
  }
}
Project: hadoop    File: TestSleepJob.java
private void testRandomLocation(int locations, int njobs,
                                UserGroupInformation ugi) throws Exception {
  Configuration configuration = new Configuration();

  DebugJobProducer jobProducer = new DebugJobProducer(njobs, configuration);
  Configuration jconf = GridmixTestUtils.mrvl.getConfig();
  jconf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);

  JobStory story;
  int seq = 1;
  while ((story = jobProducer.getNextJob()) != null) {
    GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
            story, new Path("ignored"), ugi, seq++);
    gridmixJob.buildSplits(null);
    List<InputSplit> splits = new SleepJob.SleepInputFormat()
            .getSplits(gridmixJob.getJob());
    for (InputSplit split : splits) {
      assertEquals(locations, split.getLocations().length);
    }
  }
  jobProducer.close();
}
Project: hadoop    File: TestFileInputFormat.java
@Test
public void testNumInputFilesRecursively() throws Exception {
  Configuration conf = getConfiguration();
  conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
  conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
  Job job = Job.getInstance(conf);
  FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
  List<InputSplit> splits = fileInputFormat.getSplits(job);
  Assert.assertEquals("Input splits are not correct", 3, splits.size());
  verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
      "test:/a1/file1"), splits);

  // Using the deprecated configuration
  conf = getConfiguration();
  conf.set("mapred.input.dir.recursive", "true");
  job = Job.getInstance(conf);
  splits = fileInputFormat.getSplits(job);
  verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
      "test:/a1/file1"), splits);
}
Project: aliyun-maxcompute-data-collectors    File: TestMainframeDatasetInputFormat.java
@Test
public void testRetrieveDatasets() throws IOException {
  JobConf conf = new JobConf();
  conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345");
  conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
  conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
  // set the password in the secure credentials object
  Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
  conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
      "pssword".getBytes());

  String dsName = "dsName1";
  conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
  Job job = new Job(conf);
  ConfigurationHelper.setJobNumMaps(job, 2);
  //format.getSplits(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  splits = ((MainframeDatasetInputFormat<SqoopRecord>) format).getSplits(job);
  Assert.assertEquals("test1", ((MainframeDatasetInputSplit) splits.get(0))
      .getNextDataset().toString());
  Assert.assertEquals("test2", ((MainframeDatasetInputSplit) splits.get(1))
      .getNextDataset().toString());
}
Project: ditb    File: TestWALRecordReader.java
/**
 * Create a new reader from the split, and match the edits against the passed columns.
 */
private void testSplit(InputSplit split, byte[]... columns) throws Exception {
  final WALRecordReader reader = getReader();
  reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

  for (byte[] column : columns) {
    assertTrue(reader.nextKeyValue());
    Cell cell = reader.getCurrentValue().getCells().get(0);
    if (!Bytes.equals(column, cell.getQualifier())) {
      assertTrue("expected [" + Bytes.toString(column) + "], actual ["
          + Bytes.toString(cell.getQualifier()) + "]", false);
    }
  }
  assertFalse(reader.nextKeyValue());
  reader.close();
}
Project: hadoop    File: TestInputSampler.java
@Override
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
    int numSplits) throws IOException {
  List<InputSplit> splits = null;
  try {
    splits = getSplits(Job.getInstance(job));
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  org.apache.hadoop.mapred.InputSplit[] retVals =
      new org.apache.hadoop.mapred.InputSplit[splits.size()];
  for (int i = 0; i < splits.size(); ++i) {
    MapredSequentialSplit split = new MapredSequentialSplit(
        ((SequentialSplit) splits.get(i)).getInit());
    retVals[i] = split;
  }
  return retVals;
}
Project: ditb    File: ExportSnapshot.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  List<InputSplit> splits = new ArrayList(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
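
Note: a worked example of the sizing heuristic above, using illustrative numbers:

// Illustrative numbers only: 45 snapshot files and no explicit split count configured.
int files = 45;
int groupSize = 10;                     // the default passed to conf.getInt(CONF_MAP_GROUP, 10)
int mappers = 1 + (files / groupSize);  // 1 + 4 = 5
mappers = Math.min(mappers, files);     // still 5; never more mappers than snapshot files
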
Project: hadoop    File: GenerateDistCacheData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
                            DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {

      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[])null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
  }

  return splits;
}
Project: hadoop    File: CombineFileInputFormat.java
/**
 * Create a single split from the list of blocks specified in validBlocks.
 * Add this new split into splitList.
 */
private void addCreatedSplit(List<InputSplit> splitList, 
                             Collection<String> locations, 
                             ArrayList<OneBlockInfo> validBlocks) {
  // create an input split
  Path[] fl = new Path[validBlocks.size()];
  long[] offset = new long[validBlocks.size()];
  long[] length = new long[validBlocks.size()];
  for (int i = 0; i < validBlocks.size(); i++) {
    fl[i] = validBlocks.get(i).onepath; 
    offset[i] = validBlocks.get(i).offset;
    length[i] = validBlocks.get(i).length;
  }
   // add this split to the list that is returned
  CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                 length, locations.toArray(new String[0]));
  splitList.add(thissplit); 
}
Project: hadoop    File: BaileyBorweinPlouffe.java
/** {@inheritDoc} */
public List<InputSplit> getSplits(JobContext context) {
  //get the property values
  final int startDigit = context.getConfiguration().getInt(
      DIGIT_START_PROPERTY, 1);
  final int nDigits = context.getConfiguration().getInt(
      DIGIT_SIZE_PROPERTY, 100);
  final int nMaps = context.getConfiguration().getInt(
      DIGIT_PARTS_PROPERTY, 1);

  //create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nMaps);
  final int[] parts = partition(startDigit - 1, nDigits, nMaps);
  for (int i = 0; i < parts.length; ++i) {
    final int k = i < parts.length - 1 ? parts[i+1]: nDigits+startDigit-1;
    splits.add(new BbpSplit(i, parts[i], k - parts[i]));
  }
  return splits;
}
Project: ditb    File: TestTableInputFormatScanBase.java
/**
 * Tests a MR scan using data skew auto-balance
 *
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
        InterruptedException,
        ClassNotFoundException {
  String jobName = "TestJobForNumOfSplits";
  LOG.info("Before map/reduce startup - job " + jobName);
  Configuration c = new Configuration(TEST_UTIL.getConfiguration());
  Scan scan = new Scan();
  scan.addFamily(INPUT_FAMILY);
  c.set("hbase.mapreduce.input.autobalance", "true");
  c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
  c.set(KEY_STARTROW, "");
  c.set(KEY_LASTROW, "");
  Job job = new Job(c, jobName);
  TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
          ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
  TableInputFormat tif = new TableInputFormat();
  tif.setConf(job.getConfiguration());
  Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
  List<InputSplit> splits = tif.getSplits(job);
  Assert.assertEquals(expectedNumOfSplits, splits.size());
}
Project: hadoop    File: CombineFileRecordReaderWrapper.java
public void initialize(InputSplit split, TaskAttemptContext context)
  throws IOException, InterruptedException {
  // it really should be the same file split at the time the wrapper instance
  // was created
  assert fileSplitIsValid(context);

  delegate.initialize(fileSplit, context);
}
Project: multiple-dimension-spread    File: MDSSpreadReader.java
@Override
public void initialize( final InputSplit inputSplit, final TaskAttemptContext context ) throws IOException, InterruptedException {
  FileSplit fileSplit = (FileSplit)inputSplit;
  Configuration config = context.getConfiguration();
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem( config );
  long fileLength = fs.getLength( path );
  long start = fileSplit.getStart();
  long length = fileSplit.getLength();
  InputStream in = fs.open( path );
}
Project: angel    File: BalanceInputFormat.java
public void getMoreSplits(Configuration conf, List<FileStatus> stats,
  long maxSize, long minSizeNode, long minSizeRack,
  List<InputSplit> splits) throws IOException {
  // all blocks for all the files in input set
  OneFileInfo[] files;

  // mapping from a rack name to the list of blocks it has
  HashMap<String, List<OneBlockInfo>> rackToBlocks =
    new HashMap<String, List<OneBlockInfo>>();

  // mapping from a block to the nodes on which it has replicas
  HashMap<OneBlockInfo, String[]> blockToNodes =
    new HashMap<OneBlockInfo, String[]>();

  // mapping from a node to the list of blocks that it contains
  HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
    new HashMap<String, Set<OneBlockInfo>>();

  files = new OneFileInfo[stats.size()];
  if (stats.size() == 0) {
    return;
  }

  // populate all the blocks for all files
  long totLength = 0;
  int i = 0;
  for (FileStatus stat : stats) {
    files[i] = new OneFileInfo(stat, conf, isSplitable(conf, stat.getPath()),
      rackToBlocks, blockToNodes, nodeToBlocks,
      rackToNodes, 256 * 1024 * 1024);
    totLength += files[i].getLength();
  }

  int partNum = (int) (totLength / maxSize);

  createSplitsGreedy(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
    partNum, minSizeNode, minSizeRack, splits);

}
Project: hadoop    File: DistSum.java
/** @return a list containing a single split of summation */
@Override
public List<InputSplit> getSplits(JobContext context) {
  //read sigma from conf
  final Configuration conf = context.getConfiguration();
  final Summation sigma = SummationWritable.read(DistSum.class, conf); 

  //create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(1);
  splits.add(new SummationSplit(sigma));
  return splits;
}
Project: hadoop    File: TestSplitters.java
@Test(timeout=2000)
public void testBooleanSplitter() throws Exception{
  BooleanSplitter splitter = new BooleanSplitter();
  ResultSet result = mock(ResultSet.class);
  when(result.getString(1)).thenReturn("result1");

  List<InputSplit> splits=splitter.split(configuration, result, "column");
  assertSplits(new String[] {"column = FALSE column = FALSE",
      "column IS NULL column IS NULL"}, splits);

  when(result.getString(1)).thenReturn("result1");
  when(result.getString(2)).thenReturn("result2");
  when(result.getBoolean(1)).thenReturn(true);
  when(result.getBoolean(2)).thenReturn(false);

  splits=splitter.split(configuration, result, "column");
  assertEquals(0, splits.size());

  when(result.getString(1)).thenReturn("result1");
  when(result.getString(2)).thenReturn("result2");
  when(result.getBoolean(1)).thenReturn(false);
  when(result.getBoolean(2)).thenReturn(true);

  splits = splitter.split(configuration, result, "column");
  assertSplits(new String[] {
      "column = FALSE column = FALSE", ".*column = TRUE"}, splits);
}
Project: BLASpark    File: RowPerLineRecordReader.java
@Override
public void initialize(final InputSplit inputSplit,
                       final TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {
    this.lrr = new LineRecordReader();
    this.lrr.initialize(inputSplit, taskAttemptContext);

}
Project: Wikipedia-Index    File: XmlInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit inputSplit, TaskAttemptContext context) {
    try {
        return new XMLRecordReader(inputSplit, context.getConfiguration());
    } catch (IOException e) {
        return null;
    }
}
Project: spark-util    File: ErrorHandlingAvroKeyRecordReader.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit)inputSplit;
    if(fileSplit != null && fileSplit.getPath() != null && fileSplit.getPath().toString().endsWith(TEMP_FILE_SUFFIX)){
        LOG.info("Not processing Avro tmp file {}", fileSplit.getPath());
    }else {
        super.initialize(inputSplit, context);
    }
}
Project: circus-train    File: UniformSizeInputFormat.java
/**
 * Implementation of InputFormat::getSplits(). Returns a list of InputSplits such that the number of bytes to be
 * copied by each split is approximately equal.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException: On failure.
 * @throws InterruptedException
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS);

  if (numSplits == 0) {
    return new ArrayList<>();
  }

  return getSplits(configuration, numSplits,
      ConfigurationUtil.getLong(configuration, S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
Project: hadoop    File: CompositeRecordReader.java
@SuppressWarnings("unchecked")
public void initialize(InputSplit split, TaskAttemptContext context) 
    throws IOException, InterruptedException {
  if (kids != null) {
    for (int i = 0; i < kids.length; ++i) {
      kids[i].initialize(((CompositeInputSplit)split).get(i), context);
      if (kids[i].key() == null) {
        continue;
      }

      // get keyclass
      if (keyclass == null) {
        keyclass = kids[i].createKey().getClass().
          asSubclass(WritableComparable.class);
      }
      // create priority queue
      if (null == q) {
        cmp = WritableComparator.get(keyclass, conf);
        q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
              new Comparator<ComposableRecordReader<K,?>>() {
                public int compare(ComposableRecordReader<K,?> o1,
                                   ComposableRecordReader<K,?> o2) {
                  return cmp.compare(o1.key(), o2.key());
                }
              });
      }
      // Explicit check for key class agreement
      if (!keyclass.equals(kids[i].key().getClass())) {
        throw new ClassCastException("Child key classes fail to agree");
      }

      // add the kid to priority queue if it has any elements
      if (kids[i].hasNext()) {
        q.add(kids[i]);
      }
    }
  }
}
Project: hadoop    File: WrappedRecordReader.java
public void initialize(InputSplit split,
                       TaskAttemptContext context)
throws IOException, InterruptedException {
  rr.initialize(split, context);
  conf = context.getConfiguration();
  nextKeyValue();
  if (!empty) {
    keyclass = key.getClass().asSubclass(WritableComparable.class);
    valueclass = value.getClass();
    if (cmp == null) {
      cmp = WritableComparator.get(keyclass, conf);
    }
  }
}
Project: hadoop    File: BooleanSplitter.java
public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
    throws SQLException {

  List<InputSplit> splits = new ArrayList<InputSplit>();

  if (results.getString(1) == null && results.getString(2) == null) {
    // Range is null to null. Return a null split accordingly.
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " IS NULL", colName + " IS NULL"));
    return splits;
  }

  boolean minVal = results.getBoolean(1);
  boolean maxVal = results.getBoolean(2);

  // Use one or two splits.
  if (!minVal) {
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " = FALSE", colName + " = FALSE"));
  }

  if (maxVal) {
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " = TRUE", colName + " = TRUE"));
  }

  if (results.getString(1) == null || results.getString(2) == null) {
    // Include a null value.
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " IS NULL", colName + " IS NULL"));
  }

  return splits;
}
Project: hadoop    File: FixedLengthRecordReader.java
@Override
public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  final Path file = split.getPath();
  initialize(job, split.getStart(), split.getLength(), file);
}
Project: hadoop    File: JobSplitWriter.java
@SuppressWarnings("unchecked")
private static <T extends InputSplit> 
SplitMetaInfo[] writeNewSplits(Configuration conf, 
    T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {

  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length != 0) {
    SerializationFactory factory = new SerializationFactory(conf);
    int i = 0;
    int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
        MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
    long offset = out.getPos();
    for(T split: array) {
      long prevCount = out.getPos();
      Text.writeString(out, split.getClass().getName());
      Serializer<T> serializer = 
        factory.getSerializer((Class<T>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);
      long currCount = out.getPos();
      String[] locations = split.getLocations();
      if (locations.length > maxBlockLocations) {
        LOG.warn("Max block location exceeded for split: "
            + split + " splitsize: " + locations.length +
            " maxsize: " + maxBlockLocations);
        locations = Arrays.copyOf(locations, maxBlockLocations);
      }
      info[i++] = 
        new JobSplit.SplitMetaInfo( 
            locations, offset,
            split.getLength());
      offset += currCount - prevCount;
    }
  }
  return info;
}
Project: aliyun-maxcompute-data-collectors    File: MySQLDumpInputFormat.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
  DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;

  this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
      + dbSplit.getUpperClause() + ")";
}
Project: hadoop    File: JobSplitWriter.java
public static void createSplitFiles(Path jobSubmitDir, 
    Configuration conf, FileSystem fs, 
    org.apache.hadoop.mapred.InputSplit[] splits) 
throws IOException {
  FSDataOutputStream out = createFile(fs, 
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
  SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
  out.close();
  writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
      info);
}
Project: aliyun-maxcompute-data-collectors    File: MainframeDatasetFTPRecordReader.java
@Override
public void initialize(InputSplit inputSplit,
    TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  super.initialize(inputSplit, taskAttemptContext);

  Configuration conf = getConfiguration();
  ftp = MainframeFTPClientUtils.getFTPConnection(conf);
  if (ftp != null) {
    String dsName
        = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
    ftp.changeWorkingDirectory("'" + dsName + "'");
  }
}