private org.apache.hadoop.mapreduce.OutputCommitter createOutputCommitter(boolean newApiCommitter,
    JobID jobId, Configuration conf) throws IOException, InterruptedException, ClassNotFoundException {
  org.apache.hadoop.mapreduce.OutputCommitter committer = null;
  LOG.info("OutputCommitter set in config " + conf.get("mapred.output.committer.class"));
  if (newApiCommitter) {
    HadoopVersionSpecificCode hadoopVersionSpecificCode =
        HadoopVersionSpecificCode.getInstance(VersionInfo.getVersion(), conf);
    org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
        hadoopVersionSpecificCode.createTaskAttemptId(jobId, true, 0);
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        hadoopVersionSpecificCode.createTaskAttemptContext(conf, taskAttemptID);
    OutputFormat outputFormat =
        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
    committer = outputFormat.getOutputCommitter(taskContext);
  } else {
    committer = ReflectionUtils.newInstance(conf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), conf);
  }
  LOG.info("OutputCommitter is " + committer.getClass().getName());
  return committer;
}
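The old-API branch of createOutputCommitter above resolves the committer class purely from configuration. A minimal sketch of how a job could point it at a custom committer, assuming a hypothetical CustomCommitter that extends org.apache.hadoop.mapred.OutputCommitter:

private JobConf configureCustomCommitter() {
  // CustomCommitter is a hypothetical org.apache.hadoop.mapred.OutputCommitter subclass.
  JobConf jobConf = new JobConf();
  // The property read by the old-API branch above:
  jobConf.setClass("mapred.output.committer.class",
      CustomCommitter.class, org.apache.hadoop.mapred.OutputCommitter.class);
  // JobConf's typed setter writes the same property:
  jobConf.setOutputCommitter(CustomCommitter.class);
  return jobConf;
}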
/**
 * Clean up after a successful Hive import.
 *
 * @param outputPath path to the output directory
 * @throws IOException
 */
private void cleanUp(Path outputPath) throws IOException {
  FileSystem fs = FileSystem.get(configuration);

  // Hive does not always remove the input directory of the LOAD DATA statement
  // (which is our export directory). Remove the export directory when it is
  // effectively empty, so that a user who periodically populates the Hive table
  // (for example with --hive-overwrite) is not blocked by leftover directories.
  try {
    if (outputPath != null && fs.exists(outputPath)) {
      FileStatus[] statuses = fs.listStatus(outputPath);
      if (statuses.length == 0) {
        LOG.info("Export directory is empty, removing it.");
        fs.delete(outputPath, true);
      } else if (statuses.length == 1
          && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
        LOG.info("Export directory contains only the _SUCCESS file, removing the directory.");
        fs.delete(outputPath, true);
      } else {
        LOG.info("Export directory is not empty, keeping it.");
      }
    }
  } catch (IOException e) {
    LOG.error("Issue with cleaning (safe to ignore)", e);
  }
}
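The _SUCCESS file checked above is written by FileOutputCommitter on successful job commit. As a side note, a sketch of the property that controls the marker, assuming the stock Hadoop committer is in use:

// Sketch: suppress creation of the _SUCCESS marker file; assumes the standard
// Hadoop FileOutputCommitter and its "marksuccessfuljobs" property.
private void disableSuccessMarker(Configuration conf) {
  conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
}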
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    Path p = new Path(outputPath,
        (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + "_" + taskAttemptID.toString()));
    try {
      FileSystem fs = p.getFileSystem(conf);
      return p.makeQualified(fs);
    } catch (IOException ie) {
      LOG.warn(StringUtils.stringifyException(ie));
      return p;
    }
  }
  return null;
}
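For orientation, a small usage sketch of getTempTaskOutputPath; the output directory and attempt id below are made-up values, and the layout follows from FileOutputCommitter.TEMP_DIR_NAME being "_temporary":

public void printTempTaskOutputPath() {
  JobConf conf = new JobConf();
  FileOutputFormat.setOutputPath(conf, new Path("/out"));                                 // assumed output dir
  TaskAttemptID attempt = TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0"); // made-up attempt id
  // Expected: /out/_temporary/_attempt_200707121733_0003_m_000005_0
  // (qualified with the default filesystem's scheme when makeQualified succeeds).
  System.out.println(getTempTaskOutputPath(conf, attempt));
}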
public void localizeConfiguration(JobConf jobConf) throws IOException, InterruptedException {
  jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
  jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
  jobConf.setInt(JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId());
  jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
  jobConf.setBoolean(MRJobConfig.TASK_ISMAP, isMap);
  Path outputPath = FileOutputFormat.getOutputPath(jobConf);
  if (outputPath != null) {
    if ((committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(jobConf,
          ((FileOutputCommitter) committer).getTaskAttemptPath(taskAttemptContext));
    } else {
      FileOutputFormat.setWorkOutputPath(jobConf, outputPath);
    }
  }
}
private OutputCommitter createOutputCommitter(Configuration conf) {
  OutputCommitter committer = null;
  LOG.info("OutputCommitter set in config " + conf.get("mapred.output.committer.class"));
  if (newApiCommitter) {
    org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
        MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
        MRBuilderUtils.newTaskAttemptId(taskID, 0);
    TaskAttemptContext taskContext =
        new TaskAttemptContextImpl(conf, TypeConverter.fromYarn(attemptID));
    OutputFormat outputFormat;
    try {
      outputFormat = ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
      committer = outputFormat.getOutputCommitter(taskContext);
    } catch (Exception e) {
      throw new YarnRuntimeException(e);
    }
  } else {
    committer = ReflectionUtils.newInstance(conf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), conf);
  }
  LOG.info("OutputCommitter is " + committer.getClass().getName());
  return committer;
}
private void initialize(int index, JobConf conf) throws Exception {
  if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
    boolean isMap = conf.getNumReduceTasks() == 0;
    TaskAttemptID taskAttemptId = new TaskAttemptID("0", index, isMap, index, index);
    conf.set("mapred.task.id", taskAttemptId.toString());

    // Build the part file name, e.g. "part-00003" for index 3.
    String suffix = "part-00000";
    suffix = suffix.substring(0, suffix.length() - ("" + index).length()) + index;

    outputPath = new Path(conf.get("mapred.output.dir"));
    tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
    FileSystem fileSys = tempDir.getFileSystem(conf);
    if (!fileSys.mkdirs(tempDir)) {
      throw new IOException("Mkdirs failed to create " + tempDir.toString());
    }
    tempOutputFile = new Path(tempDir, new Path("_" + taskAttemptId.toString()));
    tempOutputFile = new Path(tempOutputFile, suffix);
    finalOutputFile = new Path(outputPath, suffix);

    if (conf.getUseNewMapper()) {
      org.apache.hadoop.mapreduce.JobContext jobContext =
          new ContextFactory().createJobContext(conf);
      org.apache.hadoop.mapreduce.OutputFormat newOutputFormat =
          (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
              .newInstance(jobContext.getOutputFormatClass(), conf);
      recordWriter = newOutputFormat.getRecordWriter(
          new ContextFactory().createContext(conf, taskAttemptId));
    } else {
      recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
          new Progressable() {
            @Override
            public void progress() {
            }
          });
    }
  }
}
public void setupJob(JobConf conf) throws IOException {
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
    FileSystem fileSys = tmpDir.getFileSystem(conf);
    if (!fileSys.mkdirs(tmpDir)) {
      LOG.error("Mkdirs failed to create " + tmpDir.toString());
    }
  }
}
public void cleanupJob(JobConf conf) throws IOException {
  // Clean up the temporary directory under the job's output path.
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
    FileSystem fileSys = tmpDir.getFileSystem(conf);
    if (fileSys.exists(tmpDir)) {
      fileSys.delete(tmpDir, true);
    }
  } else {
    LOG.warn("Output path is null in cleanup");
  }
}
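A hedged lifecycle sketch for setupJob/cleanupJob above, assumed to live in the same class; the /out output path and the inline comments about _temporary are illustrative only:

// Hypothetical driver sketch; "/out" is a made-up output path.
public void runJobScaffolding() throws IOException {
  JobConf conf = new JobConf();
  FileOutputFormat.setOutputPath(conf, new Path("/out"));
  setupJob(conf);    // creates /out/_temporary
  // ... tasks write under /out/_temporary/_<task-attempt-id> and get committed ...
  cleanupJob(conf);  // removes /out/_temporary once all task outputs are promoted
}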
public void initCommitter(JobConf job, boolean useNewApi) throws IOException, InterruptedException {
  if (useNewApi) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    this.committer = newOutputFormat.getOutputCommitter(newApiTaskAttemptContext);
  } else {
    this.committer = job.getOutputCommitter();
  }
  Path outputPath = FileOutputFormat.getOutputPath(job);
  if (outputPath != null) {
    if ((this.committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(job,
          ((FileOutputCommitter) this.committer).getTaskAttemptPath(oldApiTaskAttemptContext));
    } else {
      FileOutputFormat.setWorkOutputPath(job, outputPath);
    }
  }
  if (useNewApi) {
    this.committer.setupTask(newApiTaskAttemptContext);
  } else {
    this.committer.setupTask(oldApiTaskAttemptContext);
  }
}
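The work output path that initCommitter wires up is what task code should use for side files. A sketch of the consuming side, with "side-data" as an arbitrary example name:

public Path sideFilePath(JobConf job) {
  // Reads the directory set via FileOutputFormat.setWorkOutputPath in initCommitter above.
  Path workDir = FileOutputFormat.getWorkOutputPath(job);
  return new Path(workDir, "side-data"); // hypothetical side-file name
}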
@Test
public void testWriteFile() throws Exception {
  int messageCount = 20;
  String mapName = randomMapName();
  JetInstance instance = createJetMember();
  createJetMember();
  Map<IntWritable, IntWritable> map = IntStream.range(0, messageCount).boxed()
      .collect(toMap(IntWritable::new, IntWritable::new));
  instance.getMap(mapName).putAll(map);

  DAG dag = new DAG();
  Vertex producer = dag.newVertex("producer", readMapP(mapName))
      .localParallelism(1);

  Path path = getPath();
  JobConf conf = new JobConf();
  conf.setOutputFormat(outputFormatClass);
  conf.setOutputCommitter(FileOutputCommitter.class);
  conf.setOutputKeyClass(IntWritable.class);
  conf.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(conf, path);

  Vertex consumer = dag.newVertex("consumer",
      HdfsProcessors.<Entry<IntWritable, IntWritable>, IntWritable, IntWritable>writeHdfsP(
          conf, Entry::getKey, Entry::getValue))
      .localParallelism(4);
  dag.edge(between(producer, consumer));

  Future<Void> future = instance.newJob(dag).getFuture();
  assertCompletesEventually(future);

  dag = new DAG();
  JobConf readJobConf = new JobConf();
  readJobConf.setInputFormat(inputFormatClass);
  FileInputFormat.addInputPath(readJobConf, path);
  producer = dag.newVertex("producer", readHdfsP(readJobConf, Util::entry))
      .localParallelism(8);
  consumer = dag.newVertex("consumer", writeListP("results"))
      .localParallelism(1);
  dag.edge(between(producer, consumer));

  future = instance.newJob(dag).getFuture();
  assertCompletesEventually(future);

  IList<Object> results = instance.getList("results");
  assertEquals(messageCount, results.size());
}
@SuppressWarnings("rawtypes") private org.apache.hadoop.mapreduce.OutputCommitter getOutputCommitter(OutputCommitterContext context) { org.apache.hadoop.mapreduce.OutputCommitter committer = null; newApiCommitter = false; if (jobConf.getBoolean("mapred.reducer.new-api", false) || jobConf.getBoolean("mapred.mapper.new-api", false)) { newApiCommitter = true; LOG.info("Using mapred newApiCommitter."); } LOG.info("OutputCommitter set in config for outputName=" + context.getOutputName() + ", vertexName=" + context.getVertexName() + ", outputCommitterClass=" + jobConf.get("mapred.output.committer.class")); if (newApiCommitter) { TaskAttemptID taskAttemptID = new TaskAttemptID( Long.toString(context.getApplicationId().getClusterTimestamp()), context.getApplicationId().getId(), ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ? TaskType.MAP : TaskType.REDUCE)), 0, context.getDAGAttemptNumber()); TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf, taskAttemptID); try { OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext .getOutputFormatClass(), jobConf); committer = outputFormat.getOutputCommitter(taskContext); } catch (Exception e) { throw new TezUncheckedException(e); } } else { committer = ReflectionUtils.newInstance(jobConf.getClass( "mapred.output.committer.class", FileOutputCommitter.class, org.apache.hadoop.mapred.OutputCommitter.class), jobConf); } LOG.info("OutputCommitter for outputName=" + context.getOutputName() + ", vertexName=" + context.getVertexName() + ", outputCommitterClass=" + committer.getClass().getName()); return committer; }
@SuppressWarnings("rawtypes") private org.apache.hadoop.mapreduce.OutputCommitter getOutputCommitter(OutputCommitterContext context) { org.apache.hadoop.mapreduce.OutputCommitter committer = null; newApiCommitter = false; if (jobConf.getBoolean("mapred.reducer.new-api", false) || jobConf.getBoolean("mapred.mapper.new-api", false)) { newApiCommitter = true; } LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() + " using " + (newApiCommitter ? "new" : "old") + "mapred API"); if (newApiCommitter) { TaskAttemptID taskAttemptID = new TaskAttemptID( Long.toString(context.getApplicationId().getClusterTimestamp()), context.getApplicationId().getId(), ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ? TaskType.MAP : TaskType.REDUCE)), 0, context.getDAGAttemptNumber()); TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf, taskAttemptID); try { OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext .getOutputFormatClass(), jobConf); committer = outputFormat.getOutputCommitter(taskContext); } catch (Exception e) { throw new TezUncheckedException(e); } } else { committer = ReflectionUtils.newInstance(jobConf.getClass( "mapred.output.committer.class", FileOutputCommitter.class, org.apache.hadoop.mapred.OutputCommitter.class), jobConf); } LOG.info("OutputCommitter for outputName=" + context.getOutputName() + ", vertexName=" + context.getVertexName() + ", outputCommitterClass=" + committer.getClass().getName()); return committer; }