@Test
public void testCloseWithTaskCommit() throws Exception {
    OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
    DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
    when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(true);
    DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
    JobConf jobConf = mock(JobConf.class);

    HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
    outputFormat.recordWriter = recordWriter;
    outputFormat.outputCommitter = outputCommitter;

    outputFormat.close();

    verify(recordWriter, times(1)).close(any(Reporter.class));
    verify(outputCommitter, times(1)).commitTask(any(TaskAttemptContext.class));
}
@Test
public void testCloseWithoutTaskCommit() throws Exception {
    OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
    DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
    when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
    DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
    JobConf jobConf = mock(JobConf.class);

    HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
    outputFormat.recordWriter = recordWriter;
    outputFormat.outputCommitter = outputCommitter;

    outputFormat.close();

    verify(recordWriter, times(1)).close(any(Reporter.class));
    verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
}
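// Both tests above pass Mockito mocks of the Dummy* test doubles into
// HadoopOutputFormat, so those types only need to be concrete, non-final,
// and carry the mapred signatures. A minimal hypothetical sketch of such
// doubles follows; the actual classes in the test suite may differ.
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;

class DummyOutputFormat implements OutputFormat<String, Long> {
    @Override
    public RecordWriter<String, Long> getRecordWriter(FileSystem fs, JobConf conf,
            String name, Progressable progress) throws IOException {
        return new DummyRecordWriter();
    }

    @Override
    public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
        // no-op: nothing to validate for the dummy
    }
}

class DummyRecordWriter implements RecordWriter<String, Long> {
    @Override
    public void write(String key, Long value) throws IOException { /* no-op */ }

    @Override
    public void close(Reporter reporter) throws IOException { /* no-op */ }
}

class DummyOutputCommitter extends OutputCommitter {
    @Override public void setupJob(JobContext jobContext) throws IOException { }
    @Override public void setupTask(TaskAttemptContext taskContext) throws IOException { }
    @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { return false; }
    @Override public void commitTask(TaskAttemptContext taskContext) throws IOException { }
    @Override public void abortTask(TaskAttemptContext taskContext) throws IOException { }
}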
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
    JobConf conf = context.getJobConf();
    TaskAttemptID attemptId = context.getTaskAttemptID();

    // get the mapping from index to output filename
    outputs = MRJobConfiguration.getOutputs(conf);

    // get temp task output path (compatible with hadoop1 and hadoop2)
    Path taskOutPath = FileOutputFormat.getWorkOutputPath(conf);
    FileSystem fs = taskOutPath.getFileSystem(conf);
    if( !fs.exists(taskOutPath) )
        throw new IOException("Task output path " + taskOutPath.toString() + " does not exist.");

    // move the task outputs to their final places
    context.getProgressible().progress();
    moveFinalTaskOutputs(context, fs, taskOutPath);

    // delete the temporary task-specific output directory
    if( !fs.delete(taskOutPath, true) )
        LOG.debug("Failed to delete the temporary output directory of task: " + attemptId + " - " + taskOutPath);
}
private void moveFileToDestination(TaskAttemptContext context, FileSystem fs, Path file) throws IOException {
    TaskAttemptID attemptId = context.getTaskAttemptID();

    // get output index and final destination
    String name = file.getName(); // e.g., 0-r-00000
    int index = Integer.parseInt(name.substring(0, name.indexOf("-")));
    Path dest = new Path(outputs[index], name); // e.g., outX/0-r-00000

    // move the file from 'file' to 'dest'; if the rename fails because an
    // earlier output already occupies 'dest', delete it and retry once
    if( !fs.rename(file, dest) ) {
        if (!fs.delete(dest, true))
            throw new IOException("Failed to delete earlier output " + dest + " for rename of " + file + " in task " + attemptId);
        if (!fs.rename(file, dest))
            throw new IOException("Failed to save output " + dest + " for rename of " + file + " in task: " + attemptId);
    }
}
private void initMapreduceOutputCommitter(TaskAttemptContext taskContext) throws IOException {
    if (mapreduceOutputCommitter != null) {
        LOG.debug("Using existing mapreduceOutputCommitter");
        return;
    }

    // It would be nice to use the BigQueryOutputFormat that already exists
    // (there is one wrapped inside our BigQueryMapredOutputFormat), but
    // there does not seem to be an easy way to do that. So make another one.
    LOG.debug("Creating BigQueryOutputFormat");
    BigQueryOutputFormat<Object, JsonObject> mapreduceOutputFormat =
        new BigQueryOutputFormat<Object, JsonObject>();

    // Fortunately, mapred.TaskAttemptContext is a subtype of
    // mapreduce.TaskAttemptContext, so we can use it directly.
    try {
        LOG.debug("Creating mapreduce OutputCommitter");
        mapreduceOutputCommitter = mapreduceOutputFormat.getOutputCommitter(taskContext);
    } catch (InterruptedException ex) {
        throw new IOException(ex);
    }
}
@Test
public void testClose() throws IOException, InterruptedException {
    RecordWriter<LongWritable, JsonObject> recordWriter =
        new BigQueryMapredRecordWriter<LongWritable, JsonObject>(
            mockRecordWriter, mockTaskAttemptContext);

    Reporter reporter = null; // unused by code under test
    recordWriter.close(reporter);
    verify(mockRecordWriter).close(any(TaskAttemptContext.class));

    doThrow(new IOException("test"))
        .when(mockRecordWriter).close(any(TaskAttemptContext.class));
    expectedException.expect(IOException.class);
    try {
        recordWriter.close(reporter);
    } finally {
        verify(mockRecordWriter, times(2)).close(any(TaskAttemptContext.class));
    }
}
public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
    try {
        Class<?> clazz = null;
        if(!TaskAttemptContext.class.isInterface()) {
            // for Hadoop 1.xx
            clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true,
                Thread.currentThread().getContextClassLoader());
        }
        else {
            // for Hadoop 2.xx
            clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true,
                Thread.currentThread().getContextClassLoader());
        }
        Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class,
            org.apache.hadoop.mapreduce.JobID.class);

        // needed for Hadoop 1.xx, where the constructor is not public
        constructor.setAccessible(true);
        JobContext context = (JobContext) constructor.newInstance(jobConf, jobId);
        return context;
    }
    catch(Exception e) {
        throw new Exception("Could not create instance of JobContext.", e);
    }
}
public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception {
    try {
        Class<?> clazz = null;
        if(!TaskAttemptContext.class.isInterface()) {
            // for Hadoop 1.xx
            clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true,
                Thread.currentThread().getContextClassLoader());
        }
        else {
            // for Hadoop 2.xx
            clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true,
                Thread.currentThread().getContextClassLoader());
        }
        Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);

        // needed for Hadoop 1.xx, where the constructor is not public
        constructor.setAccessible(true);
        TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
        return context;
    }
    catch(Exception e) {
        throw new Exception("Could not create instance of TaskAttemptContext.", e);
    }
}
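// Illustrative (hypothetical) caller of the two reflection helpers above.
// The job-tracker identifier and ID numbers are dummy values; the point is
// that the caller has no compile-time dependency on either Hadoop line.
public static void runExample() throws Exception {
    JobConf jobConf = new JobConf();
    JobID jobId = new JobID("example-jt", 1);
    TaskAttemptID attemptId = new TaskAttemptID("example-jt", 1, true, 0, 0);

    // Resolves to mapred.JobContext / mapred.TaskAttemptContext on Hadoop 1.xx
    // and to the *Impl classes on Hadoop 2.xx.
    JobContext jobContext = instantiateJobContext(jobConf, jobId);
    TaskAttemptContext taskContext = instantiateTaskAttemptContext(jobConf, attemptId);
}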
private void moveFinalTaskOutputs(TaskAttemptContext context, FileSystem fs, Path taskOutput) throws IOException {
    context.getProgressible().progress();

    if( fs.getFileStatus(taskOutput).isDirectory() ) {
        FileStatus[] files = fs.listStatus(taskOutput);
        if (files != null) {
            for (FileStatus file : files) { // for all files
                if( !file.isDirectory() ) { // skip directories
                    moveFileToDestination(context, fs, file.getPath());
                }
            }
        }
    }
}
/**
 * Commit task.
 *
 * @throws IOException If the task commit fails.
 */
public void commit() throws IOException {
    if (writer != null) {
        OutputCommitter outputCommitter = jobConf.getOutputCommitter();

        TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);

        if (outputCommitter.needsTaskCommit(taskCtx))
            outputCommitter.commitTask(taskCtx);
    }
}
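// A matching abort path is not shown in this excerpt; a hedged sketch of
// what it might look like, assuming the same writer, jobConf, and attempt
// fields as commit() above (treat this as an assumption, not the actual
// implementation):
public void abort() {
    try {
        if (writer != null)
            jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
    }
    catch (IOException ignore) {
        // No-op: abort is best-effort cleanup.
    }
}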
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    LOG.debug("needsTaskCommit");
    initMapreduceOutputCommitter(taskContext);
    return mapreduceOutputCommitter.needsTaskCommit(taskContext);
}
@Test
public void testAbortTask() throws IOException {
    BigQueryMapredOutputCommitter outputCommitter = new BigQueryMapredOutputCommitter();
    outputCommitter.setMapreduceOutputCommitter(mockOutputCommitter);
    outputCommitter.abortTask(mockTaskAttemptContext);
    verify(mockOutputCommitter).abortTask(any(TaskAttemptContext.class));
}

@Test
public void testCommitTask() throws IOException {
    BigQueryMapredOutputCommitter outputCommitter = new BigQueryMapredOutputCommitter();
    outputCommitter.setMapreduceOutputCommitter(mockOutputCommitter);
    outputCommitter.commitTask(mockTaskAttemptContext);
    verify(mockOutputCommitter).commitTask(any(TaskAttemptContext.class));
}

@Test
public void testNeedsTaskCommit() throws IOException {
    BigQueryMapredOutputCommitter outputCommitter = new BigQueryMapredOutputCommitter();
    outputCommitter.setMapreduceOutputCommitter(mockOutputCommitter);
    outputCommitter.needsTaskCommit(mockTaskAttemptContext);
    verify(mockOutputCommitter).needsTaskCommit(any(TaskAttemptContext.class));
}

@Test
public void testSetupTask() throws IOException {
    BigQueryMapredOutputCommitter outputCommitter = new BigQueryMapredOutputCommitter();
    outputCommitter.setMapreduceOutputCommitter(mockOutputCommitter);
    outputCommitter.setupTask(mockTaskAttemptContext);
    verify(mockOutputCommitter).setupTask(any(TaskAttemptContext.class));
}
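// The four delegation tests above share the same two mocks. A plausible
// Mockito/JUnit 4 fixture for them, assuming the field names used above
// (the real test class may declare these differently):
//
// The committer being mocked is the mapreduce-side one that
// BigQueryMapredOutputCommitter delegates to; the context is the
// mapred-side subtype, which the mapreduce committer accepts.
@Mock private org.apache.hadoop.mapreduce.OutputCommitter mockOutputCommitter;
@Mock private org.apache.hadoop.mapred.TaskAttemptContext mockTaskAttemptContext;

@Before
public void setUp() {
    MockitoAnnotations.initMocks(this);
}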
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException {
    writeFile(taskContext.getJobConf(), TASK_SETUP_FILE_NAME);
}

@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    return true;
}

@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
    writeFile(taskContext.getJobConf(), TASK_COMMIT_FILE_NAME);
}

@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException {
    writeFile(taskContext.getJobConf(), TASK_ABORT_FILE_NAME);
}
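// This test committer records each lifecycle event as a marker file.
// writeFile itself is not part of this excerpt; a minimal version
// consistent with the calls above might look like the following
// (the output location is an assumption):
private void writeFile(JobConf conf, String fileName) throws IOException {
    Path outputPath = FileOutputFormat.getOutputPath(conf); // assumed location
    FileSystem fs = outputPath.getFileSystem(conf);
    fs.create(new Path(outputPath, fileName), true).close(); // empty marker file
}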
@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
    return false;
}