public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  // Make one reducer slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_reduce =
      conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

  // IF TESTING REDUCE SPECULATIVE EXECUTION:
  //   Make the "*_r_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_r_000000_1", so sleepTime will still remain 100ms.
  if ((taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
      && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try {
    Thread.sleep(sleepTime);
  } catch (InterruptedException ie) {
    // Ignore
  }
  context.write(key, new IntWritable(0));
}
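// Hedged driver-side sketch (not from the original source): how reduce-side
// speculative execution might be switched on so the deliberately slow attempt
// above gets a speculative backup. The method name is hypothetical.
static Configuration enableReduceSpeculation() {
  Configuration conf = new Configuration();
  conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true); // speculate reducers
  conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);   // leave maps alone
  return conf;
}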
public void configure(JobConf job) {
  super.configure(job);
  // Disable the auto-increment of the counter. For streaming, the number of
  // processed records could be different (equal or less) than the number of
  // records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  if (mapInputWriterClass_.getCanonicalName()
      .equals(TextInputWriter.class.getCanonicalName())) {
    String inputFormatClassName =
        job.getClass("mapred.input.format.class", TextInputFormat.class)
            .getCanonicalName();
    ignoreKey = job.getBoolean("stream.map.input.ignoreKey",
        inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()));
  }
  try {
    mapOutputFieldSeparator =
        job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator =
        job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields =
        job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException(
        "The current system does not support UTF-8 encoding!", e);
  }
}
@Test(timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
  Map<String, String> env = new HashMap<String, String>();
  try {
    MRApps.setClasspath(env, conf);
  } catch (Exception e) {
    fail("Got exception while setting classpath");
  }
  String env_str = env.get("CLASSPATH");
  String expectedClasspath = StringUtils.join(
      ApplicationConstants.CLASS_PATH_SEPARATOR,
      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
          ApplicationConstants.Environment.PWD.$$() + "/*"));
  assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
      + " the classpath!", env_str.contains(expectedClasspath));
  assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
      env_str.startsWith(expectedClasspath));
}
@SuppressWarnings("deprecation") @Test public void testGetTokensForNamenodes() throws IOException, URISyntaxException { Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "test/build/data")); // ick, but need fq path minus file:/ String binaryTokenFile = FileSystem.getLocal(conf) .makeQualified(new Path(TEST_ROOT_DIR, "tokenFile")).toUri() .getPath(); MockFileSystem fs1 = createFileSystemForServiceName("service1"); Credentials creds = new Credentials(); Token<?> token1 = fs1.getDelegationToken(renewer); creds.addToken(token1.getService(), token1); // wait to set, else the obtain tokens call above will fail with FNF conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile); creds.writeTokenStorageFile(new Path(binaryTokenFile), conf); TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf); String fs_addr = fs1.getCanonicalServiceName(); Token<?> nnt = TokenCache.getDelegationToken(creds, fs_addr); assertNotNull("Token for nn is null", nnt); }
public DefaultSpeculator(Configuration conf, AppContext context,
    TaskRuntimeEstimator estimator, Clock clock) {
  super(DefaultSpeculator.class.getName());

  this.conf = conf;
  this.context = context;
  this.estimator = estimator;
  this.clock = clock;
  this.eventHandler = context.getEventHandler();
  this.soonestRetryAfterNoSpeculate =
      conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
          MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
  this.soonestRetryAfterSpeculate =
      conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
          MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
  this.proportionRunningTasksSpeculatable =
      conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
          MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
  this.proportionTotalTasksSpeculatable =
      conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
          MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
  this.minimumAllowedSpeculativeTasks =
      conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
          MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
}
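// Hedged tuning sketch (not from the original source): the same MRJobConfig
// keys the constructor above reads can be set per job. The concrete values
// are illustrative, not recommendations; the method name is hypothetical.
static void tuneSpeculation(Configuration conf) {
  conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 1000L);
  conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 15000L);
  conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1);
  conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.01);
  conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 10);
}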
@SuppressWarnings("deprecation") public void testSetupDistributedCacheConflictsFiles() throws Exception { Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); URI mockUri = URI.create("mockfs://mock/"); FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf)) .getRawFileSystem(); URI file = new URI("mockfs://mock/tmp/something.zip#something"); Path filePath = new Path(file); URI file2 = new URI("mockfs://mock/tmp/something.txt#something"); Path file2Path = new Path(file2); when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file2, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true"); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); MRApps.setupDistributedCache(conf, localResources); assertEquals(1, localResources.size()); LocalResource lr = localResources.get("something"); //First one wins assertNotNull(lr); assertEquals(10l, lr.getSize()); assertEquals(10l, lr.getTimestamp()); assertEquals(LocalResourceType.FILE, lr.getType()); }
public void setConf(Configuration conf) {
  this.conf = conf;
  keyFieldHelper = new KeyFieldHelper();
  String keyFieldSeparator =
      conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
  keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
  if (conf.get("num.key.fields.for.partition") != null) {
    LOG.warn("Using deprecated num.key.fields.for.partition. "
        + "Use mapreduce.partition.keypartitioner.options instead");
    this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition", 0);
    keyFieldHelper.setKeyFieldSpec(1, numOfPartitionFields);
  } else {
    String option = conf.get(PARTITIONER_OPTIONS);
    keyFieldHelper.parseOption(option);
  }
}
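// Hedged usage sketch (not from the original source): configuring the
// key-field-based partitioner above to partition on the first two
// '.'-separated fields of the key, using the replacement option named in the
// deprecation warning. The method name is hypothetical.
static KeyFieldBasedPartitioner<Text, Text> buildPartitioner() {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, ".");
  conf.set("mapreduce.partition.keypartitioner.options", "-k1,2");
  KeyFieldBasedPartitioner<Text, Text> partitioner =
      new KeyFieldBasedPartitioner<Text, Text>();
  partitioner.setConf(conf); // runs the setConf shown above
  return partitioner;
}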
public void testTotalOrderBinarySearch() throws Exception {
  TotalOrderPartitioner<Text, NullWritable> partitioner =
      new TotalOrderPartitioner<Text, NullWritable>();
  Configuration conf = new Configuration();
  Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
      "totalorderbinarysearch", conf, splitStrings);
  conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
  try {
    partitioner.setConf(conf);
    NullWritable nw = NullWritable.get();
    for (Check<Text> chk : testStrings) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
    }
  } finally {
    p.getFileSystem(conf).delete(p, true);
  }
}
public void testTotalOrderMemCmp() throws Exception {
  TotalOrderPartitioner<Text, NullWritable> partitioner =
      new TotalOrderPartitioner<Text, NullWritable>();
  Configuration conf = new Configuration();
  Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
      "totalordermemcmp", conf, splitStrings);
  conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, Text.class, Object.class);
  try {
    partitioner.setConf(conf);
    NullWritable nw = NullWritable.get();
    for (Check<Text> chk : testStrings) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
    }
  } finally {
    p.getFileSystem(conf).delete(p, true);
  }
}
/**
 * Sets the high ram job properties in the simulated job's configuration.
 */
@SuppressWarnings("deprecation")
static void configureHighRamProperties(Configuration sourceConf,
    Configuration destConf) {
  // set the memory per map task
  scaleConfigParameter(sourceConf, destConf, MRConfig.MAPMEMORY_MB,
      MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);

  // validate and fail early
  validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB,
      JTConfig.JT_MAX_MAPMEMORY_MB);

  // set the memory per reduce task
  scaleConfigParameter(sourceConf, destConf, MRConfig.REDUCEMEMORY_MB,
      MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);

  // validate and fail early
  validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB,
      JTConfig.JT_MAX_REDUCEMEMORY_MB);
}
/**
 * Run a distributed job and verify that TokenCache is available.
 * @throws IOException
 */
@Test
public void testBinaryTokenFile() throws IOException {
  Configuration conf = mrCluster.getConfig();

  // provide namenodes names for the job to get the delegation tokens for
  final String nnUri = dfsCluster.getURI(0).toString();
  conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);

  // using argument to pass the file name
  final String[] args = {
      "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
  };
  int res = -1;
  try {
    res = ToolRunner.run(conf, new MySleepJob(), args);
  } catch (Exception e) {
    System.out.println("Job failed with " + e.getLocalizedMessage());
    e.printStackTrace(System.out);
    fail("Job failed");
  }
  assertEquals("dist job res is not 0:", 0, res);
}
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
    JobConf job, String name, Progressable progress) throws IOException {
  org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
      new TaskAttemptContextImpl(job,
          TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID))));
  org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer =
      (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
  try {
    return new DBRecordWriter(writer.getConnection(), writer.getStatement());
  } catch (SQLException se) {
    throw new IOException(se);
  }
}
/**
 * Creates a {@link ApplicationClassLoader} if
 * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
 * the APP_CLASSPATH environment variable is set.
 * @param conf
 * @return the created job classloader, or null if the job classloader is not
 *         enabled or the APP_CLASSPATH environment variable is not set
 * @throws IOException
 */
public static ClassLoader createJobClassLoader(Configuration conf)
    throws IOException {
  ClassLoader jobClassLoader = null;
  if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
    String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
    if (appClasspath == null) {
      LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
    } else {
      LOG.info("Creating job classloader");
      if (LOG.isDebugEnabled()) {
        LOG.debug("APP_CLASSPATH=" + appClasspath);
      }
      String[] systemClasses = getSystemClasses(conf);
      jobClassLoader = createJobClassLoader(appClasspath, systemClasses);
    }
  }
  return jobClassLoader;
}
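// Hedged client-side sketch (not from the original source): flipping on the
// isolated job classloader that createJobClassLoader above checks for. The
// system-classes key and its value are assumptions shown for illustration;
// the method name is hypothetical.
static void enableJobClassLoader(Configuration conf) {
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
  // Assumed key: classes that should always come from the system classloader.
  conf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
      "java.,javax.,org.apache.hadoop.");
}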
@Test(timeout = 30000)
public void testDeletionofStagingOnKill() throws IOException {
  conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
  fs = mock(FileSystem.class);
  when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
  // Staging Dir exists
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path stagingDir = MRApps.getStagingAreaDir(conf, user);
  when(fs.exists(stagingDir)).thenReturn(true);
  ApplicationId appId =
      ApplicationId.newInstance(System.currentTimeMillis(), 0);
  ApplicationAttemptId attemptId =
      ApplicationAttemptId.newInstance(appId, 0);
  JobId jobid = recordFactory.newRecordInstance(JobId.class);
  jobid.setAppId(appId);
  ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
  MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
  appMaster.init(conf);
  // simulate the process being killed
  MRAppMaster.MRAppMasterShutdownHook hook =
      new MRAppMaster.MRAppMasterShutdownHook(appMaster);
  hook.run();
  verify(fs, times(0)).delete(stagingJobPath, true);
}
@Override
protected String[] genArgs() {
  // Set the testcase-specific config properties first; the remaining
  // arguments are set in TestStreaming.genArgs().
  args.add("-jobconf");
  args.add(MRJobConfig.MAP_OUTPUT_KEY_CLASS
      + "=org.apache.hadoop.io.LongWritable");
  args.add("-jobconf");
  args.add(MRJobConfig.OUTPUT_KEY_CLASS
      + "=org.apache.hadoop.io.LongWritable");
  // Using SequenceFileOutputFormat here because with TextOutputFormat, the
  // mapred.output.key.class set in JobConf (which we want to test here) is
  // not read/used at all.
  args.add("-outputformat");
  args.add("org.apache.hadoop.mapred.SequenceFileOutputFormat");
  return super.genArgs();
}
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
    JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
    throws IOException {
  long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
      MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  String jobSplitFile =
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
        + ". Aborting job " + jobId);
  }
  FSDataInputStream in = fs.open(metaSplitFile);
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
      new JobSplit.TaskSplitMetaInfo[numSplits];
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        jobSplitFile, splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
        splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
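// Hedged configuration note (not from the original source): the size guard
// above only fires when the limit is positive, so a job that hits the
// "Split metadata size exceeded" IOException can raise or disable the cap.
// The method name and values are illustrative.
static void relaxSplitMetaInfoLimit(Configuration conf) {
  conf.setLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, 100000000L); // raise cap
  // conf.setLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, -1L);     // or disable
}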
@Override
public void init(ShuffleConsumerPlugin.Context context) {
  this.reduceId = context.getReduceId();
  this.jobConf = context.getJobConf();
  this.umbilical = context.getUmbilical();
  this.reporter = context.getReporter();
  this.copyPhase = context.getCopyPhase();
  this.mergePhase = context.getMergePhase();
  this.taskStatus = context.getStatus();
  this.reduceTask = context.getReduceTask();
  this.codec = context.getCodec();
  this.spilledRecordsCounter = context.getSpilledRecordsCounter();
  this.mergedMapOutputsCounter = context.getMergedMapOutputsCounter();

  jobConf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, false);
  try {
    lustrefs = (LustreFileSystem) FileSystem.get(LustreFileSystem.NAME,
        jobConf);
    mapOutputDir = SharedFsPlugins.getTempPath(jobConf,
        JobID.downgrade(reduceId.getJobID()));
    reduceDir = new Path(mapOutputDir,
        String.format(SharedFsPlugins.MAP_OUTPUT,
            reduceId.getTaskID().getId(), 0, 0)).getParent();
    mergeTempDir = new Path(mapOutputDir, "temp");
  } catch (IOException ioe) {
    throw new RuntimeException("Map Output directory not found !!", ioe);
  }

  // Scheduler
  scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
      this, copyPhase, context.getShuffledMapsCounter(),
      context.getReduceShuffleBytes(), context.getFailedShuffleCounter());

  this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
  this.merger = new FileMerger();
  this.merger.start();
}
/**
 * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
 * such that the number of bytes to be copied for all the splits are
 * approximately equal.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException on failure.
 * @throws InterruptedException
 */
@Override
public List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  int numSplits = ConfigurationUtil.getInt(configuration,
      MRJobConfig.NUM_MAPS);
  if (numSplits == 0) {
    return new ArrayList<>();
  }
  return getSplits(configuration, numSplits,
      ConfigurationUtil.getLong(configuration,
          S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
@Override
public Task createRemoteTask() {
  // The job file name is set in TaskAttempt, so pass an empty string here.
  // YARN doesn't have the concept of slots per task, so set it as 1.
  MapTask mapTask = new MapTask("", TypeConverter.fromYarn(getID()),
      partition, splitInfo.getSplitIndex(), 1);
  mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
  mapTask.setConf(conf);
  return mapTask;
}
@Test
public void testAMStandardEnv() throws Exception {
  final String ADMIN_LIB_PATH = "foo";
  final String USER_LIB_PATH = "bar";
  final String USER_SHELL = "shell";
  JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV,
      "LD_LIBRARY_PATH=" + ADMIN_LIB_PATH);
  jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH=" + USER_LIB_PATH);
  jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);
  YARNRunner yarnRunner = new YARNRunner(jobConf);
  ApplicationSubmissionContext appSubCtx =
      buildSubmitContext(yarnRunner, jobConf);

  // make sure PWD is first in the lib path
  ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
  Map<String, String> env = clc.getEnvironment();
  String libPath = env.get(Environment.LD_LIBRARY_PATH.name());
  assertNotNull("LD_LIBRARY_PATH not set", libPath);
  String cps = jobConf.getBoolean(
      MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
      MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
          ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
  assertEquals("Bad AM LD_LIBRARY_PATH setting",
      MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
          + cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH, libPath);

  // make sure SHELL is set
  String shell = env.get(Environment.SHELL.name());
  assertNotNull("SHELL not set", shell);
  assertEquals("Bad SHELL setting", USER_SHELL, shell);
}
/**
 * Add a file path to the current set of classpath entries. The file is also
 * added to the cache. Intended to be used by user code.
 *
 * @param file Path of the file to be added
 * @param conf Configuration that contains the classpath setting
 * @param fs FileSystem with respect to which {@code file} should be
 *           interpreted.
 */
public static void addFileToClassPath(Path file, Configuration conf,
    FileSystem fs) throws IOException {
  String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
  conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
      : classpath + "," + file.toString());
  URI uri = fs.makeQualified(file).toUri();
  addCacheFile(uri, conf);
}
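// Hedged usage sketch (not from the original source): adding a jar that lives
// on the default FileSystem to the task classpath via the helper above. The
// jar path and method name are hypothetical.
static void addDependencyJar(Configuration conf) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path dependencyJar = new Path("/apps/libs/my-dependency.jar"); // hypothetical
  addFileToClassPath(dependencyJar, conf, fs);
}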
public void setConf(Configuration conf) {
  this.conf = conf;
  String option = conf.get(COMPARATOR_OPTIONS);
  String keyFieldSeparator =
      conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
  keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
  keyFieldHelper.parseOption(option);
}
@Override
public void contextualize(Configuration conf, AppContext context) {
  super.contextualize(conf, context);

  lambda = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
      MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
  smoothedValue = conf.getBoolean(
      MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
          ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
}
@Override
public void contextualize(Configuration conf, AppContext context) {
  this.context = context;

  Map<JobId, Job> allJobs = context.getAllJobs();
  for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
    final Job job = entry.getValue();
    mapperStatistics.put(job, new DataStatistics());
    reducerStatistics.put(job, new DataStatistics());
    slowTaskRelativeTresholds.put(job,
        conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
  }
}
@Test
public void testProfileParamsSetter() {
  JobConf configuration = new JobConf();
  configuration.setProfileParams("test");
  Assert.assertEquals("test",
      configuration.get(MRJobConfig.TASK_PROFILE_PARAMS));
}
protected String[] genArgs(boolean ignoreKey) {
  return new String[] {
      "-input", INPUT_FILE.getAbsolutePath(),
      "-output", OUTPUT_DIR.getAbsolutePath(),
      "-mapper", TestStreaming.CAT,
      "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
      "-jobconf", "stream.non.zero.exit.is.failure=true",
      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp"),
      "-jobconf", "stream.map.input.ignoreKey=" + ignoreKey,
  };
}
private int getMemoryRequired(Configuration conf, TaskType taskType) {
  int memory = 1024;
  if (taskType == TaskType.MAP) {
    memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB,
        MRJobConfig.DEFAULT_MAP_MEMORY_MB);
  } else if (taskType == TaskType.REDUCE) {
    memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
        MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
  }
  return memory;
}
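// Hedged sizing sketch (not from the original source): the values returned by
// getMemoryRequired above come straight from job configuration, so a job can
// size its task containers like this. The megabyte values are illustrative;
// the method name is hypothetical.
static void sizeTaskContainers(Configuration conf) {
  conf.setInt(MRJobConfig.MAP_MEMORY_MB, 2048);    // map container, in MB
  conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 4096); // reduce container, in MB
}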
private int getGpuRequired(Configuration conf, TaskType taskType) {
  int gcores = 0;
  if (taskType == TaskType.MAP) {
    gcores = conf.getInt(MRJobConfig.MAP_GPU_CORES,
        MRJobConfig.DEFAULT_MAP_GPU_CORES);
  } else if (taskType == TaskType.REDUCE) {
    gcores = conf.getInt(MRJobConfig.REDUCE_GPU_CORES,
        MRJobConfig.DEFAULT_REDUCE_GPU_CORES);
  }
  return gcores;
}
public void configure(JobConf job) {
  this.job = job;
  // Disable the auto-increment of the counter. For pipes, the number of
  // processed records could be different (equal or less) than the number of
  // records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
}
@Test
public void testNodeLabelExp() throws Exception {
  JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
  jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
  YARNRunner yarnRunner = new YARNRunner(jobConf);
  ApplicationSubmissionContext appSubCtx =
      buildSubmitContext(yarnRunner, jobConf);
  // JUnit convention: expected value first, actual value second.
  assertEquals("GPU", appSubCtx.getNodeLabelExpression());
  assertEquals("highMem",
      appSubCtx.getAMContainerResourceRequest().getNodeLabelExpression());
}
@Override
public CharSequence getUser() {
  // Lazily resolve the user from the job conf (the original check was
  // inverted, which left userName null forever and made the assignment dead).
  if (userName == null) {
    userName = conf.get(MRJobConfig.USER_NAME, "history-user");
  }
  return userName;
}
@Test(timeout = 20000)
public void testWarnCommandOpts() throws Exception {
  Logger logger = Logger.getLogger(YARNRunner.class);

  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  Layout layout = new SimpleLayout();
  Appender appender = new WriterAppender(layout, bout);
  logger.addAppender(appender);

  JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
      "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
  jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS,
      "-Xmx1024m -Djava.library.path=bar");

  YARNRunner yarnRunner = new YARNRunner(jobConf);

  @SuppressWarnings("unused")
  ApplicationSubmissionContext submissionContext =
      buildSubmitContext(yarnRunner, jobConf);

  String logMsg = bout.toString();
  assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in "
      + "yarn.app.mapreduce.am.admin-command-opts can cause programs to no "
      + "longer function if hadoop native libraries are used. These values "
      + "should be set as part of the LD_LIBRARY_PATH in the app master JVM "
      + "env using yarn.app.mapreduce.am.admin.user.env config settings."));
  assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in "
      + "yarn.app.mapreduce.am.command-opts can cause programs to no longer "
      + "function if hadoop native libraries are used. These values should "
      + "be set as part of the LD_LIBRARY_PATH in the app master JVM env "
      + "using yarn.app.mapreduce.am.env config settings."));
}
@Override
protected void configureServlets() {
  Path confPath = new Path(testConfDir.toString(),
      MRJobConfig.JOB_CONF_FILE);
  Configuration config = new Configuration();

  FileSystem localFs;
  try {
    localFs = FileSystem.getLocal(config);
    confPath = localFs.makeQualified(confPath);

    OutputStream out = localFs.create(confPath);
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }
    if (!localFs.exists(confPath)) {
      fail("error creating config file: " + confPath);
    }
  } catch (IOException e) {
    fail("error creating config file: " + e.getMessage());
  }
  appContext = new MockAppContext(0, 2, 1, confPath);

  bind(JAXBContextResolver.class);
  bind(AMWebServices.class);
  bind(GenericExceptionHandler.class);
  bind(AppContext.class).toInstance(appContext);
  bind(Configuration.class).toInstance(conf);

  serve("/*").with(GuiceContainer.class);
}
/**
 * Create a local reduce input file name.
 *
 * @param mapId a map task id
 * @param size the size of the file
 * @return path
 * @throws IOException
 */
@Override
public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
    long size) throws IOException {
  return lDirAlloc.getLocalPathForWrite(String.format(
      REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()),
      size, getConf());
}
/**
 * Create the desired number of splits, dividing the number of rows
 * between the mappers.
 */
public List<InputSplit> getSplits(JobContext job) {
  long totalRows = getNumberOfRows(job);
  int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  LOG.info("Generating " + totalRows + " using " + numSplits);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  long currentRow = 0;
  for (int split = 0; split < numSplits; ++split) {
    long goal = (long) Math.ceil(totalRows * (double) (split + 1)
        / numSplits);
    splits.add(new RangeInputSplit(currentRow, goal - currentRow));
    currentRow = goal;
  }
  return splits;
}
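// Worked example (added for illustration, not from the original source) of
// the ceil-based row partitioning above: 10 rows over 3 splits yields ranges
// of 4, 3, and 3 rows with no gaps or overlaps. The method name is
// hypothetical.
static void demoSplitBoundaries() {
  long totalRows = 10;
  int numSplits = 3;
  long currentRow = 0;
  for (int split = 0; split < numSplits; ++split) {
    long goal = (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
    // split 0: rows [0, 4)  since goal = ceil(10 * 1/3) = 4
    // split 1: rows [4, 7)  since goal = ceil(10 * 2/3) = 7
    // split 2: rows [7, 10) since goal = ceil(10 * 3/3) = 10
    System.out.println("split " + split + ": [" + currentRow + ", " + goal + ")");
    currentRow = goal;
  }
}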
/**
 * Return a local reduce input file created earlier.
 *
 * @param mapId a map task id
 * @return path
 * @throws IOException
 */
@Override
public Path getInputFile(int mapId) throws IOException {
  return lDirAlloc.getLocalPathToRead(String.format(
      REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT,
      Integer.valueOf(mapId)), getConf());
}
@Test(timeout = 20000)
public void testCheckJobCompleteSuccess() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  // let the committer complete and verify the job succeeds
  syncBarrier.await();
  assertJobState(job, JobStateInternal.SUCCEEDED);

  job.handle(new JobEvent(job.getID(),
      JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
  assertJobState(job, JobStateInternal.SUCCEEDED);

  job.handle(new JobEvent(job.getID(),
      JobEventType.JOB_MAP_TASK_RESCHEDULED));
  assertJobState(job, JobStateInternal.SUCCEEDED);

  dispatcher.stop();
  commitHandler.stop();
}
@Test(timeout = 20000)
public void testRebootedDuringCommit() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  AppContext mockContext = mock(AppContext.class);
  when(mockContext.isLastAMRetry()).thenReturn(true);
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);

  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  syncBarrier.await();
  job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
  assertJobState(job, JobStateInternal.REBOOT);
  // return the external state as ERROR since this is the last retry
  Assert.assertEquals(JobState.RUNNING, job.getState());
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
  Assert.assertEquals(JobState.ERROR, job.getState());

  dispatcher.stop();
  commitHandler.stop();
}
@Test(timeout = 20000)
public void testKilledDuringSetup() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = new StubbedOutputCommitter() {
    @Override
    public synchronized void setupJob(JobContext jobContext)
        throws IOException {
      while (!Thread.interrupted()) {
        try {
          wait();
        } catch (InterruptedException e) {
        }
      }
    }
  };
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.SETUP);

  job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILLED);
  dispatcher.stop();
  commitHandler.stop();
}
@Test(timeout = 120000)
public void testGetJobFileWithUser() {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
  String jobFile = MRApps.getJobFile(conf, "dummy-user",
      new JobID("dummy-job", 12345));
  assertNotNull("getJobFile results in null.", jobFile);
  assertEquals("jobFile with specified user is not as expected.",
      "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml",
      jobFile);
}