@Override public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException { final JobClient client = new JobClient(new JobConf(jobCtxt.getConfiguration())); ClusterStatus stat = client.getClusterStatus(true); final long toGen = jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1); if (toGen < 0) { throw new IOException("Invalid/missing generation bytes: " + toGen); } final int nTrackers = stat.getTaskTrackers(); final long bytesPerTracker = toGen / nTrackers; final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers); final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*"); final Matcher m = trackerPattern.matcher(""); for (String tracker : stat.getActiveTrackerNames()) { m.reset(tracker); if (!m.find()) { System.err.println("Skipping node: " + tracker); continue; } final String name = m.group(1); splits.add(new GenSplit(bytesPerTracker, new String[] { name })); } return splits; }
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException { int numMaps = getNumMapTasks(jobContext.getConfiguration()); final int nSplits = Math.min(numMaps, chunks.size()); List<InputSplit> splits = new ArrayList<>(nSplits); for (int i = 0; i < nSplits; ++i) { TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i); chunks.get(i).assignTo(taskId); splits.add(new FileSplit(chunks.get(i).getPath(), 0, // Setting non-zero length for FileSplit size, to avoid a possible // future when 0-sized file-splits are considered "empty" and skipped // over. getMinRecordsPerChunk(jobContext.getConfiguration()), null)); } ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size()); return splits; }
@Override public List<InputSplit> getSplits(JobContext context) throws IOException { List<InputSplit> allSplits = new ArrayList<InputSplit>(); Scan originalScan = getScan(); Scan[] scans = rowKeyDistributor.getDistributedScans(originalScan); for (Scan scan : scans) { // Internally super.getSplits(...) uses the scan object stored in a private variable, // so to re-use the super-class code we temporarily switch that scan object to each distributed scan setScan(scan); List<InputSplit> splits = super.getSplits(context); allSplits.addAll(splits); } // Setting original scan back setScan(originalScan); return allSplits; }
private int getDesiredNumberOfMappers(JobContext jobContext) { int desiredNumberOfMappers = jobContext.getConfiguration().getInt( OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS, -1); int minMappersAcceptedByOraOop = OraOopUtilities.getMinNumberOfImportMappersAcceptedByOraOop(jobContext .getConfiguration()); if (desiredNumberOfMappers < minMappersAcceptedByOraOop) { LOG.warn(String.format("%s should not be used to perform a Sqoop import " + "when the number of mappers is %d\n " + "i.e. OraOopManagerFactory.accept() should only accept jobs " + "where the number of mappers is at least %d", OraOopConstants.ORAOOP_PRODUCT_NAME, desiredNumberOfMappers, minMappersAcceptedByOraOop)); } return desiredNumberOfMappers; }
public void checkOutputSpecs(JobContext job ) throws FileAlreadyExistsException, IOException{ // Ensure that the output directory is set and not already there Path outDir = getOutputPath(job); if (outDir == null) { throw new InvalidJobConfException("Output directory not set."); } // get delegation token for outDir's file system TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outDir }, job.getConfiguration()); if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) { throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } }
@Override /** {@inheritDoc} */ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); DBConfiguration dbConf = new DBConfiguration(conf); // Sanity check all the configuration values we need. if (null == conf.get(DBConfiguration.URL_PROPERTY)) { throw new IOException("Database connection URL is not set."); } else if (null == dbConf.getOutputTableName()) { throw new IOException("Table name is not set for export."); } else if (null == dbConf.getOutputFieldNames()) { throw new IOException( "Output field names are null."); } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) { throw new IOException("Update key column is not set for export."); } }
@Override /** {@inheritDoc} */ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); DBConfiguration dbConf = new DBConfiguration(conf); // Sanity check all the configuration values we need. if (null == conf.get(DBConfiguration.URL_PROPERTY)) { throw new IOException("Database connection URL is not set."); } else if (null == dbConf.getOutputTableName()) { throw new IOException("Procedure name is not set for export"); } else if (null == dbConf.getOutputFieldNames() && 0 == dbConf.getOutputFieldCount()) { throw new IOException( "Output field names are null and zero output field count set."); } }
/** {@inheritDoc} */ public List<InputSplit> getSplits(JobContext context) { //get the property values final int startDigit = context.getConfiguration().getInt( DIGIT_START_PROPERTY, 1); final int nDigits = context.getConfiguration().getInt( DIGIT_SIZE_PROPERTY, 100); final int nMaps = context.getConfiguration().getInt( DIGIT_PARTS_PROPERTY, 1); //create splits final List<InputSplit> splits = new ArrayList<InputSplit>(nMaps); final int[] parts = partition(startDigit - 1, nDigits, nMaps); for (int i = 0; i < parts.length; ++i) { final int k = i < parts.length - 1 ? parts[i+1]: nDigits+startDigit-1; splits.add(new BbpSplit(i, parts[i], k - parts[i])); } return splits; }
@Override /** {@inheritDoc} */ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); DBConfiguration dbConf = new DBConfiguration(conf); // Sanity check all the configuration values we need. if (null == conf.get(DBConfiguration.URL_PROPERTY)) { throw new IOException("Database connection URL is not set."); } else if (null == dbConf.getOutputTableName()) { throw new IOException("Table name is not set for export"); } else if (null == dbConf.getOutputFieldNames() && 0 == dbConf.getOutputFieldCount()) { throw new IOException( "Output field names are null and zero output field count set."); } }
@Override @Deprecated public void cleanupJob(JobContext context) throws IOException { if (hasOutputPath()) { Path pendingJobAttemptsPath = getPendingJobAttemptsPath(); FileSystem fs = pendingJobAttemptsPath .getFileSystem(context.getConfiguration()); fs.delete(pendingJobAttemptsPath, true); } else { LOG.warn("Output Path is null in cleanupJob()"); } }
/** * Create a local map output index file name on the same volume. */ public Path getOutputIndexFileForWriteInVolume(Path existing) { Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); Path attemptOutputDir = new Path(outputDir, conf.get(JobContext.TASK_ATTEMPT_ID)); return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING); }
/** * The job has completed, so move all committed tasks to the final output dir. * Delete the temporary directory, including all of the work directories. * Create a _SUCCESS file to mark the job as successful. * @param context the job's context */ public void commitJob(JobContext context) throws IOException { if (hasOutputPath()) { Path finalOutput = getOutputPath(); FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); if (algorithmVersion == 1) { for (FileStatus stat: getAllCommittedTaskPaths(context)) { mergePaths(fs, stat, finalOutput); } } // delete the _temporary folder cleanupJob(context); // Create the _SUCCESS marker in the output dir if the job is configured to do so // (enabled by default). if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); fs.create(markerPath).close(); } } else { LOG.warn("Output Path is null in commitJob()"); } }
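As a hedged, driver-side sketch of how the behaviour above is usually configured (the property names "mapreduce.fileoutputcommitter.marksuccessfuljobs" and "mapreduce.fileoutputcommitter.algorithm.version" are assumed here to back SUCCESSFUL_JOB_OUTPUT_DIR_MARKER and FILEOUTPUTCOMMITTER_ALGORITHM_VERSION; verify against your Hadoop version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CommitterConfigSketch {
  public static Job configure() throws Exception {
    Job job = Job.getInstance(new Configuration(), "commit-config-sketch");
    Configuration conf = job.getConfiguration();
    // Assumed key for SUCCESSFUL_JOB_OUTPUT_DIR_MARKER: skip creating _SUCCESS on commit.
    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
    // Assumed key for FILEOUTPUTCOMMITTER_ALGORITHM_VERSION: v2 merges task output
    // during task commit rather than in commitJob().
    conf.setInt("mapreduce.fileoutputcommitter.algorithm.version", 2);
    return job;
  }
}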
/** * Returns a split for each store-file directory, using the block location * of each file as a locality reference. */ @Override public List<InputSplit> getSplits(JobContext job) throws IOException { List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); Text key = new Text(); for (FileStatus file: files) { Path path = file.getPath(); FileSystem fs = path.getFileSystem(job.getConfiguration()); LineReader reader = new LineReader(fs.open(path)); long pos = 0; int n; try { while ((n = reader.readLine(key)) > 0) { String[] hosts = getStoreDirHosts(fs, path); splits.add(new FileSplit(path, pos, n, hosts)); pos += n; } } finally { reader.close(); } } return splits; }
@Test public void testTransitionsAtFailed() throws IOException { Configuration conf = new Configuration(); AsyncDispatcher dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); OutputCommitter committer = mock(OutputCommitter.class); doThrow(new IOException("forcefail")) .when(committer).setupJob(any(JobContext.class)); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); AppContext mockContext = mock(AppContext.class); when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false); JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); job.handle(new JobStartEvent(jobId)); assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED)); assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED)); assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED)); assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)); assertJobState(job, JobStateInternal.FAILED); Assert.assertEquals(JobState.RUNNING, job.getState()); when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true); Assert.assertEquals(JobState.FAILED, job.getState()); dispatcher.stop(); commitHandler.stop(); }
@Override public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException { // get the total data to be generated long toGen = jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1); if (toGen < 0) { throw new IOException("Invalid/missing generation bytes: " + toGen); } // get the total number of mappers configured int totalMappersConfigured = jobCtxt.getConfiguration().getInt(MRJobConfig.NUM_MAPS, -1); if (totalMappersConfigured < 0) { throw new IOException("Invalid/missing num mappers: " + totalMappersConfigured); } final long bytesPerTracker = toGen / totalMappersConfigured; final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(totalMappersConfigured); for (int i = 0; i < totalMappersConfigured; ++i) { splits.add(new GenSplit(bytesPerTracker, new String[] { "tracker_local" })); } return splits; }
public void testEmptyOutput() throws Exception { Job job = Job.getInstance(); FileOutputFormat.setOutputPath(job, outDir); Configuration conf = job.getConfiguration(); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); // setup committer.setupJob(jContext); committer.setupTask(tContext); // Do not write any output // do commit committer.commitTask(tContext); committer.commitJob(jContext); FileUtil.fullyDelete(new File(outDir.toString())); }
/** Partitions the summation into parts and then returns them as splits */ @Override public List<InputSplit> getSplits(JobContext context) { //read sigma from conf final Configuration conf = context.getConfiguration(); final Summation sigma = SummationWritable.read(DistSum.class, conf); final int nParts = conf.getInt(N_PARTS, 0); //create splits final List<InputSplit> splits = new ArrayList<InputSplit>(nParts); final Summation[] parts = sigma.partition(nParts); for(int i = 0; i < parts.length; ++i) { splits.add(new SummationSplit(parts[i])); //LOG.info("parts[" + i + "] = " + parts[i]); } return splits; }
/** * Get the {@link CompressionCodec} for compressing the job outputs. * @param job the {@link Job} to look in * @param defaultValue the {@link CompressionCodec} to return if not set * @return the {@link CompressionCodec} to be used to compress the * job outputs * @throws IllegalArgumentException if the class was specified, but not found */ public static Class<? extends CompressionCodec> getOutputCompressorClass(JobContext job, Class<? extends CompressionCodec> defaultValue) { Class<? extends CompressionCodec> codecClass = defaultValue; Configuration conf = job.getConfiguration(); String name = conf.get(FileOutputFormat.COMPRESS_CODEC); if (name != null) { try { codecClass = conf.getClassByName(name).asSubclass(CompressionCodec.class); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Compression codec " + name + " was not found.", e); } } return codecClass; }
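For context, a minimal usage sketch of the getter above, assuming the standard FileOutputFormat static setters; DefaultCodec is used only as the fallback value here:

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputCompressionSketch {
  public static void configure(Job job) {
    // Enable output compression and pick gzip as the codec.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    // Resolves to GzipCodec here; falls back to DefaultCodec when no codec is configured.
    Class<? extends CompressionCodec> codec =
        FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);
    System.out.println("Output codec: " + codec.getName());
  }
}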
@SuppressWarnings("unchecked") NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } }
@Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); int mappers = conf.getInt(CONF_NUM_SPLITS, 0); if (mappers == 0 && snapshotFiles.size() > 0) { mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); mappers = Math.min(mappers, snapshotFiles.size()); conf.setInt(CONF_NUM_SPLITS, mappers); conf.setInt(MR_NUM_MAPS, mappers); } List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); List<InputSplit> splits = new ArrayList<InputSplit>(groups.size()); for (List<Pair<SnapshotFileInfo, Long>> files: groups) { splits.add(new ExportSnapshotInputSplit(files)); } return splits; }
/** * Create a file output committer. * @param outputPath the job's output path, or null if you want the output * committer to act as a noop. * @param context the task's context * @throws IOException */ @Private public FileOutputCommitter(Path outputPath, JobContext context) throws IOException { Configuration conf = context.getConfiguration(); algorithmVersion = conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT); LOG.info("File Output Committer Algorithm version is " + algorithmVersion); if (algorithmVersion != 1 && algorithmVersion != 2) { throw new IOException("Only algorithm versions 1 and 2 are supported"); } if (outputPath != null) { FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); this.outputPath = fs.makeQualified(outputPath); } }
/** * Logically splits the set of input files for the job, splits N lines * of the input as one split. * * @see FileInputFormat#getSplits(JobContext) */ public List<InputSplit> getSplits(JobContext job) throws IOException { List<InputSplit> splits = new ArrayList<InputSplit>(); int numLinesPerSplit = getNumLinesPerSplit(job); for (FileStatus status : listStatus(job)) { splits.addAll(getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit)); } return splits; }
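A short driver sketch for the N-line splitter above, assuming the standard NLineInputFormat helper for setting lines per split; the input path is hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineDriverSketch {
  public static void configure(Job job) throws Exception {
    job.setInputFormatClass(NLineInputFormat.class);
    // Each mapper receives at most 100 input lines per split.
    NLineInputFormat.setNumLinesPerSplit(job, 100);
    NLineInputFormat.addInputPath(job, new Path("/tmp/example-input")); // hypothetical path
  }
}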
/** * Get the list of input {@link Path}s for the map-reduce job. * * @param context The job * @return the list of input {@link Path}s for the map-reduce job. */ public static Path[] getInputPaths(JobContext context) { String dirs = context.getConfiguration().get(INPUT_DIR, ""); String [] list = StringUtils.split(dirs); Path[] result = new Path[list.length]; for (int i = 0; i < list.length; i++) { result[i] = new Path(StringUtils.unEscapeString(list[i])); } return result; }
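A round-trip sketch for the getter above, assuming the usual FileInputFormat setters; the paths are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathsSketch {
  public static void show(Job job) throws Exception {
    FileInputFormat.setInputPaths(job, new Path("/data/a"), new Path("/data/b")); // hypothetical paths
    FileInputFormat.addInputPath(job, new Path("/data/c"));
    // Prints the three paths above, unescaping any commas stored in the config value.
    for (Path p : FileInputFormat.getInputPaths(job)) {
      System.out.println(p);
    }
  }
}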
private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs, PathFilter inputFilter, boolean recursive) throws IOException { List<FileStatus> result = new ArrayList<FileStatus>(); List<IOException> errors = new ArrayList<IOException>(); for (int i=0; i < dirs.length; ++i) { Path p = dirs[i]; FileSystem fs = p.getFileSystem(job.getConfiguration()); FileStatus[] matches = fs.globStatus(p, inputFilter); if (matches == null) { errors.add(new IOException("Input path does not exist: " + p)); } else if (matches.length == 0) { errors.add(new IOException("Input Pattern " + p + " matches 0 files")); } else { for (FileStatus globStat: matches) { if (globStat.isDirectory()) { RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(globStat.getPath()); while (iter.hasNext()) { LocatedFileStatus stat = iter.next(); if (inputFilter.accept(stat.getPath())) { if (recursive && stat.isDirectory()) { addInputPathRecursively(result, fs, stat.getPath(), inputFilter); } else { result.add(stat); } } } } else { result.add(globStat); } } } } if (!errors.isEmpty()) { throw new InvalidInputException(errors); } return result; }
/** * Build a CompositeInputSplit from the child InputFormats by assigning the * ith split from each child to the ith composite split. */ @SuppressWarnings("unchecked") public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { setFormat(job.getConfiguration()); job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", Long.MAX_VALUE); return root.getSplits(job); }
/** {@inheritDoc} */ @Override public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { try { super.abortJob(jobContext, state); } finally { cleanup(jobContext.getConfiguration()); } }
/** * Implementation of InputFormat::getSplits(). Returns a list of InputSplits, such that the number of bytes to be * copied is approximately equal across all splits. * * @param context JobContext for the job. * @return The list of uniformly-distributed input-splits. * @throws IOException on failure. * @throws InterruptedException */ @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { Configuration configuration = context.getConfiguration(); int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS); if (numSplits == 0) { return new ArrayList<>(); } return getSplits(configuration, numSplits, ConfigurationUtil.getLong(configuration, S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED)); }
/** {@inheritDoc} */ @Override public void checkOutputSpecs(JobContext context) throws IOException { Configuration conf = context.getConfiguration(); Path workingPath = getCommitDirectory(conf); if (workingPath == null) { throw new IllegalStateException("Commit directory not configured"); } // get delegation token for the commit directory's file system TokenCache.obtainTokensForNamenodes(context.getCredentials(), new Path[] { workingPath }, conf); }
@Test public void getSplits() throws Exception { S3MapReduceCpOptions options = getOptions(); Configuration configuration = new Configuration(); configuration.set("mapred.map.tasks", String.valueOf(options.getMaxMaps())); CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing( new Path(cluster.getFileSystem().getUri().toString() + "/tmp/testDynInputFormat/fileList.seq"), options); JobContext jobContext = new JobContextImpl(configuration, new JobID()); DynamicInputFormat<Text, CopyListingFileStatus> inputFormat = new DynamicInputFormat<>(); List<InputSplit> splits = inputFormat.getSplits(jobContext); int nFiles = 0; int taskId = 0; for (InputSplit split : splits) { RecordReader<Text, CopyListingFileStatus> recordReader = inputFormat.createRecordReader(split, null); StubContext stubContext = new StubContext(jobContext.getConfiguration(), recordReader, taskId); final TaskAttemptContext taskAttemptContext = stubContext.getContext(); recordReader.initialize(split, taskAttemptContext); float previousProgressValue = 0f; while (recordReader.nextKeyValue()) { CopyListingFileStatus fileStatus = recordReader.getCurrentValue(); String source = fileStatus.getPath().toString(); assertTrue(expectedFilePaths.contains(source)); final float progress = recordReader.getProgress(); assertTrue(progress >= previousProgressValue); assertTrue(progress >= 0.0f); assertTrue(progress <= 1.0f); previousProgressValue = progress; ++nFiles; } assertTrue(recordReader.getProgress() == 1.0f); ++taskId; } Assert.assertEquals(expectedFilePaths.size(), nFiles); }
@Override public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException { List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(jobContext.getConfiguration()); List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size()); for (TableSnapshotInputFormatImpl.InputSplit split : splits) { rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split)); } return rtn; }
@Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { int targetNumTasks = ConfigurationHelper.getJobNumMaps(context); List<InputSplit> splits = new ArrayList<InputSplit>(targetNumTasks); for (int i = 0; i < targetNumTasks; ++i) { splits.add(new NetezzaExternalTableInputSplit(i)); } return splits; }
/** * Create the desired number of splits, dividing the number of rows * between the mappers. */ public List<InputSplit> getSplits(JobContext job) { long totalRows = getNumberOfRows(job); int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1); LOG.info("Generating " + totalRows + " rows using " + numSplits + " splits"); List<InputSplit> splits = new ArrayList<InputSplit>(); long currentRow = 0; for(int split = 0; split < numSplits; ++split) { long goal = (long) Math.ceil(totalRows * (double)(split + 1) / numSplits); splits.add(new RangeInputSplit(currentRow, goal - currentRow)); currentRow = goal; } return splits; }
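To make the rounding behaviour of the ceil-based partitioning concrete, a standalone sketch with hypothetical numbers (10 rows over 3 splits yields ranges of 4, 3, and 3 rows):

public class RowRangeSketch {
  public static void main(String[] args) {
    long totalRows = 10;   // hypothetical row count
    int numSplits = 3;     // hypothetical split count
    long currentRow = 0;
    for (int split = 0; split < numSplits; ++split) {
      // Same formula as getSplits(): cumulative goal, rounded up.
      long goal = (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
      System.out.println("split " + split + ": start=" + currentRow + " rows=" + (goal - currentRow));
      currentRow = goal;
    }
  }
}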
/** * Reset the config properties related to Distributed Cache in the given job * configuration <code>jobConf</code>. * * @param jobConf * job configuration */ private void resetDistCacheConfigProperties(Configuration jobConf) { // reset current/latest property names jobConf.setStrings(MRJobConfig.CACHE_FILES, ""); jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, ""); jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, ""); jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, ""); // reset old property names jobConf.setStrings("mapred.cache.files", ""); jobConf.setStrings("mapred.cache.files.filesizes", ""); jobConf.setStrings("mapred.cache.files.visibilities", ""); jobConf.setStrings("mapred.cache.files.timestamps", ""); }
/** * Get the (hinted) number of map tasks for a job. */ public static int getJobNumMaps(JobContext job) { if (isLocalJobTracker(job.getConfiguration())) { return numLocalModeMaps; } else { return job.getConfiguration().getInt( ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1); } }
@Override public void checkOutputSpecs(JobContext job ) throws InvalidJobConfException, IOException { // Ensure that the output directory is set Path outDir = getOutputPath(job); if (outDir == null) { throw new InvalidJobConfException("Output directory not set in JobConf."); } final Configuration jobConf = job.getConfiguration(); // get delegation token for outDir's file system TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outDir }, jobConf); final FileSystem fs = outDir.getFileSystem(jobConf); if (fs.exists(outDir)) { // existing output dir is considered empty iff its only content is the // partition file. // final FileStatus[] outDirKids = fs.listStatus(outDir); boolean empty = false; if (outDirKids != null && outDirKids.length == 1) { final FileStatus st = outDirKids[0]; final String fname = st.getPath().getName(); empty = !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname); } if (TeraSort.getUseSimplePartitioner(job) || !empty) { throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } } }
@Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { int count = getNumMapTasks(context.getConfiguration()); List<InputSplit> splits = new ArrayList<InputSplit>(count); for (int i = 0; i < count; i++) { splits.add(new NullInputSplit()); } return splits; }
@Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; }
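As a quick illustration of the check above: bzip2 implements SplittableCompressionCodec while plain gzip does not, so a .bz2 input remains splittable and a .gz input becomes a single split. A small standalone sketch of the same decision, with hypothetical paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittableCheckSketch {
  public static boolean isSplittable(Configuration conf, Path file) {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    // Uncompressed files are always splittable; compressed files only if the codec supports it.
    return codec == null || codec instanceof SplittableCompressionCodec;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(isSplittable(conf, new Path("/logs/a.bz2"))); // true  (bzip2 is splittable)
    System.out.println(isSplittable(conf, new Path("/logs/a.gz")));  // false (gzip is not)
  }
}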
@Test public void testAbortJobCalledAfterKillingTasks() throws IOException { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(conf); dispatcher.start(); OutputCommitter committer = Mockito.mock(OutputCommitter.class); CommitterEventHandler commitHandler = createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); //Fail one task. This should land the JobImpl in the FAIL_WAIT state job.handle(new JobTaskEvent( MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), TaskState.FAILED)); //Verify abort job hasn't been called Mockito.verify(committer, Mockito.never()) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); assertJobState(job, JobStateInternal.FAIL_WAIT); //Verify abortJob is called once and the job failed Mockito.verify(committer, Mockito.timeout(2000).times(1)) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); assertJobState(job, JobStateInternal.FAILED); dispatcher.stop(); }
@Override protected void initialize(JobContext context) throws IOException { // Do we have to worry about mis-matches between the Configuration from setConf and the one // in this context? TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); try { initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } }