/**
 * LinuxTaskController runs task-controller, which in turn runs TaskLogsTruncater
 * in truncateLogsAsUser(). task-controller should be provided with all
 * necessary java properties to launch JobLocalizer successfully.
 */
public void testLTCCallTruncateLogsAsUser() throws Exception {
  if (!initialized) {
    initMyTest();
  }

  List<Task> tasks = new ArrayList<Task>();
  tasks.add(new MyMapTask());

  try {
    ltc.truncateLogsAsUser(user, tasks);
  } catch (IOException ie) {
    fail("Missing argument when running task-controller from "
        + "truncateLogsAsUser()\n");
  }
}
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
    ContainerLaunchContext containerLaunchContext,
    Container allocatedContainer, Task remoteTask) {
  super(taskAttemptID, allocatedContainer.getId(),
      StringInterner.weakIntern(allocatedContainer.getNodeId().toString()),
      allocatedContainer.getContainerToken(),
      ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
  this.allocatedContainer = allocatedContainer;
  this.containerLaunchContext = containerLaunchContext;
  this.task = remoteTask;
}
/**
 * Set the current status of the task to the given string.
 */
@Override
public void setStatus(String status) {
  String normalizedStatus = Task.normalizeStatus(status, conf);
  setStatusString(normalizedStatus);
  reporter.setStatus(normalizedStatus);
}
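/*
 * Illustrative sketch, not part of the file above: user code normally reaches
 * setStatus() through the Context/Reporter handed to the task, and the string
 * passes through Task.normalizeStatus() (which trims it to the configured
 * length limit) before being stored and reported. The class below is made up
 * for the example.
 */
class StatusReportingMapper extends org.apache.hadoop.mapreduce.Mapper<
    org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text,
    org.apache.hadoop.io.Text, org.apache.hadoop.io.IntWritable> {
  @Override
  protected void map(org.apache.hadoop.io.LongWritable key,
      org.apache.hadoop.io.Text value, Context context) {
    // An overly long status string is truncated, not rejected.
    context.setStatus("processing record at offset " + key.get());
  }
}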
public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
    TaskUmbilicalProtocol umbilical,
    LocalDirAllocator localDirAllocator,
    Reporter reporter,
    CompressionCodec codec,
    Class<? extends Reducer> combinerClass,
    CombineOutputCollector<K,V> combineCollector,
    Counters.Counter spilledRecordsCounter,
    Counters.Counter reduceCombineInputCounter,
    Counters.Counter shuffledMapsCounter,
    Counters.Counter reduceShuffleBytes,
    Counters.Counter failedShuffleCounter,
    Counters.Counter mergedMapOutputsCounter,
    TaskStatus status,
    Progress copyPhase,
    Progress mergePhase,
    Task reduceTask) {
  this.reduceId = reduceId;
  this.jobConf = jobConf;
  this.umbilical = umbilical;
  this.reporter = reporter;
  this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
  this.copyPhase = copyPhase;
  this.taskStatus = status;
  this.reduceTask = reduceTask;

  scheduler = new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase,
      shuffledMapsCounter, reduceShuffleBytes, failedShuffleCounter);
  merger = new MergeManager<K, V>(reduceId, jobConf, localFS,
      localDirAllocator, reporter, codec, combinerClass, combineCollector,
      spilledRecordsCounter, reduceCombineInputCounter,
      mergedMapOutputsCounter, this, mergePhase);
}
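/*
 * Illustrative sketch, not part of the file above: a reduce task obtains its
 * shuffle implementation from the job configuration, so a constructor like the
 * one above is usually reached by registering the plugin class rather than
 * instantiating it directly. This assumes the pluggable-shuffle key
 * "mapreduce.job.reduce.shuffle.consumer.plugin.class" is available in this
 * branch; MyShufflePlugin is a hypothetical ShuffleConsumerPlugin subclass.
 */
static JobConf configureCustomShufflePlugin() {
  JobConf conf = new JobConf();
  // Register the hypothetical plugin as the shuffle consumer for the job.
  conf.setClass("mapreduce.job.reduce.shuffle.consumer.plugin.class",
      MyShufflePlugin.class, ShuffleConsumerPlugin.class);
  return conf;
}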
@SuppressWarnings("unchecked") @Override public void merge(List<Segment<K,V>> segments) throws IOException { // sanity check if (segments == null || segments.isEmpty()) { LOG.info("No ondisk files to merge..."); return; } Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass(); Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass(); final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator(); long approxOutputSize = 0; int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512); LOG.info("OnDiskMerger: We have " + segments.size() + " map outputs on disk. Triggering merge..."); // 1. Prepare the list of files to be merged. for (Segment<K,V> segment : segments) { approxOutputSize += segment.getLength(); } // add the checksum length approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum); // 2. Start the on-disk merge process Path outputPath = new Path(reduceDir, "file-" + (numPasses++)).suffix(Task.MERGED_OUTPUT_PREFIX); Writer<K, V> writer = new Writer<K, V>(jobConf, lustrefs.create(outputPath), (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true); RawKeyValueIterator iter = null; try { iter = Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, ioSortFactor, mergeTempDir, comparator, reporter, spilledRecordsCounter, mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); writer.close(); } catch (IOException e) { lustrefs.delete(outputPath, true); throw e; } addSegmentToMerge(new Segment<K, V>(jobConf, lustrefs, outputPath, codec, false, null)); LOG.info(reduceId + " Finished merging " + segments.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + lustrefs.getFileStatus(outputPath).getLen()); }
static ContainerLaunchContext createContainerLaunchContext(
    Map<ApplicationAccessType, String> applicationACLs,
    Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
    final org.apache.hadoop.mapred.JobID oldJobId,
    WrappedJvmID jvmID,
    TaskAttemptListener taskAttemptListener,
    Credentials credentials) {

  synchronized (commonContainerSpecLock) {
    if (commonContainerSpec == null) {
      commonContainerSpec = createCommonContainerLaunchContext(
          applicationACLs, conf, jobToken, oldJobId, credentials);
    }
  }

  // Fill in the fields needed per-container that are missing in the common
  // spec.

  boolean userClassesTakesPrecedence =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

  // Setup environment by cloning from common env.
  Map<String, String> env = commonContainerSpec.getEnvironment();
  Map<String, String> myEnv = new HashMap<String, String>(env.size());
  myEnv.putAll(env);
  if (userClassesTakesPrecedence) {
    myEnv.put(Environment.CLASSPATH_PREPEND_DISTCACHE.name(), "true");
  }
  MapReduceChildJVM.setVMEnv(myEnv, remoteTask);

  // Set up the launch command
  List<String> commands = MapReduceChildJVM.getVMCommand(
      taskAttemptListener.getAddress(), remoteTask, jvmID);

  // Duplicate the ByteBuffers for access by multiple containers.
  Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
  for (Entry<String, ByteBuffer> entry : commonContainerSpec
      .getServiceData().entrySet()) {
    myServiceData.put(entry.getKey(), entry.getValue().duplicate());
  }

  // Construct the actual Container
  ContainerLaunchContext container = ContainerLaunchContext.newInstance(
      commonContainerSpec.getLocalResources(), myEnv, commands,
      myServiceData, commonContainerSpec.getTokens().duplicate(),
      applicationACLs);

  return container;
}
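/*
 * Illustrative sketch, not part of the file above: the userClassesTakesPrecedence
 * branch is driven by a job-level flag, so a submitter who needs their own jars
 * ahead of the framework's on the task classpath would set it like this.
 */
static void preferUserClasspath(org.apache.hadoop.conf.Configuration conf) {
  // MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST
  conf.setBoolean("mapreduce.job.user.classpath.first", true);
}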
public Task getRemoteTask() {
  return this.task;
}
@Override
protected Task createRemoteTask() {
  return new MockTask(taskType);
}
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }

  long approxOutputSize = 0;
  int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512);

  LOG.info("OnDiskMerger: We have " + inputs.size()
      + " map outputs on disk. Triggering merge...");

  // 1. Prepare the list of files to be merged.
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize +=
      ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath =
      localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
          approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out =
      CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs,
        (Class<K>) jobConf.getMapOutputKeyClass(),
        (Class<V>) jobConf.getMapOutputValueClass(),
        codec, inputs.toArray(new Path[inputs.size()]), true, ioSortFactor,
        tmpDir, (RawComparator<K>) jobConf.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null, mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId + " Finished merging " + inputs.size()
      + " map output files on disk of total-size " + approxOutputSize + "."
      + " Local output file is " + outputPath + " of size "
      + localFS.getFileStatus(outputPath).getLen());
}
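/*
 * Illustrative sketch, not part of the file above: CryptoUtils.wrapIfNecessary()
 * only encrypts the merged output when intermediate-data encryption is enabled
 * for the job. Assuming this branch supports it, that is controlled by a job
 * flag along these lines (the key shown is the one used by recent releases).
 */
static void enableEncryptedIntermediateData(JobConf conf) {
  // Encrypt map outputs, spills, and merged files written to local disk.
  conf.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
}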
/**
 * A testing method verifying the availability and accessibility of the API
 * needed by sub-classes of ShuffleConsumerPlugin.
 */
@Test
public void testConsumerApi() {
  JobConf jobConf = new JobConf();
  ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin =
      new TestShuffleConsumerPlugin<K, V>();

  //mock creation
  ReduceTask mockReduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass =
      jobConf.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V> mockCombineOutputCollector =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);

  try {
    String[] dirs = jobConf.getLocalDirs();
    // verify that these APIs are available through super class handler
    ShuffleConsumerPlugin.Context<K, V> context =
        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf,
            mockFileSystem, mockUmbilical, mockLocalDirAllocator,
            mockReporter, mockCompressionCodec, combinerClass,
            mockCombineOutputCollector, mockCounter, mockCounter,
            mockCounter, mockCounter, mockCounter, mockCounter,
            mockTaskStatus, mockProgress, mockProgress, mockTask,
            mockMapOutputFile, null);
    shuffleConsumerPlugin.init(context);
    shuffleConsumerPlugin.run();
    shuffleConsumerPlugin.close();
  } catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  mockReduceTask.getTaskID();
  mockReduceTask.getJobID();
  mockReduceTask.getNumMaps();
  mockReduceTask.getPartition();
  mockReporter.progress();
}
@SuppressWarnings("rawtypes") @Test public <K, V> void TestSucceedAndFailedCopyMap() throws Exception { JobConf job = new JobConf(); job.setNumMapTasks(2); //mock creation TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); Reporter mockReporter = mock(Reporter.class); FileSystem mockFileSystem = mock(FileSystem.class); Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass(); @SuppressWarnings("unchecked") // needed for mock with generic CombineOutputCollector<K, V> mockCombineOutputCollector = (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class); org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); Counter mockCounter = mock(Counter.class); TaskStatus mockTaskStatus = mock(TaskStatus.class); Progress mockProgress = mock(Progress.class); MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); Task mockTask = mock(Task.class); @SuppressWarnings("unchecked") MapOutput<K, V> output = mock(MapOutput.class); ShuffleConsumerPlugin.Context<K, V> context = new ShuffleConsumerPlugin.Context<K, V>( mockTaskAttemptID, job, mockFileSystem, mockUmbilical, mockLocalDirAllocator, mockReporter, mockCompressionCodec, combinerClass, mockCombineOutputCollector, mockCounter, mockCounter, mockCounter, mockCounter, mockCounter, mockCounter, mockTaskStatus, mockProgress, mockProgress, mockTask, mockMapOutputFile, null); TaskStatus status = new TaskStatus() { @Override public boolean getIsMap() { return false; } @Override public void addFetchFailedMap(TaskAttemptID mapTaskId) { } }; Progress progress = new Progress(); ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null, null, progress, context.getShuffledMapsCounter(), context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); MapHost host1 = new MapHost("host1", null); TaskAttemptID failedAttemptID = new TaskAttemptID( new org.apache.hadoop.mapred.TaskID( new JobID("test",0), TaskType.MAP, 0), 0); TaskAttemptID succeedAttemptID = new TaskAttemptID( new org.apache.hadoop.mapred.TaskID( new JobID("test",0), TaskType.MAP, 1), 1); // handle output fetch failure for failedAttemptID, part I scheduler.hostFailed(host1.getHostName()); // handle output fetch succeed for succeedAttemptID long bytes = (long)500 * 1024 * 1024; scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output); // handle output fetch failure for failedAttemptID, part II // for MAPREDUCE-6361: verify no NPE exception get thrown out scheduler.copyFailed(failedAttemptID, host1, true, false); }
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  //name this output file same as the name of the first file that is
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev.
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize =
      createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath =
      mapOutputFile.getInputFileForWrite(mapTaskId,
          mergeOutputSize).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out =
      CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments +
        " segments...");

    rIter = Merger.merge(jobConf, rfs,
        (Class<K>) jobConf.getMapOutputKeyClass(),
        (Class<V>) jobConf.getMapOutputValueClass(),
        inMemorySegments, inMemorySegments.size(),
        new Path(reduceId.toString()),
        (RawComparator<K>) jobConf.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null, null);

    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " +
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) {
    //make sure that we delete the ondisk file that we created
    //earlier when we invoked cloneFileAttributes
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
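/*
 * Illustrative sketch, not part of the file above: the combinerClass checked
 * above is simply the job's combiner, so enabling the combineAndSpill() path
 * during the in-memory merge is a matter of registering one on the job.
 * WordCountReducer is a hypothetical Reducer whose output types match its
 * input types, as a combiner requires.
 */
static void registerCombiner(org.apache.hadoop.mapreduce.Job job) {
  // The combiner runs over in-memory segments before they are spilled to disk.
  job.setCombinerClass(WordCountReducer.class);
}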