Java source code examples for the class org.apache.hadoop.mapreduce.JobContext

Project: hadoop    File: GenerateData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobClient client =
    new JobClient(new JobConf(jobCtxt.getConfiguration()));
  ClusterStatus stat = client.getClusterStatus(true);
  final long toGen =
    jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
  if (toGen < 0) {
    throw new IOException("Invalid/missing generation bytes: " + toGen);
  }
  final int nTrackers = stat.getTaskTrackers();
  final long bytesPerTracker = toGen / nTrackers;
  final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
  final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
  final Matcher m = trackerPattern.matcher("");
  for (String tracker : stat.getActiveTrackerNames()) {
    m.reset(tracker);
    if (!m.find()) {
      System.err.println("Skipping node: " + tracker);
      continue;
    }
    final String name = m.group(1);
    splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
  }
  return splits;
}
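
Note: the snippets in this listing read their settings from the JobContext's Configuration. As a point of reference, below is a minimal, self-contained sketch (not taken from the gridmix sources) showing how such an InputFormat can be exercised outside a running job by building a JobContextImpl by hand; the key "example.bytes.to.generate" is a hypothetical stand-in for GRIDMIX_GEN_BYTES.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;

public class JobContextSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical key standing in for GRIDMIX_GEN_BYTES in the snippet above.
    conf.setLong("example.bytes.to.generate", 1L << 30);
    JobContext ctx = new JobContextImpl(conf, new JobID("local", 1));
    // A concrete InputFormat could now be handed this context, e.g.
    // format.getSplits(ctx), and the returned List<InputSplit> inspected.
    System.out.println("Job " + ctx.getJobID() + " will generate "
        + ctx.getConfiguration().getLong("example.bytes.to.generate", -1) + " bytes");
  }
}
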
Project: circus-train    File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());

  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<>(nSplits);

  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: easyhbase    File: WdTableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
  List<InputSplit> allSplits = new ArrayList<InputSplit>();
  Scan originalScan = getScan();

  Scan[] scans = rowKeyDistributor.getDistributedScans(originalScan);

  for (Scan scan : scans) {
    // Internally super.getSplits(...) uses the scan object stored in a private variable;
    // to reuse the superclass code we temporarily swap in each distributed scan here.
    setScan(scan);
    List<InputSplit> splits = super.getSplits(context);
    allSplits.addAll(splits);
  }

  // Setting original scan back
  setScan(originalScan);

  return allSplits;
}
Project: aliyun-maxcompute-data-collectors    File: OraOopDataDrivenDBInputFormat.java
private int getDesiredNumberOfMappers(JobContext jobContext) {

    int desiredNumberOfMappers =
        jobContext.getConfiguration().getInt(
            OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS, -1);

    int minMappersAcceptedByOraOop =
        OraOopUtilities.getMinNumberOfImportMappersAcceptedByOraOop(jobContext
            .getConfiguration());

    if (desiredNumberOfMappers < minMappersAcceptedByOraOop) {
      LOG.warn(String.format("%s should not be used to perform a sqoop import "
          + "when the number of mappers is %d\n "
          + "i.e. OraOopManagerFactory.accept() should only appect jobs "
          + "where the number of mappers is at least %d",
          OraOopConstants.ORAOOP_PRODUCT_NAME, desiredNumberOfMappers,
          minMappersAcceptedByOraOop));
    }

    return desiredNumberOfMappers;
  }
Project: hadoop    File: FileOutputFormat.java
public void checkOutputSpecs(JobContext job
                             ) throws FileAlreadyExistsException, IOException{
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
  }

  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, job.getConfiguration());

  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException("Output directory " + outDir + 
                                         " already exists");
  }
}
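
The check above expects the driver to have registered an output path that does not yet exist. A minimal driver-side sketch of that setup follows; the job name and the path /user/example/out are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputSpecDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "output-spec-example");
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // checkOutputSpecs() rejects the job if this path is unset or already exists.
    FileOutputFormat.setOutputPath(job, new Path("/user/example/out"));
  }
}
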
Project: aliyun-maxcompute-data-collectors    File: UpdateOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);

  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Table name is not set for export.");
  } else if (null == dbConf.getOutputFieldNames()) {
    throw new IOException(
        "Output field names are null.");
  } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
    throw new IOException("Update key column is not set for export.");
  }
}
Project: aliyun-maxcompute-data-collectors    File: ExportCallOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);

  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Procedure name is not set for export");
  } else if (null == dbConf.getOutputFieldNames()
      && 0 == dbConf.getOutputFieldCount()) {
    throw new IOException(
        "Output field names are null and zero output field count set.");
  }
}
Project: aliyun-maxcompute-data-collectors    File: SQLServerResilientUpdateOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);

  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Table name is not set for export.");
  } else if (null == dbConf.getOutputFieldNames()) {
    throw new IOException(
        "Output field names are null.");
  } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
    throw new IOException("Update key column is not set for export.");
  }
}
Project: hadoop    File: BaileyBorweinPlouffe.java
/** {@inheritDoc} */
public List<InputSplit> getSplits(JobContext context) {
  //get the property values
  final int startDigit = context.getConfiguration().getInt(
      DIGIT_START_PROPERTY, 1);
  final int nDigits = context.getConfiguration().getInt(
      DIGIT_SIZE_PROPERTY, 100);
  final int nMaps = context.getConfiguration().getInt(
      DIGIT_PARTS_PROPERTY, 1);

  //create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nMaps);
  final int[] parts = partition(startDigit - 1, nDigits, nMaps);
  for (int i = 0; i < parts.length; ++i) {
    final int k = i < parts.length - 1 ? parts[i+1]: nDigits+startDigit-1;
    splits.add(new BbpSplit(i, parts[i], k - parts[i]));
  }
  return splits;
}
Project: aliyun-maxcompute-data-collectors    File: SQLServerResilientExportOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);

  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Table name is not set for export");
  } else if (null == dbConf.getOutputFieldNames()
      && 0 == dbConf.getOutputFieldCount()) {
    throw new IOException(
        "Output field names are null and zero output field count set.");
  }
}
Project: hadoop    File: FileOutputCommitter.java
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
  if (hasOutputPath()) {
    Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
    FileSystem fs = pendingJobAttemptsPath
        .getFileSystem(context.getConfiguration());
    fs.delete(pendingJobAttemptsPath, true);
  } else {
    LOG.warn("Output Path is null in cleanupJob()");
  }
}
Project: hadoop    File: YarnOutputFiles.java
/**
 * Create a local map output index file name on the same volume.
 */
public Path getOutputIndexFileForWriteInVolume(Path existing) {
  Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
  Path attemptOutputDir = new Path(outputDir,
      conf.get(JobContext.TASK_ATTEMPT_ID));
  return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
                                    MAP_OUTPUT_INDEX_SUFFIX_STRING);
}
Project: hadoop    File: FileOutputCommitter.java
/**
 * The job has completed so move all committed tasks to the final output dir.
 * Delete the temporary directory, including all of the work directories.
 * Create a _SUCCESS file to mark the job as successful.
 * @param context the job's context
 */
public void commitJob(JobContext context) throws IOException {
  if (hasOutputPath()) {
    Path finalOutput = getOutputPath();
    FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());

    if (algorithmVersion == 1) {
      for (FileStatus stat: getAllCommittedTaskPaths(context)) {
        mergePaths(fs, stat, finalOutput);
      }
    }

    // delete the _temporary folder; the _SUCCESS marker is created below if enabled
    cleanupJob(context);
    // Create the _SUCCESS marker in the output dir if the job requests it.
    // Note that this is enabled by default.
    if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
      Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
      fs.create(markerPath).close();
    }
  } else {
    LOG.warn("Output Path is null in commitJob()");
  }
}
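
Both the commit algorithm and the _SUCCESS marker used above are controlled by plain configuration keys. The sketch below is an assumed driver-side example using the public constants exposed by FileOutputCommitter in recent Hadoop releases; the chosen values are illustrative, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class CommitterConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Skip writing the _SUCCESS marker on successful jobs.
    conf.setBoolean(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false);
    // Use commit algorithm version 2: task output is moved to the final
    // output directory at task commit time instead of at job commit time.
    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
    System.out.println("marker enabled = " + conf.getBoolean(
        FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true));
  }
}
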
Project: ditb    File: CompactionTool.java
/**
 * Returns a split for each store files directory using the block location
 * of each file as locality reference.
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);

  Text key = new Text();
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    LineReader reader = new LineReader(fs.open(path));
    long pos = 0;
    int n;
    try {
      while ((n = reader.readLine(key)) > 0) {
        String[] hosts = getStoreDirHosts(fs, path);
        splits.add(new FileSplit(path, pos, n, hosts));
        pos += n;
      }
    } finally {
      reader.close();
    }
  }

  return splits;
}
Project: hadoop    File: TestJobImpl.java
@Test
public void testTransitionsAtFailed() throws IOException {
  Configuration conf = new Configuration();
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();

  OutputCommitter committer = mock(OutputCommitter.class);
  doThrow(new IOException("forcefail"))
    .when(committer).setupJob(any(JobContext.class));
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  AppContext mockContext = mock(AppContext.class);
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
  JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.FAILED);

  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
  assertJobState(job, JobStateInternal.FAILED);
  Assert.assertEquals(JobState.RUNNING, job.getState());
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
  Assert.assertEquals(JobState.FAILED, job.getState());

  dispatcher.stop();
  commitHandler.stop();
}
Project: hadoop    File: TestCompressionEmulationUtils.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  // get the total data to be generated
  long toGen =
    jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
  if (toGen < 0) {
    throw new IOException("Invalid/missing generation bytes: " + toGen);
  }
  // get the total number of mappers configured
  int totalMappersConfigured =
    jobCtxt.getConfiguration().getInt(MRJobConfig.NUM_MAPS, -1);
  if (totalMappersConfigured < 0) {
    throw new IOException("Invalid/missing num mappers: " 
                          + totalMappersConfigured);
  }

  final long bytesPerTracker = toGen / totalMappersConfigured;
  final ArrayList<InputSplit> splits = 
    new ArrayList<InputSplit>(totalMappersConfigured);
  for (int i = 0; i < totalMappersConfigured; ++i) {
    splits.add(new GenSplit(bytesPerTracker, 
               new String[] { "tracker_local" }));
  }
  return splits;
}
Project: hadoop    File: TestMRCJCFileOutputCommitter.java
public void testEmptyOutput() throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);

  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);

  // Do not write any output

  // do commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);

  FileUtil.fullyDelete(new File(outDir.toString()));
}
Project: hadoop    File: DistSum.java
/** Partitions the summation into parts and then returns them as splits. */
@Override
public List<InputSplit> getSplits(JobContext context) {
  //read sigma from conf
  final Configuration conf = context.getConfiguration();
  final Summation sigma = SummationWritable.read(DistSum.class, conf); 
  final int nParts = conf.getInt(N_PARTS, 0);

  //create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nParts);
  final Summation[] parts = sigma.partition(nParts);
  for(int i = 0; i < parts.length; ++i) {
    splits.add(new SummationSplit(parts[i]));
    //LOG.info("parts[" + i + "] = " + parts[i]);
  }
  return splits;
}
Project: hadoop    File: FileOutputFormat.java
/**
 * Get the {@link CompressionCodec} for compressing the job outputs.
 * @param job the {@link Job} to look in
 * @param defaultValue the {@link CompressionCodec} to return if not set
 * @return the {@link CompressionCodec} to be used to compress the 
 *         job outputs
 * @throws IllegalArgumentException if the class was specified, but not found
 */
public static Class<? extends CompressionCodec> 
getOutputCompressorClass(JobContext job, 
                       Class<? extends CompressionCodec> defaultValue) {
  Class<? extends CompressionCodec> codecClass = defaultValue;
  Configuration conf = job.getConfiguration();
  String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
  if (name != null) {
    try {
      codecClass = 
        conf.getClassByName(name).asSubclass(CompressionCodec.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Compression codec " + name + 
                                         " was not found.", e);
    }
  }
  return codecClass;
}
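
The codec resolved above is normally registered on the driver side through the FileOutputFormat helpers. A short illustrative sketch (GzipCodec is just an example choice):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionSetup {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "compression-example");
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    // Resolves the same way as getOutputCompressorClass() above,
    // falling back to DefaultCodec when nothing is configured.
    Class<? extends CompressionCodec> codec =
        FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);
    System.out.println("Output codec: " + codec.getName());
  }
}
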
Project: hadoop    File: MapTask.java
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
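
The reflection call above instantiates whatever Partitioner class the job has registered via job.setPartitionerClass(...). Below is a minimal custom partitioner of the kind it would pick up; it is an illustrative example, not part of the Hadoop sources.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Send empty keys to the first reducer; otherwise spread keys by their
    // first character so keys sharing a first character co-locate.
    if (key.getLength() == 0) {
      return 0;
    }
    return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
  }
}
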
Project: ditb    File: ExportSnapshot.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
Project: hadoop    File: FileOutputCommitter.java
/**
 * Create a file output committer
 * @param outputPath the job's output path, or null if you want the output
 * committer to act as a noop.
 * @param context the task's context
 * @throws IOException
 */
@Private
public FileOutputCommitter(Path outputPath, 
                           JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();
  algorithmVersion =
      conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
                  FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
  LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
  if (algorithmVersion != 1 && algorithmVersion != 2) {
    throw new IOException("Only 1 or 2 algorithm version is supported");
  }
  if (outputPath != null) {
    FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
    this.outputPath = fs.makeQualified(outputPath);
  }
}
Project: hadoop    File: NLineInputFormat.java
/** 
 * Logically splits the set of input files for the job so that each split
 * contains N lines of input.
 * 
 * @see FileInputFormat#getSplits(JobContext)
 */
public List<InputSplit> getSplits(JobContext job)
throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  int numLinesPerSplit = getNumLinesPerSplit(job);
  for (FileStatus status : listStatus(job)) {
    splits.addAll(getSplitsForFile(status,
      job.getConfiguration(), numLinesPerSplit));
  }
  return splits;
}
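
A driver-side sketch showing how the lines-per-split value consumed above is typically configured; the input path and the line count are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "nline-example");
    job.setInputFormatClass(NLineInputFormat.class);
    // Each mapper receives at most 1000 input lines.
    NLineInputFormat.setNumLinesPerSplit(job, 1000);
    NLineInputFormat.addInputPath(job, new Path("/user/example/input.txt"));
  }
}
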
Project: hadoop    File: FileInputFormat.java
/**
 * Get the list of input {@link Path}s for the map-reduce job.
 * 
 * @param context The job
 * @return the list of input {@link Path}s for the map-reduce job.
 */
public static Path[] getInputPaths(JobContext context) {
  String dirs = context.getConfiguration().get(INPUT_DIR, "");
  String [] list = StringUtils.split(dirs);
  Path[] result = new Path[list.length];
  for (int i = 0; i < list.length; i++) {
    result[i] = new Path(StringUtils.unEscapeString(list[i]));
  }
  return result;
}
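
The paths returned here are the ones registered through the FileInputFormat setters. A brief illustrative sketch (the log paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathsSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "input-paths-example");
    FileInputFormat.addInputPath(job, new Path("/data/logs/2016-01-01"));
    FileInputFormat.addInputPath(job, new Path("/data/logs/2016-01-02"));

    // getInputPaths() unescapes and returns exactly what was registered above.
    for (Path p : FileInputFormat.getInputPaths(job)) {
      System.out.println("input: " + p);
    }
  }
}
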
Project: hadoop    File: FileInputFormat.java
private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();
  for (int i=0; i < dirs.length; ++i) {
    Path p = dirs[i];
    FileSystem fs = p.getFileSystem(job.getConfiguration()); 
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
    } else if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    } else {
      for (FileStatus globStat: matches) {
        if (globStat.isDirectory()) {
          RemoteIterator<LocatedFileStatus> iter =
              fs.listLocatedStatus(globStat.getPath());
          while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(),
                    inputFilter);
              } else {
                result.add(stat);
              }
            }
          }
        } else {
          result.add(globStat);
        }
      }
    }
  }

  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  return result;
}
Project: hadoop    File: CompositeInputFormat.java
/**
 * Build a CompositeInputSplit from the child InputFormats by assigning the
 * ith split from each child to the ith composite split.
 */
@SuppressWarnings("unchecked")
public List<InputSplit> getSplits(JobContext job) 
    throws IOException, InterruptedException {
  setFormat(job.getConfiguration());
  job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", Long.MAX_VALUE);
  return root.getSplits(job);
}
Project: circus-train    File: CopyCommitter.java
/** {@inheritDoc} */
@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
  try {
    super.abortJob(jobContext, state);
  } finally {
    cleanup(jobContext.getConfiguration());
  }
}
Project: circus-train    File: UniformSizeInputFormat.java
/**
 * Implementation of InputFormat::getSplits(). Returns a list of InputSplits such that the number of bytes to be
 * copied is approximately equal across all splits.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException on failure.
 * @throws InterruptedException
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS);

  if (numSplits == 0) {
    return new ArrayList<>();
  }

  return getSplits(configuration, numSplits,
      ConfigurationUtil.getLong(configuration, S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
Project: circus-train    File: CopyOutputFormat.java
/** {@inheritDoc} */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();

  Path workingPath = getCommitDirectory(conf);
  if (getCommitDirectory(conf) == null) {
    throw new IllegalStateException("Commit directory not configured");
  }

  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(context.getCredentials(), new Path[] { workingPath }, conf);
}
Project: circus-train    File: DynamicInputFormatTest.java
@Test
public void getSplits() throws Exception {
  S3MapReduceCpOptions options = getOptions();
  Configuration configuration = new Configuration();
  configuration.set("mapred.map.tasks", String.valueOf(options.getMaxMaps()));
  CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
      new Path(cluster.getFileSystem().getUri().toString() + "/tmp/testDynInputFormat/fileList.seq"), options);

  JobContext jobContext = new JobContextImpl(configuration, new JobID());
  DynamicInputFormat<Text, CopyListingFileStatus> inputFormat = new DynamicInputFormat<>();
  List<InputSplit> splits = inputFormat.getSplits(jobContext);

  int nFiles = 0;
  int taskId = 0;

  for (InputSplit split : splits) {
    RecordReader<Text, CopyListingFileStatus> recordReader = inputFormat.createRecordReader(split, null);
    StubContext stubContext = new StubContext(jobContext.getConfiguration(), recordReader, taskId);
    final TaskAttemptContext taskAttemptContext = stubContext.getContext();

    recordReader.initialize(splits.get(0), taskAttemptContext);
    float previousProgressValue = 0f;
    while (recordReader.nextKeyValue()) {
      CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
      String source = fileStatus.getPath().toString();
      assertTrue(expectedFilePaths.contains(source));
      final float progress = recordReader.getProgress();
      assertTrue(progress >= previousProgressValue);
      assertTrue(progress >= 0.0f);
      assertTrue(progress <= 1.0f);
      previousProgressValue = progress;
      ++nFiles;
    }
    assertTrue(recordReader.getProgress() == 1.0f);

    ++taskId;
  }

  Assert.assertEquals(expectedFilePaths.size(), nFiles);
}
Project: ditb    File: MultiTableSnapshotInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext jobContext)
    throws IOException, InterruptedException {
  List<TableSnapshotInputFormatImpl.InputSplit> splits =
      delegate.getSplits(jobContext.getConfiguration());
  List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());

  for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
    rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
  }

  return rtn;
}
Project: aliyun-maxcompute-data-collectors    File: NetezzaExternalTableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
    InterruptedException {
  int targetNumTasks = ConfigurationHelper.getJobNumMaps(context);
  List<InputSplit> splits = new ArrayList<InputSplit>(targetNumTasks);
  for (int i = 0; i < targetNumTasks; ++i) {
    splits.add(new NetezzaExternalTableInputSplit(i));
  }
  return splits;
}
Project: hadoop    File: TeraGen.java
/**
 * Create the desired number of splits, dividing the number of rows
 * between the mappers.
 */
public List<InputSplit> getSplits(JobContext job) {
  long totalRows = getNumberOfRows(job);
  int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  LOG.info("Generating " + totalRows + " using " + numSplits);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  long currentRow = 0;
  for(int split = 0; split < numSplits; ++split) {
    long goal = 
      (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
    splits.add(new RangeInputSplit(currentRow, goal - currentRow));
    currentRow = goal;
  }
  return splits;
}
Project: hadoop    File: TestDistCacheEmulation.java
/**
 * Reset the config properties related to Distributed Cache in the given job
 * configuration <code>jobConf</code>.
 * 
 * @param jobConf
 *          job configuration
 */
private void resetDistCacheConfigProperties(Configuration jobConf) {
  // reset current/latest property names
  jobConf.setStrings(MRJobConfig.CACHE_FILES, "");
  jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "");
  jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "");
  jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, "");
  // reset old property names
  jobConf.setStrings("mapred.cache.files", "");
  jobConf.setStrings("mapred.cache.files.filesizes", "");
  jobConf.setStrings("mapred.cache.files.visibilities", "");
  jobConf.setStrings("mapred.cache.files.timestamps", "");
}
Project: aliyun-maxcompute-data-collectors    File: ConfigurationHelper.java
/**
 * Get the (hinted) number of map tasks for a job.
 */
public static int getJobNumMaps(JobContext job) {
  if (isLocalJobTracker(job.getConfiguration())) {
    return numLocalModeMaps;
  } else {
    return job.getConfiguration().getInt(
      ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
  }
}
Project: hadoop    File: TeraOutputFormat.java
@Override
public void checkOutputSpecs(JobContext job
                            ) throws InvalidJobConfException, IOException {
  // Ensure that the output directory is set
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }

  final Configuration jobConf = job.getConfiguration();

  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, jobConf);

  final FileSystem fs = outDir.getFileSystem(jobConf);

  if (fs.exists(outDir)) {
    // existing output dir is considered empty iff its only content is the
    // partition file.
    //
    final FileStatus[] outDirKids = fs.listStatus(outDir);
    boolean empty = false;
    if (outDirKids != null && outDirKids.length == 1) {
      final FileStatus st = outDirKids[0];
      final String fname = st.getPath().getName();
      empty =
        !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
    }
    if (TeraSort.getUseSimplePartitioner(job) || !empty) {
      throw new FileAlreadyExistsException("Output directory " + outDir
          + " already exists");
    }
  }
}
Project: ditb    File: NMapInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
    InterruptedException {
  int count = getNumMapTasks(context.getConfiguration());
  List<InputSplit> splits = new ArrayList<InputSplit>(count);
  for (int i = 0; i < count; i++) {
    splits.add(new NullInputSplit());
  }
  return splits;
}
Project: hadoop    File: CombineFileInputFormat.java
@Override
protected boolean isSplitable(JobContext context, Path file) {
  final CompressionCodec codec =
    new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}
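
The same hook is often used in the opposite direction: forcing each file into a single split, for example when one logical record spans many lines. A hedged sketch of that pattern, extending the standard TextInputFormat:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Never split: each input file is read start to finish by a single mapper,
    // regardless of block size or compression codec.
    return false;
  }
}
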
Project: hadoop    File: TestJobImpl.java
@Test
public void testAbortJobCalledAfterKillingTasks() throws IOException {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
  InlineDispatcher dispatcher = new InlineDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = Mockito.mock(OutputCommitter.class);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);

  //Fail one task. This should land the JobImpl in the FAIL_WAIT state
  job.handle(new JobTaskEvent(
    MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
    TaskState.FAILED));
  //Verify abort job hasn't been called
  Mockito.verify(committer, Mockito.never())
    .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
  assertJobState(job, JobStateInternal.FAIL_WAIT);

  //Verify abortJob is called once and the job failed
  Mockito.verify(committer, Mockito.timeout(2000).times(1))
    .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
  assertJobState(job, JobStateInternal.FAILED);

  dispatcher.stop();
}
Project: ditb    File: TableInputFormat.java
@Override
protected void initialize(JobContext context) throws IOException {
  // Do we have to worry about mis-matches between the Configuration from setConf and the one
  // in this context?
  TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
  try {
    initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
  } catch (Exception e) {
    LOG.error(StringUtils.stringifyException(e));
  }
}