public LocalFetcher(JobConf job, TaskAttemptID reduceId,
                    ShuffleSchedulerImpl<K, V> scheduler,
                    MergeManager<K, V> merger,
                    Reporter reporter, ShuffleClientMetrics metrics,
                    ExceptionReporter exceptionReporter,
                    SecretKey shuffleKey,
                    Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  super(job, reduceId, scheduler, merger, reporter, metrics,
      exceptionReporter, shuffleKey);

  this.job = job;
  this.localMapFiles = localMapFiles;

  setName("localfetcher#" + id);
  setDaemon(true);
}
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                       MergeManagerImpl<K, V> merger, long size,
                       JobConf conf,
                       MapOutputFile mapOutputFile,
                       int fetcher, boolean primaryMapOutput)
    throws IOException {
  this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
       primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
       mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
}
@VisibleForTesting
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                MergeManagerImpl<K, V> merger, long size,
                JobConf conf,
                MapOutputFile mapOutputFile,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  super(mapId, size, primaryMapOutput);
  this.fs = fs;
  this.merger = merger;
  this.outputPath = outputPath;
  // spill into a fetcher-specific temp file; the stream is wrapped when
  // intermediate-data encryption is enabled
  tmpOutputPath = getTempPath(outputPath, fetcher);
  disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
  this.conf = conf;
}
@Deprecated
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                       MergeManagerImpl<K, V> merger, long size,
                       JobConf conf,
                       MapOutputFile mapOutputFile,
                       int fetcher, boolean primaryMapOutput)
    throws IOException {
  this(mapId, merger, size, conf, fetcher, primaryMapOutput,
       FileSystem.getLocal(conf).getRaw(),
       mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
}
@Deprecated
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                MergeManagerImpl<K, V> merger, long size,
                JobConf conf,
                MapOutputFile mapOutputFile,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath);
}
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                       MergeManagerImpl<K, V> merger, long size,
                       JobConf conf,
                       MapOutputFile mapOutputFile,
                       int fetcher, boolean primaryMapOutput)
    throws IOException {
  this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
       primaryMapOutput, FileSystem.getLocal(conf),
       mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
}
@VisibleForTesting
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                MergeManagerImpl<K, V> merger, long size,
                JobConf conf,
                MapOutputFile mapOutputFile,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  super(mapId, size, primaryMapOutput);
  this.fs = fs;
  this.merger = merger;
  this.outputPath = outputPath;
  tmpOutputPath = getTempPath(outputPath, fetcher);
  disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
}
@VisibleForTesting
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                MergeManagerImpl<K, V> merger, long size,
                JobConf conf,
                MapOutputFile mapOutputFile,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  super(mapId, size, primaryMapOutput);
  this.fs = fs;
  this.merger = merger;
  this.outputPath = outputPath;
  tmpOutputPath = getTempPath(outputPath, fetcher);
  disk = fs.create(tmpOutputPath);
}
public LinkMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                     MergeManagerImpl<K, V> merger, long size,
                     JobConf conf,
                     MapOutputFile mapOutputFile,
                     int fetcher, boolean primaryMapOutput)
    throws IOException {
  this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
       primaryMapOutput, FileSystem.getLocal(conf),
       mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
}
@VisibleForTesting
LinkMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
              MergeManagerImpl<K, V> merger, long size,
              JobConf conf,
              MapOutputFile mapOutputFile,
              int fetcher, boolean primaryMapOutput,
              FileSystem fs, Path outputPath) throws IOException {
  super(mapId, size, primaryMapOutput);
  this.fs = fs;
  this.merger = merger;
  this.outputPath = outputPath;
  tmpOutputPath = getTempPath(outputPath, fetcher);
  this.reduceId = reduceId;
  this.conf = conf;
}
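The constructors above all derive tmpOutputPath through getTempPath(outputPath, fetcher), and testInterruptOnDisk below calls it statically as OnDiskMapOutput.getTempPath(p, FETCHER). A minimal sketch of such a helper, assuming the temporary name is simply the final output path suffixed with the fetcher id so that concurrent fetchers writing the same map output do not collide:

// Sketch only: derive a per-fetcher temporary path from the final output path.
// The suffix convention is an assumption consistent with how the constructors
// and testInterruptOnDisk use the helper.
static Path getTempPath(Path outPath, int fetcher) {
  return outPath.suffix(String.valueOf(fetcher));
}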
@Test
/**
 * A testing method verifying availability and accessibility of API that is needed
 * for sub-classes of ShuffleConsumerPlugin
 */
public void testConsumerApi() {
  JobConf jobConf = new JobConf();
  ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin =
      new TestShuffleConsumerPlugin<K, V>();

  // mock creation
  ReduceTask mockReduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass =
      jobConf.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V> mockCombineOutputCollector =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);

  try {
    String[] dirs = jobConf.getLocalDirs();
    // verify that these APIs are available through super class handler
    ShuffleConsumerPlugin.Context<K, V> context =
        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf,
            mockFileSystem, mockUmbilical, mockLocalDirAllocator,
            mockReporter, mockCompressionCodec,
            combinerClass, mockCombineOutputCollector,
            mockCounter, mockCounter, mockCounter,
            mockCounter, mockCounter, mockCounter,
            mockTaskStatus, mockProgress, mockProgress,
            mockTask, mockMapOutputFile, null);
    shuffleConsumerPlugin.init(context);
    shuffleConsumerPlugin.run();
    shuffleConsumerPlugin.close();
  } catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  mockReduceTask.getTaskID();
  mockReduceTask.getJobID();
  mockReduceTask.getNumMaps();
  mockReduceTask.getPartition();
  mockReporter.progress();
}
@SuppressWarnings("rawtypes") @Test public <K, V> void TestSucceedAndFailedCopyMap() throws Exception { JobConf job = new JobConf(); job.setNumMapTasks(2); //mock creation TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); Reporter mockReporter = mock(Reporter.class); FileSystem mockFileSystem = mock(FileSystem.class); Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass(); @SuppressWarnings("unchecked") // needed for mock with generic CombineOutputCollector<K, V> mockCombineOutputCollector = (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class); org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); Counter mockCounter = mock(Counter.class); TaskStatus mockTaskStatus = mock(TaskStatus.class); Progress mockProgress = mock(Progress.class); MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); Task mockTask = mock(Task.class); @SuppressWarnings("unchecked") MapOutput<K, V> output = mock(MapOutput.class); ShuffleConsumerPlugin.Context<K, V> context = new ShuffleConsumerPlugin.Context<K, V>( mockTaskAttemptID, job, mockFileSystem, mockUmbilical, mockLocalDirAllocator, mockReporter, mockCompressionCodec, combinerClass, mockCombineOutputCollector, mockCounter, mockCounter, mockCounter, mockCounter, mockCounter, mockCounter, mockTaskStatus, mockProgress, mockProgress, mockTask, mockMapOutputFile, null); TaskStatus status = new TaskStatus() { @Override public boolean getIsMap() { return false; } @Override public void addFetchFailedMap(TaskAttemptID mapTaskId) { } }; Progress progress = new Progress(); ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null, null, progress, context.getShuffledMapsCounter(), context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); MapHost host1 = new MapHost("host1", null); TaskAttemptID failedAttemptID = new TaskAttemptID( new org.apache.hadoop.mapred.TaskID( new JobID("test",0), TaskType.MAP, 0), 0); TaskAttemptID succeedAttemptID = new TaskAttemptID( new org.apache.hadoop.mapred.TaskID( new JobID("test",0), TaskType.MAP, 1), 1); // handle output fetch failure for failedAttemptID, part I scheduler.hostFailed(host1.getHostName()); // handle output fetch succeed for succeedAttemptID long bytes = (long)500 * 1024 * 1024; scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output); // handle output fetch failure for failedAttemptID, part II // for MAPREDUCE-6361: verify no NPE exception get thrown out scheduler.copyFailed(failedAttemptID, host1, true, false); }
@Test(timeout=10000)
public void testInterruptOnDisk() throws Exception {
  final int FETCHER = 7;
  Path p = new Path("file:///tmp/foo");
  Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
  FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
  MapOutputFile mof = mock(MapOutputFile.class);
  when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p);

  OnDiskMapOutput<Text, Text> odmo = spy(new OnDiskMapOutput<Text, Text>(
      map1ID, id, mm, 100L, job, mof, FETCHER, true, mFs, p));
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenReturn(odmo);
  doNothing().when(mm).waitForResource();
  when(ss.getHost()).thenReturn(host);

  String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
  when(connection.getResponseCode()).thenReturn(200);
  when(connection.getHeaderField(
      SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  header.write(new DataOutputStream(bout));
  final StuckInputStream in =
      new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
  when(connection.getInputStream()).thenReturn(in);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  doAnswer(new Answer<Void>() {
    public Void answer(InvocationOnMock ignore) throws IOException {
      in.close();
      return null;
    }
  }).when(connection).disconnect();

  Fetcher<Text, Text> underTest = new FakeFetcher<Text, Text>(job, id, ss, mm,
      r, metrics, except, key, connection, FETCHER);
  underTest.start();
  // wait for read in inputstream
  in.waitForFetcher();
  underTest.shutDown();
  underTest.join(); // rely on test timeout to kill if stuck

  assertTrue(in.wasClosedProperly());
  verify(mFs).create(eq(pTmp));
  verify(mFs).delete(eq(pTmp), eq(false));
  verify(odmo).abort();
}
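testInterruptOnDisk relies on a StuckInputStream helper whose definition is not shown here. A rough sketch of what such a helper could look like, under the assumption that it serves the wrapped bytes normally, blocks once they run out, and reports whether close() reached it; the actual class in TestFetcher may differ in detail:

// Hypothetical sketch of a test-only stream that parks the fetcher thread at a
// known point once the wrapped data is exhausted.
static class StuckInputStream extends FilterInputStream {
  private final CountDownLatch stuck = new CountDownLatch(1);
  private volatile boolean closed = false;

  StuckInputStream(InputStream in) {
    super(in);
  }

  // Spin until close() is called, then surface the close as an IOException so
  // the fetcher aborts the in-flight map output.
  private int freeze() throws IOException {
    stuck.countDown();
    while (!closed) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        throw new IOException("interrupted while stuck", e);
      }
    }
    throw new IOException("stream closed while fetcher was stuck");
  }

  @Override
  public int read() throws IOException {
    int b = super.read();
    return b != -1 ? b : freeze();   // block only after real data runs out
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int n = super.read(b, off, len);
    return n > 0 ? n : freeze();
  }

  // Called by the test before shutDown() to be sure the fetcher is parked.
  void waitForFetcher() throws InterruptedException {
    stuck.await();
  }

  @Override
  public void close() throws IOException {
    closed = true;
    super.close();
  }

  boolean wasClosedProperly() {
    return closed;
  }
}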
public StubbedMergeManager(JobConf conf, ExceptionReporter reporter,
    CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
  // only the config, reporter and barriers matter to the stub; the remaining
  // collaborators are nulls or mocks
  super(null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
      null, null, null, reporter, null, mock(MapOutputFile.class));
  mergeThread.setSyncBarriers(mergeStart, mergeComplete);
}
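StubbedMergeManager expects the merge thread it creates to expose a setSyncBarriers hook. A possible shape for such a hook, assuming a test-only MergeThread subclass whose merge() rendezvous with the test through the two barriers; the class name, constructor arguments, and generic parameters below are illustrative assumptions, not the actual Hadoop test code:

// Illustrative sketch: a merge thread stub that lets the test control when a
// merge starts and when it is considered complete.
private static class SynchronizedMergeThread<T, K, V> extends MergeThread<T, K, V> {
  private CyclicBarrier mergeStart;
  private CyclicBarrier mergeComplete;

  SynchronizedMergeThread(MergeManagerImpl<K, V> manager,
      ExceptionReporter reporter) {
    super(manager, Integer.MAX_VALUE, reporter);  // assumed parent constructor
  }

  void setSyncBarriers(CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
    this.mergeStart = mergeStart;
    this.mergeComplete = mergeComplete;
  }

  @Override
  public void merge(List<T> inputs) throws IOException {
    try {
      mergeStart.await();     // signal the test that a merge has begun
      mergeComplete.await();  // wait for the test to let the merge "finish"
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}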
@SuppressWarnings({ "unchecked", "deprecation" }) @Test(timeout=10000) public void testOnDiskMerger() throws IOException, URISyntaxException, InterruptedException { JobConf jobConf = new JobConf(); final int SORT_FACTOR = 5; jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR); MapOutputFile mapOutputFile = new MROutputFiles(); FileSystem fs = FileSystem.getLocal(jobConf); MergeManagerImpl<IntWritable, IntWritable> manager = new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null , null, null, null, null, null, null, null, null, null, mapOutputFile); MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable> onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>) Whitebox.getInternalState(manager, "onDiskMerger"); int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger, "mergeFactor"); // make sure the io.sort.factor is set properly assertEquals(mergeFactor, SORT_FACTOR); // Stop the onDiskMerger thread so that we can intercept the list of files // waiting to be merged. onDiskMerger.suspend(); //Send the list of fake files waiting to be merged Random rand = new Random(); for(int i = 0; i < 2*SORT_FACTOR; ++i) { Path path = new Path("somePath"); CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt()); manager.closeOnDiskFile(cap); } //Check that the files pending to be merged are in sorted order. LinkedList<List<CompressAwarePath>> pendingToBeMerged = (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState( onDiskMerger, "pendingToBeMerged"); assertTrue("No inputs were added to list pending to merge", pendingToBeMerged.size() > 0); for(int i = 0; i < pendingToBeMerged.size(); ++i) { List<CompressAwarePath> inputs = pendingToBeMerged.get(i); for(int j = 1; j < inputs.size(); ++j) { assertTrue("Not enough / too many inputs were going to be merged", inputs.size() > 0 && inputs.size() <= SORT_FACTOR); assertTrue("Inputs to be merged were not sorted according to size: ", inputs.get(j).getCompressedSize() >= inputs.get(j-1).getCompressedSize()); } } }