Source code examples for the Java class org.apache.hadoop.mapreduce.MRJobConfig

Project: hadoop    File: TestSpeculativeExecution.java
public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context) throws IOException, InterruptedException {
  // Make one reducer slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_reduce =
            conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

  // IF TESTING REDUCE SPECULATIVE EXECUTION:
  //   Make the "*_r_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_r_000000_1", so sleepTime will still remain 100ms.
  if ( (taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
        && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try{
    Thread.sleep(sleepTime);
  } catch(InterruptedException ie) {
    // Ignore
  }
  context.write(key,new IntWritable(0));
}
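
A note on usage: the REDUCE_SPECULATIVE flag read above is normally set on the driver side before submission. A minimal, hypothetical sketch (method and job names are illustrative):

public static Job newSpeculativeJob(Configuration conf) throws IOException {
  // Enable reduce-side speculation so a slow attempt like the one above gets
  // a speculative backup; leave map speculation off.
  conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true);
  conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
  return Job.getInstance(conf, "speculation-demo");
}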
Project: hadoop    File: PipeMapper.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, the number of
  //processed records may be equal to or less than the number of
  //records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    ignoreKey = job.getBoolean("stream.map.input.ignoreKey", 
      inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()));
  }

  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
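
The stream.* properties consulted above are per-job settings. A hedged sketch of the programmatic equivalents (the separator and field count are illustrative values):

public static void configureStreamingSeparators(Configuration conf) {
  conf.set("stream.map.output.field.separator", "|"); // read in configure() above
  conf.setInt("stream.num.map.output.key.fields", 2);
  conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);    // the flag behind the skipping field
}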
Project: hadoop    File: TestMRApps.java
@Test (timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
  Map<String, String> env = new HashMap<String, String>();
  try {
    MRApps.setClasspath(env, conf);
  } catch (Exception e) {
    fail("Got exception while setting classpath");
  }
  String env_str = env.get("CLASSPATH");
  String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
    Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
      ApplicationConstants.Environment.PWD.$$() + "/*"));
  assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
    + " the classpath!", env_str.contains(expectedClasspath));
  assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
    env_str.startsWith(expectedClasspath));
}
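
The inverse behavior is opted into with the same key; a minimal sketch:

public static void preferUserClasspath(Configuration conf) {
  // Put the job's own jars ahead of the framework's on the task classpath.
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
}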
Project: hadoop    File: TestTokenCache.java
@SuppressWarnings("deprecation")
@Test
public void testGetTokensForNamenodes() throws IOException,
    URISyntaxException {
  Path TEST_ROOT_DIR =
      new Path(System.getProperty("test.build.data", "test/build/data"));
  // ick, but need fq path minus file:/
  String binaryTokenFile =
      FileSystem.getLocal(conf)
        .makeQualified(new Path(TEST_ROOT_DIR, "tokenFile")).toUri()
        .getPath();

  MockFileSystem fs1 = createFileSystemForServiceName("service1");
  Credentials creds = new Credentials();
  Token<?> token1 = fs1.getDelegationToken(renewer);
  creds.addToken(token1.getService(), token1);
  // wait to set, else the obtain tokens call above will fail with FNF
  conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile);
  creds.writeTokenStorageFile(new Path(binaryTokenFile), conf);
  TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf);
  String fs_addr = fs1.getCanonicalServiceName();
  Token<?> nnt = TokenCache.getDelegationToken(creds, fs_addr);
  assertNotNull("Token for nn is null", nnt);
}
Project: hadoop    File: DefaultSpeculator.java
public DefaultSpeculator
    (Configuration conf, AppContext context,
     TaskRuntimeEstimator estimator, Clock clock) {
  super(DefaultSpeculator.class.getName());

  this.conf = conf;
  this.context = context;
  this.estimator = estimator;
  this.clock = clock;
  this.eventHandler = context.getEventHandler();
  this.soonestRetryAfterNoSpeculate =
      conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
              MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
  this.soonestRetryAfterSpeculate =
      conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
              MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
  this.proportionRunningTasksSpeculatable =
      conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
              MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
  this.proportionTotalTasksSpeculatable =
      conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
              MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
  this.minimumAllowedSpeculativeTasks =
      conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
              MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
}
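
The knobs read by this constructor can be tuned per job. A hypothetical example (the values shown are illustrative, not recommendations):

public static void tuneSpeculation(Configuration conf) {
  // Speculate on at most 20% of the running tasks at any one time.
  conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.2);
  // But always allow at least this many concurrent speculative attempts.
  conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 20);
  // Wait 2s before re-evaluating after a pass that speculated nothing.
  conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 2000L);
}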
Project: hadoop    File: TestMRApps.java
@SuppressWarnings("deprecation")
@Test (timeout = 120000)
public void testSetupDistributedCacheConflictsFiles() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);

  URI mockUri = URI.create("mockfs://mock/");
  FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
      .getRawFileSystem();

  URI file = new URI("mockfs://mock/tmp/something.zip#something");
  Path filePath = new Path(file);
  URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
  Path file2Path = new Path(file2);

  when(mockFs.resolvePath(filePath)).thenReturn(filePath);
  when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);

  DistributedCache.addCacheFile(file, conf);
  DistributedCache.addCacheFile(file2, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
  Map<String, LocalResource> localResources = 
    new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);

  assertEquals(1, localResources.size());
  LocalResource lr = localResources.get("something");
  //First one wins
  assertNotNull(lr);
  assertEquals(10L, lr.getSize());
  assertEquals(10L, lr.getTimestamp());
  assertEquals(LocalResourceType.FILE, lr.getType());
}
Project: hadoop    File: KeyFieldBasedPartitioner.java
public void setConf(Configuration conf) {
  this.conf = conf;
  keyFieldHelper = new KeyFieldHelper();
  String keyFieldSeparator = 
    conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
  keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
  if (conf.get("num.key.fields.for.partition") != null) {
    LOG.warn("Using deprecated num.key.fields.for.partition. " +
            "Use mapreduce.partition.keypartitioner.options instead");
    this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
    keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
  } else {
    String option = conf.get(PARTITIONER_OPTIONS);
    keyFieldHelper.parseOption(option);
  }
}
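
A hedged driver-side sketch of the non-deprecated configuration path checked above (the separator and key spec are illustrative):

public static void configureKeyFieldPartitioner(Configuration conf) {
  conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "|");
  // Partition on fields 1 through 2 of the key, using sort-style -k options.
  conf.set("mapreduce.partition.keypartitioner.options", "-k1,2");
}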
Project: hadoop    File: TestTotalOrderPartitioner.java
public void testTotalOrderBinarySearch() throws Exception {
  TotalOrderPartitioner<Text,NullWritable> partitioner =
    new TotalOrderPartitioner<Text,NullWritable>();
  Configuration conf = new Configuration();
  Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
      "totalorderbinarysearch", conf, splitStrings);
  conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
  try {
    partitioner.setConf(conf);
    NullWritable nw = NullWritable.get();
    for (Check<Text> chk : testStrings) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
    }
  } finally {
    p.getFileSystem(conf).delete(p, true);
  }
}
Project: hadoop    File: TestTotalOrderPartitioner.java
public void testTotalOrderMemCmp() throws Exception {
  TotalOrderPartitioner<Text,NullWritable> partitioner =
    new TotalOrderPartitioner<Text,NullWritable>();
  Configuration conf = new Configuration();
  Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
      "totalordermemcmp", conf, splitStrings);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
  try {
    partitioner.setConf(conf);
    NullWritable nw = NullWritable.get();
    for (Check<Text> chk : testStrings) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
    }
  } finally {
    p.getFileSystem(conf).delete(p, true);
  }
}
Project: hadoop    File: GridmixJob.java
/**
 * Sets the high ram job properties in the simulated job's configuration.
 */
@SuppressWarnings("deprecation")
static void configureHighRamProperties(Configuration sourceConf, 
                                       Configuration destConf) {
  // set the memory per map task
  scaleConfigParameter(sourceConf, destConf, 
                       MRConfig.MAPMEMORY_MB, MRJobConfig.MAP_MEMORY_MB, 
                       MRJobConfig.DEFAULT_MAP_MEMORY_MB);

  // validate and fail early
  validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB, 
                           JTConfig.JT_MAX_MAPMEMORY_MB);

  // set the memory per reduce task
  scaleConfigParameter(sourceConf, destConf, 
                       MRConfig.REDUCEMEMORY_MB, MRJobConfig.REDUCE_MEMORY_MB,
                       MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
  // validate and fail early
  validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB, 
                           JTConfig.JT_MAX_REDUCEMEMORY_MB);
}
Project: hadoop    File: TestBinaryTokenFile.java
/**
 * run a distributed job and verify that TokenCache is available
 * @throws IOException
 */
@Test
public void testBinaryTokenFile() throws IOException {
  Configuration conf = mrCluster.getConfig();

  // provide namenodes names for the job to get the delegation tokens for
  final String nnUri = dfsCluster.getURI(0).toString();
  conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);

  // using argument to pass the file name
  final String[] args = { 
      "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
      };
  int res = -1;
  try {
    res = ToolRunner.run(conf, new MySleepJob(), args);
  } catch (Exception e) {
    System.out.println("Job failed with " + e.getLocalizedMessage());
    e.printStackTrace(System.out);
    fail("Job failed");
  }
  assertEquals("dist job res is not 0:", 0, res);
}
Project: hadoop    File: DBOutputFormat.java
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
    JobConf job, String name, Progressable progress) throws IOException {
  org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
    new TaskAttemptContextImpl(job, 
          TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID))));
  org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 
   (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
  try {
    return new DBRecordWriter(writer.getConnection(), writer.getStatement());
  } catch(SQLException se) {
    throw new IOException(se);
  }
}
Project: hadoop    File: MRApps.java
/**
 * Creates an {@link ApplicationClassLoader} if
 * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
 * the APP_CLASSPATH environment variable is set.
 * @param conf
 * @return the created job classloader, or null if the job classloader is not
 * enabled or the APP_CLASSPATH environment variable is not set
 * @throws IOException
 */
public static ClassLoader createJobClassLoader(Configuration conf)
    throws IOException {
  ClassLoader jobClassLoader = null;
  if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
    String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
    if (appClasspath == null) {
      LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
    } else {
      LOG.info("Creating job classloader");
      if (LOG.isDebugEnabled()) {
        LOG.debug("APP_CLASSPATH=" + appClasspath);
      }
      String[] systemClasses = getSystemClasses(conf);
      jobClassLoader = createJobClassLoader(appClasspath,
          systemClasses);
    }
  }
  return jobClassLoader;
}
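
Enabling the isolated job classloader is a one-flag change on the submitting side; a minimal sketch (the system-classes override is optional, and the package list shown is illustrative):

public static void enableJobClassLoader(Configuration conf) {
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
  // Optionally pin packages that must always load from the parent classloader.
  conf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
      "java.,org.apache.hadoop.");
}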
Project: hadoop    File: TestStagingCleanup.java
@Test (timeout = 30000)
public void testDeletionofStagingOnKill() throws IOException {
  conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
  fs = mock(FileSystem.class);
  when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
  //Staging Dir exists
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path stagingDir = MRApps.getStagingAreaDir(conf, user);
  when(fs.exists(stagingDir)).thenReturn(true);
  ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
      0);
  ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 0);
  JobId jobid = recordFactory.newRecordInstance(JobId.class);
  jobid.setAppId(appId);
  ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
  MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
  appMaster.init(conf);
  //simulate the process being killed
  MRAppMaster.MRAppMasterShutdownHook hook = 
    new MRAppMaster.MRAppMasterShutdownHook(appMaster);
  hook.run();
  verify(fs, times(0)).delete(stagingJobPath, true);
}
Project: hadoop    File: TestStreamingOutputKeyValueTypes.java
@Override
protected String[] genArgs() {
  // set the testcase-specific config properties first and the remaining
  // arguments are set in TestStreaming.genArgs().
  args.add("-jobconf");
  args.add(MRJobConfig.MAP_OUTPUT_KEY_CLASS +
      "=org.apache.hadoop.io.LongWritable");
  args.add("-jobconf");
  args.add(MRJobConfig.OUTPUT_KEY_CLASS +
      "=org.apache.hadoop.io.LongWritable");

  // Using SequenceFileOutputFormat here because with TextOutputFormat, the
  // mapred.output.key.class set in JobConf (which we want to test here) is
  // not read/used at all.
  args.add("-outputformat");
  args.add("org.apache.hadoop.mapred.SequenceFileOutputFormat");

  return super.genArgs();
}
Project: hadoop    File: SplitMetaInfoReader.java
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
    JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
throws IOException {
  long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
      MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " +
        maxMetaInfoSize +". Aborting job " + jobId);
  }
  FSDataInputStream in = fs.open(metaSplitFile);
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
    new JobSplit.TaskSplitMetaInfo[numSplits];
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        jobSplitFile, 
        splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, 
        splitMetaInfo.getLocations(), 
        splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
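
Jobs with a very large number of splits can trip the size check above. A hedged sketch of raising, or disabling, the limit (the 50 MB figure is illustrative):

public static void raiseSplitMetaInfoLimit(Configuration conf) {
  // The default is 10,000,000 bytes; a value <= 0 disables the check.
  conf.setLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, 50L * 1000 * 1000);
}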
Project: lustre-connector-for-hadoop    File: LustreFsShuffle.java
@Override
public void init(ShuffleConsumerPlugin.Context context) {

    this.reduceId = context.getReduceId();
    this.jobConf = context.getJobConf();
    this.umbilical = context.getUmbilical();
    this.reporter = context.getReporter();
    this.copyPhase = context.getCopyPhase();
    this.mergePhase = context.getMergePhase();
    this.taskStatus = context.getStatus();
    this.reduceTask = context.getReduceTask();
    this.codec = context.getCodec();
    this.spilledRecordsCounter = context.getSpilledRecordsCounter();
    this.mergedMapOutputsCounter = context.getMergedMapOutputsCounter();

    jobConf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, false);
    try {
        lustrefs = (LustreFileSystem)FileSystem.get(LustreFileSystem.NAME, jobConf);
        mapOutputDir = SharedFsPlugins.getTempPath(jobConf,
                                                   JobID.downgrade(reduceId.getJobID()));
        reduceDir = new Path(mapOutputDir,
                             String.format(SharedFsPlugins.MAP_OUTPUT,
                                           reduceId.getTaskID().getId(), 0, 0)).getParent();
        mergeTempDir = new Path(mapOutputDir, "temp");
    } catch (IOException ioe) {
        throw new RuntimeException("Map Output directory not found !!", ioe);
    }

    // Scheduler
    scheduler = new ShuffleSchedulerImpl<K, V>(
                                               jobConf, taskStatus, reduceId, this, copyPhase,
                                               context.getShuffledMapsCounter(),
                                               context.getReduceShuffleBytes(),
                                               context.getFailedShuffleCounter());

    this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);

    this.merger = new FileMerger();
    this.merger.start();
}
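
The merge fan-in read near the end of init() comes straight from the job configuration; an illustrative override:

public static void setMergeFanIn(Configuration conf) {
  // Maximum number of streams merged at once during the merge phases.
  conf.setInt(MRJobConfig.IO_SORT_FACTOR, 64);
}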
Project: circus-train    File: UniformSizeInputFormat.java
/**
 * Implementation of InputFormat::getSplits(). Returns a list of InputSplits, such that the number of bytes to be
 * copied for all the splits are approximately equal.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException on failure.
 * @throws InterruptedException
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS);

  if (numSplits == 0) {
    return new ArrayList<>();
  }

  return getSplits(configuration, numSplits,
      ConfigurationUtil.getLong(configuration, S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
Project: hadoop    File: MapTaskAttemptImpl.java
@Override
public Task createRemoteTask() {
  //job file name is set in TaskAttempt, so pass an empty string here
  MapTask mapTask =
    new MapTask("", TypeConverter.fromYarn(getID()), partition,
        splitInfo.getSplitIndex(), 1); // YARN doesn't have the concept of slots per task, set it as 1.
  mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
  mapTask.setConf(conf);
  return mapTask;
}
Project: hadoop    File: TestYARNRunner.java
@Test
public void testAMStandardEnv() throws Exception {
  final String ADMIN_LIB_PATH = "foo";
  final String USER_LIB_PATH = "bar";
  final String USER_SHELL = "shell";
  JobConf jobConf = new JobConf();

  jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" +
      ADMIN_LIB_PATH);
  jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH="
      + USER_LIB_PATH);
  jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);

  YARNRunner yarnRunner = new YARNRunner(jobConf);
  ApplicationSubmissionContext appSubCtx =
      buildSubmitContext(yarnRunner, jobConf);

  // make sure PWD is first in the lib path
  ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
  Map<String, String> env = clc.getEnvironment();
  String libPath = env.get(Environment.LD_LIBRARY_PATH.name());
  assertNotNull("LD_LIBRARY_PATH not set", libPath);
  String cps = jobConf.getBoolean(
      MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
      MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
      ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
  assertEquals("Bad AM LD_LIBRARY_PATH setting",
      MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
      + cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH, libPath);

  // make sure SHELL is set
  String shell = env.get(Environment.SHELL.name());
  assertNotNull("SHELL not set", shell);
  assertEquals("Bad SHELL setting", USER_SHELL, shell);
}
Project: hadoop    File: DistributedCache.java
/**
 * Add a file path to the current set of classpath entries. It adds the file
 * to cache as well.  Intended to be used by user code.
 *
 * @param file Path of the file to be added
 * @param conf Configuration that contains the classpath setting
 * @param fs FileSystem with respect to which {@code file} should
 *              be interpreted.
 */
public static void addFileToClassPath
         (Path file, Configuration conf, FileSystem fs)
      throws IOException {
  String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
  conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
           : classpath + "," + file.toString());
  URI uri = fs.makeQualified(file).toUri();
  addCacheFile(uri, conf);
}
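
A short usage sketch for the method above (the jar path is hypothetical); on the newer API, org.apache.hadoop.mapreduce.Job#addFileToClassPath covers the same case:

public static void addHelperJar(Configuration conf) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  // Appends to MRJobConfig.CLASSPATH_FILES and registers the file in the cache.
  DistributedCache.addFileToClassPath(new Path("/libs/helper.jar"), conf, fs);
}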
Project: hadoop    File: KeyFieldBasedComparator.java
public void setConf(Configuration conf) {
  this.conf = conf;
  String option = conf.get(COMPARATOR_OPTIONS);
  String keyFieldSeparator = conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR,"\t");
  keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
  keyFieldHelper.parseOption(option);
}
Project: hadoop    File: ExponentiallySmoothedTaskRuntimeEstimator.java
@Override
public void contextualize(Configuration conf, AppContext context) {
  super.contextualize(conf, context);

  lambda
      = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
          MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
  smoothedValue
      = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
          ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
}
Project: hadoop    File: StartEndTimesBase.java
@Override
public void contextualize(Configuration conf, AppContext context) {
  this.context = context;

  Map<JobId, Job> allJobs = context.getAllJobs();

  for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
    final Job job = entry.getValue();
    mapperStatistics.put(job, new DataStatistics());
    reducerStatistics.put(job, new DataStatistics());
    slowTaskRelativeTresholds.put
        (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
  }
}
Project: hadoop    File: TestJobConf.java
@Test
public void testProfileParamsSetter() {
  JobConf configuration = new JobConf();

  configuration.setProfileParams("test");
  Assert.assertEquals("test", configuration.get(MRJobConfig.TASK_PROFILE_PARAMS));
}
Project: hadoop    File: TestStreamingKeyValue.java
protected String[] genArgs(boolean ignoreKey) {
  return new String[] {
    "-input", INPUT_FILE.getAbsolutePath(),
    "-output", OUTPUT_DIR.getAbsolutePath(),
    "-mapper", TestStreaming.CAT,
    "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true", 
    "-jobconf", "stream.non.zero.exit.is.failure=true",
    "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
    "-jobconf", "stream.map.input.ignoreKey="+ignoreKey,      
  };
}
Project: hadoop    File: TaskAttemptImpl.java
private int getMemoryRequired(Configuration conf, TaskType taskType) {
  int memory = 1024;
  if (taskType == TaskType.MAP)  {
    memory =
        conf.getInt(MRJobConfig.MAP_MEMORY_MB,
            MRJobConfig.DEFAULT_MAP_MEMORY_MB);
  } else if (taskType == TaskType.REDUCE) {
    memory =
        conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
            MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
  }

  return memory;
}
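
The corresponding submit-time settings, with illustrative sizes:

public static void setTaskMemory(Configuration conf) {
  conf.setInt(MRJobConfig.MAP_MEMORY_MB, 2048);    // container memory per map task
  conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 4096); // container memory per reduce task
}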
Project: hadoop    File: TaskAttemptImpl.java
private int getGpuRequired(Configuration conf, TaskType taskType) {
  int gcores = 0;
  if (taskType == TaskType.MAP)  {
    gcores =
        conf.getInt(MRJobConfig.MAP_GPU_CORES,
            MRJobConfig.DEFAULT_MAP_GPU_CORES);
  } else if (taskType == TaskType.REDUCE) {
    gcores =
        conf.getInt(MRJobConfig.REDUCE_GPU_CORES,
            MRJobConfig.DEFAULT_REDUCE_GPU_CORES);
  }

  return gcores;
}
Project: hadoop    File: PipesReducer.java
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, the number of
  //processed records may be equal to or less than the number of records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
}
Project: hadoop    File: TestYARNRunner.java
@Test
public void testNodeLabelExp() throws Exception {
  JobConf jobConf = new JobConf();

  jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
  jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");

  YARNRunner yarnRunner = new YARNRunner(jobConf);
  ApplicationSubmissionContext appSubCtx =
      buildSubmitContext(yarnRunner, jobConf);

  assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
  assertEquals(appSubCtx.getAMContainerResourceRequest()
      .getNodeLabelExpression(), "highMem");
}
Project: hadoop    File: JobHistory.java
@Override
public CharSequence getUser() {
  // Lazily resolve the user from the configuration on first access.
  if (userName == null) {
    userName = conf.get(MRJobConfig.USER_NAME, "history-user");
  }
  return userName;
}
Project: hadoop    File: TestYARNRunner.java
@Test(timeout=20000)
public void testWarnCommandOpts() throws Exception {
  Logger logger = Logger.getLogger(YARNRunner.class);

  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  Layout layout = new SimpleLayout();
  Appender appender = new WriterAppender(layout, bout);
  logger.addAppender(appender);

  JobConf jobConf = new JobConf();

  jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
  jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar");

  YARNRunner yarnRunner = new YARNRunner(jobConf);

  @SuppressWarnings("unused")
  ApplicationSubmissionContext submissionContext =
      buildSubmitContext(yarnRunner, jobConf);

  String logMsg = bout.toString();
  assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " + 
        "yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
      "longer function if hadoop native libraries are used. These values " + 
        "should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
      "env using yarn.app.mapreduce.am.admin.user.env config settings."));
  assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " + 
      "yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
      "function if hadoop native libraries are used. These values should " +
      "be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
      "using yarn.app.mapreduce.am.env config settings."));
}
Project: hadoop    File: TestAMWebServicesJobConf.java
@Override
protected void configureServlets() {

  Path confPath = new Path(testConfDir.toString(),
      MRJobConfig.JOB_CONF_FILE);
  Configuration config = new Configuration();

  FileSystem localFs;
  try {
    localFs = FileSystem.getLocal(config);
    confPath = localFs.makeQualified(confPath);

    OutputStream out = localFs.create(confPath);
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }
    if (!localFs.exists(confPath)) {
      fail("error creating config file: " + confPath);
    }

  } catch (IOException e) {
    fail("error creating config file: " + e.getMessage());
  }

  appContext = new MockAppContext(0, 2, 1, confPath);

  bind(JAXBContextResolver.class);
  bind(AMWebServices.class);
  bind(GenericExceptionHandler.class);
  bind(AppContext.class).toInstance(appContext);
  bind(Configuration.class).toInstance(conf);

  serve("/*").with(GuiceContainer.class);
}
Project: hadoop    File: MROutputFiles.java
/**
 * Create a local reduce input file name.
 *
 * @param mapId a map task id
 * @param size the size of the file
 * @return path
 * @throws IOException
 */
@Override
public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
                                 long size)
    throws IOException {
  return lDirAlloc.getLocalPathForWrite(String.format(
      REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()),
      size, getConf());
}
Project: hadoop    File: TeraGen.java
/**
 * Create the desired number of splits, dividing the number of rows
 * between the mappers.
 */
public List<InputSplit> getSplits(JobContext job) {
  long totalRows = getNumberOfRows(job);
  int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  LOG.info("Generating " + totalRows + " using " + numSplits);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  long currentRow = 0;
  for(int split = 0; split < numSplits; ++split) {
    long goal = 
      (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
    splits.add(new RangeInputSplit(currentRow, goal - currentRow));
    currentRow = goal;
  }
  return splits;
}
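
The split count defaults to 1 here unless the job sets it; a minimal sketch:

public static void setMapCount(Configuration conf) {
  // Read back by getSplits() above via MRJobConfig.NUM_MAPS ("mapreduce.job.maps").
  conf.setInt(MRJobConfig.NUM_MAPS, 8);
}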
Project: hadoop    File: MROutputFiles.java
/**
 * Return a local reduce input file created earlier
 *
 * @param mapId a map task id
 * @return path
 * @throws IOException
 */
@Override
public Path getInputFile(int mapId)
    throws IOException {
  return lDirAlloc.getLocalPathToRead(String.format(
      REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, Integer
          .valueOf(mapId)), getConf());
}
Project: hadoop    File: TestJobImpl.java
@Test(timeout=20000)
public void testCheckJobCompleteSuccess() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  // let the committer complete and verify the job succeeds
  syncBarrier.await();
  assertJobState(job, JobStateInternal.SUCCEEDED);

  job.handle(new JobEvent(job.getID(),
      JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
  assertJobState(job, JobStateInternal.SUCCEEDED);

  job.handle(new JobEvent(job.getID(), 
      JobEventType.JOB_MAP_TASK_RESCHEDULED));
  assertJobState(job, JobStateInternal.SUCCEEDED);

  dispatcher.stop();
  commitHandler.stop();
}
Project: hadoop    File: TestJobImpl.java
@Test(timeout=20000)
public void testRebootedDuringCommit() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  AppContext mockContext = mock(AppContext.class);
  when(mockContext.isLastAMRetry()).thenReturn(true);
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  syncBarrier.await();
  job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
  assertJobState(job, JobStateInternal.REBOOT);
  // return the external state as ERROR since this is last retry.
  Assert.assertEquals(JobState.RUNNING, job.getState());
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
  Assert.assertEquals(JobState.ERROR, job.getState());

  dispatcher.stop();
  commitHandler.stop();
}
Project: hadoop    File: TestJobImpl.java
@Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = new StubbedOutputCommitter() {
    @Override
    public synchronized void setupJob(JobContext jobContext)
        throws IOException {
      while (!Thread.interrupted()) {
        try {
          wait();
        } catch (InterruptedException e) {
        }
      }
    }
  };
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.SETUP);

  job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILLED);
  dispatcher.stop();
  commitHandler.stop();
}
Project: hadoop    File: TestMRApps.java
@Test (timeout = 120000)
public void testGetJobFileWithUser() {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
  String jobFile = MRApps.getJobFile(conf, "dummy-user", 
      new JobID("dummy-job", 12345));
  assertNotNull("getJobFile results in null.", jobFile);
  assertEquals("jobFile with specified user is not as expected.",
      "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
}