public static TaskID getTaskID(Configuration cfg) {
    // first try with the attempt since some Hadoop versions mix the two
    String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
    if (StringUtils.hasText(taskAttemptId)) {
        try {
            return TaskAttemptID.forName(taskAttemptId).getTaskID();
        } catch (IllegalArgumentException ex) {
            // the task attempt is invalid (Tez in particular uses the wrong string - see #346)
            // try to fall back to the task id
            return parseTaskIdFromTaskAttemptId(taskAttemptId);
        }
    }
    String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
    // double-check task id bug in Hadoop 2.5.x
    if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
        return TaskID.forName(taskIdProp);
    }
    return null;
}
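// Standalone sketch (not part of the method above) showing the two id formats getTaskID(Configuration)
// has to cope with. The class name and the malformed string are illustrative assumptions; the
// well-formed attempt id follows the documented Hadoop format.
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;

public class TaskIdParsingSketch {
    public static void main(String[] args) {
        // A well-formed attempt id parses cleanly and yields its TaskID.
        TaskAttemptID attempt = TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");
        TaskID task = attempt.getTaskID();
        System.out.println(task); // task_200707121733_0003_m_000005

        // A malformed id makes forName(...) throw IllegalArgumentException, which is why
        // the method above falls back to parseTaskIdFromTaskAttemptId(...).
        try {
            TaskAttemptID.forName("attempt_foo"); // illustrative bad value
        } catch (IllegalArgumentException e) {
            System.out.println("fallback path taken: " + e.getMessage());
        }
    }
}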
/**
 * Only get the locations that are fetchable (not already copied and not made
 * obsolete).
 *
 * @param copiedMapOutputs Synchronized set of already copied map outputs
 * @param obsoleteMapIdsSet Synchronized set of obsolete map ids
 * @return List of fetchable locations (could be empty)
 */
List<MapOutputLocation> getFetchableLocations(
        Set<TaskID> copiedMapOutputs,
        Set<TaskAttemptID> obsoleteMapIdsSet) {
    List<MapOutputLocation> fetchableLocations =
        new ArrayList<MapOutputLocation>(locations.size());
    for (MapOutputLocation location : locations) {
        // Check if we still need to copy the output from this location
        if (copiedMapOutputs.contains(location.getTaskId())) {
            location.errorType = CopyOutputErrorType.NO_ERROR;
            location.sizeRead = CopyResult.OBSOLETE;
            LOG.info("getFetchableLocations: Already copied - " + location +
                ", will not try again");
        } else if (obsoleteMapIdsSet.contains(location.getTaskAttemptId())) {
            location.errorType = CopyOutputErrorType.NO_ERROR;
            location.sizeRead = CopyResult.OBSOLETE;
            LOG.info("getFetchableLocations: Obsolete - " + location +
                ", will not try now.");
        } else {
            fetchableLocations.add(location);
        }
    }
    return fetchableLocations;
}
@Override
@Nonnull
public List<Processor> get(int count) {
    return processorList = range(0, count).mapToObj(i -> {
        try {
            String uuid = context.jetInstance().getCluster().getLocalMember().getUuid();
            TaskAttemptID taskAttemptID = new TaskAttemptID("jet-node-" + uuid,
                jobContext.getJobID().getId(), JOB_SETUP, i, 0);
            jobConf.set("mapred.task.id", taskAttemptID.toString());
            jobConf.setInt("mapred.task.partition", i);

            TaskAttemptContextImpl taskAttemptContext = new TaskAttemptContextImpl(jobConf, taskAttemptID);
            @SuppressWarnings("unchecked")
            OutputFormat<K, V> outFormat = jobConf.getOutputFormat();
            RecordWriter<K, V> recordWriter = outFormat.getRecordWriter(
                null, jobConf, uuid + '-' + valueOf(i), Reporter.NULL);
            return new WriteHdfsP<>(
                recordWriter, taskAttemptContext, outputCommitter, extractKeyFn, extractValueFn);
        } catch (IOException e) {
            throw new JetException(e);
        }
    }).collect(toList());
}
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
    JobConf conf = context.getJobConf();
    TaskAttemptID attemptId = context.getTaskAttemptID();

    // get the mapping between index and output filename
    outputs = MRJobConfiguration.getOutputs(conf);

    // get temp task output path (compatible with hadoop1 and hadoop2)
    Path taskOutPath = FileOutputFormat.getWorkOutputPath(conf);
    FileSystem fs = taskOutPath.getFileSystem(conf);
    if (!fs.exists(taskOutPath))
        throw new IOException("Task output path " + taskOutPath.toString() + " does not exist.");

    // move the task outputs to their final places
    context.getProgressible().progress();
    moveFinalTaskOutputs(context, fs, taskOutPath);

    // delete the temporary task-specific output directory
    if (!fs.delete(taskOutPath, true))
        LOG.debug("Failed to delete the temporary output directory of task: " + attemptId + " - " + taskOutPath);
}
private void moveFileToDestination(TaskAttemptContext context, FileSystem fs, Path file) throws IOException {
    TaskAttemptID attemptId = context.getTaskAttemptID();

    // get output index and final destination
    String name = file.getName(); // e.g., 0-r-00000
    int index = Integer.parseInt(name.substring(0, name.indexOf("-")));
    Path dest = new Path(outputs[index], name); // e.g., outX/0-r-00000

    // move file from 'file' to 'dest'
    if (!fs.rename(file, dest)) {
        if (!fs.delete(dest, true))
            throw new IOException("Failed to delete earlier output " + dest + " for rename of " + file + " in task " + attemptId);
        if (!fs.rename(file, dest))
            throw new IOException("Failed to save output " + dest + " for rename of " + file + " in task: " + attemptId);
    }
}
/**
 * @param jobConf Job configuration.
 * @param taskCtx Task context.
 * @param directWrite Direct write flag.
 * @param fileName File name.
 * @param attempt Task attempt ID.
 * @throws IOException In case of IO exception.
 */
HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
    @Nullable String fileName, TaskAttemptID attempt) throws IOException {
    this.jobConf = jobConf;
    this.taskCtx = taskCtx;
    this.attempt = attempt;

    if (directWrite) {
        jobConf.set("mapreduce.task.attempt.id", attempt.toString());

        OutputFormat outFormat = jobConf.getOutputFormat();

        writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
    }
    else
        writer = null;
}
@Test
public void testCountStar() throws IOException, InterruptedException {
    Configuration config = new Configuration();
    TextInputFormat.TextRecordReader reader = new TextInputFormat.TextRecordReader();

    try {
        RecordServiceConfig.setInputQuery(config, "select count(*) from tpch.nation");
        List<InputSplit> splits = PlanUtil.getSplits(config, new Credentials()).splits;

        int numRows = 0;
        for (InputSplit split : splits) {
            reader.initialize(split,
                new TaskAttemptContextImpl(new JobConf(config), new TaskAttemptID()));
            while (reader.nextKeyValue()) {
                ++numRows;
            }
        }
        assertEquals(25, numRows);
    } finally {
        reader.close();
    }
}
public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf,
        TaskAttemptID taskAttemptID) throws Exception {
    try {
        Class<?> clazz = null;
        // for Hadoop 1.xx, TaskAttemptContext is a concrete class
        if (!TaskAttemptContext.class.isInterface()) {
            clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true,
                Thread.currentThread().getContextClassLoader());
        }
        // for Hadoop 2.xx, TaskAttemptContext is an interface backed by TaskAttemptContextImpl
        else {
            clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true,
                Thread.currentThread().getContextClassLoader());
        }

        Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
        // the constructor is package-private in Hadoop 1.xx
        constructor.setAccessible(true);
        TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
        return context;
    } catch (Exception e) {
        throw new Exception("Could not create instance of TaskAttemptContext.", e);
    }
}
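// Hedged usage sketch for the reflective helper above. It assumes the helper lives in a utility
// class called HadoopUtils here; that class name, the sketch class name, and the attempt-id value
// are illustrative assumptions, not part of the original snippet.
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;

public class TaskAttemptContextUsageSketch {
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        TaskAttemptID attemptId = TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");

        // The concrete context class is resolved at runtime, so the same call works against
        // Hadoop 1.x (TaskAttemptContext is a class) and Hadoop 2.x (TaskAttemptContextImpl).
        TaskAttemptContext context = HadoopUtils.instantiateTaskAttemptContext(jobConf, attemptId);
        System.out.println(context.getTaskAttemptID());
    }
}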
public void commitTask(JobConf conf, TaskAttemptID taskAttemptID) throws IOException {
    Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
    if (taskOutputPath != null) {
        FileSystem fs = taskOutputPath.getFileSystem(conf);
        if (fs.exists(taskOutputPath)) {
            Path jobOutputPath = taskOutputPath.getParent().getParent();

            // Move the task outputs to their final place
            moveTaskOutputs(conf, taskAttemptID, fs, jobOutputPath, taskOutputPath);

            // Delete the temporary task-specific output directory
            if (!fs.delete(taskOutputPath, true)) {
                LOG.info("Failed to delete the temporary output" +
                    " directory of task: " + taskAttemptID + " - " + taskOutputPath);
            }
            LOG.info("Saved output of task '" + taskAttemptID + "' to " + jobOutputPath);
        }
    }
}
public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID) throws IOException {
    try {
        Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
        if (taskOutputPath != null) {
            // Get the file-system for the task output directory
            FileSystem fs = taskOutputPath.getFileSystem(conf);
            // since the task output path is created on demand,
            // if it exists, the task needs a commit
            if (fs.exists(taskOutputPath)) {
                return true;
            }
        }
    } catch (IOException ioe) {
        throw ioe;
    }
    return false;
}
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null) {
        Path p = new Path(outputPath,
            (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
                "_" + taskAttemptID.toString()));
        try {
            FileSystem fs = p.getFileSystem(conf);
            return p.makeQualified(fs);
        } catch (IOException ie) {
            LOG.warn(StringUtils.stringifyException(ie));
            return p;
        }
    }
    return null;
}
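// Illustrative sketch of the path shape produced by getTempTaskOutputPath(...) above; the output
// directory and the attempt id are made-up example values, and the sketch only builds the path
// without touching a file system.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptID;

public class TempTaskOutputPathSketch {
    public static void main(String[] args) {
        TaskAttemptID attemptId = TaskAttemptID.forName("attempt_200707121733_0003_r_000001_0");
        Path outputPath = new Path("/user/demo/out");

        // FileOutputCommitter.TEMP_DIR_NAME is "_temporary", so the temp task dir becomes:
        // /user/demo/out/_temporary/_attempt_200707121733_0003_r_000001_0
        Path p = new Path(outputPath,
            FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + "_" + attemptId.toString());
        System.out.println(p);
    }
}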
/**
 * Clean out the previous stdout and stderr logs.
 */
private void initStdOut(JobConf configuration) {
    TaskAttemptID taskId = TaskAttemptID.forName(configuration
        .get(MRJobConfig.TASK_ATTEMPT_ID));
    File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
    File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
    // prepare folder
    if (!stdOut.getParentFile().exists()) {
        stdOut.getParentFile().mkdirs();
    } else {
        // clean logs
        stdOut.deleteOnExit();
        stdErr.deleteOnExit();
    }
}
private String readStdOut(JobConf conf) throws Exception {
    TaskAttemptID taskId = TaskAttemptID.forName(conf
        .get(MRJobConfig.TASK_ATTEMPT_ID));
    File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);

    return readFile(stdOut);
}
@SuppressWarnings("rawtypes") @Test public void testTipFailed() throws Exception { JobConf job = new JobConf(); job.setNumMapTasks(2); TaskStatus status = new TaskStatus() { @Override public boolean getIsMap() { return false; } @Override public void addFetchFailedMap(TaskAttemptID mapTaskId) { } }; Progress progress = new Progress(); TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0); ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status, reduceId, null, progress, null, null, null); JobID jobId = new JobID(); TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1); scheduler.tipFailed(taskId1); Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(), 0.0f); Assert.assertFalse(scheduler.waitUntilDone(1)); TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0); scheduler.tipFailed(taskId0); Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(), 0.0f); Assert.assertTrue(scheduler.waitUntilDone(1)); }
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
        int startIdx, int numEvents) {
    ArrayList<TaskCompletionEvent> tceList =
        new ArrayList<TaskCompletionEvent>(numEvents);
    for (int i = 0; i < numEvents; ++i) {
        int eventIdx = startIdx + i;
        TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
            new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
            eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
            "http://somehost:8888");
        tceList.add(tce);
    }
    TaskCompletionEvent[] events = {};
    return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
void validateTaskStderr(StreamJob job, TaskType type)
    throws IOException {
    TaskAttemptID attemptId =
        new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);

    String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
        attemptId, false);

    // trim() is called on expectedStderr here because the method
    // MapReduceTestUtil.readTaskLog() returns a trimmed String.
    assertTrue(log.equals(expectedStderr.trim()));
}
public TaskContext(JobConf conf, Class<?> iKClass, Class<?> iVClass,
    Class<?> oKClass, Class<?> oVClass, TaskReporter reporter,
    TaskAttemptID id) {
    this.conf = conf;
    this.iKClass = iKClass;
    this.iVClass = iVClass;
    this.oKClass = oKClass;
    this.oVClass = oVClass;
    this.reporter = reporter;
    this.taskAttemptID = id;
}
protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
    BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
    Configuration conf = context.getConf();
    TaskAttemptID id = context.getTaskAttemptId();
    if (null == id) {
        this.output = OutputUtil.createNativeTaskOutput(conf, "");
    } else {
        this.output = OutputUtil.createNativeTaskOutput(context.getConf(),
            context.getTaskAttemptId().toString());
    }
    this.combinerHandler = combiner;
    this.kvPusher = kvPusher;
    this.nativeHandler = nativeHandler;
    nativeHandler.setCommandDispatcher(this);
}
/**
 * Create the temporary output file for the Hadoop RecordWriter.
 * @param taskNumber The number of the parallel instance.
 * @param numTasks The number of parallel tasks.
 * @throws java.io.IOException
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {

    // enforce sequential open() calls
    synchronized (OPEN_MUTEX) {
        if (Integer.toString(taskNumber + 1).length() > 6) {
            throw new IOException("Task id too large.");
        }

        TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
            + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
            + Integer.toString(taskNumber + 1)
            + "_0");

        this.jobConf.set("mapred.task.id", taskAttemptID.toString());
        this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
        // for hadoop 2.2
        this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
        this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);

        this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);

        this.outputCommitter = this.jobConf.getOutputCommitter();

        JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());

        this.outputCommitter.setupJob(jobContext);

        this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf,
            Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
    }
}
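// Minimal sketch (illustrative class name and values) of the attempt-id string open(...) builds
// above: the jobtracker identifier is left empty, the job id is "0000", and the task number is
// zero-padded to six digits. String.format("%06d", ...) is used here as a shorthand for the
// padding expression in the original code; it produces the same result for valid task numbers.
import org.apache.hadoop.mapred.TaskAttemptID;

public class FlinkStyleAttemptIdSketch {
    public static void main(String[] args) {
        int taskNumber = 0;
        String id = "attempt__0000_r_" + String.format("%06d", taskNumber + 1) + "_0";
        TaskAttemptID taskAttemptID = TaskAttemptID.forName(id);
        System.out.println(taskAttemptID); // attempt__0000_r_000001_0
    }
}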
public MapOutputLocation(TaskAttemptID taskAttemptId,
    String ttHost, String httpTaskTracker) {
    this.taskAttemptId = taskAttemptId;
    this.taskId = this.taskAttemptId.getTaskID();
    this.ttHost = ttHost;
    this.httpTaskTracker = httpTaskTracker;
}
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, Configuration conf,
    Path file, long size) {
    this.mapId = mapId;
    this.mapAttemptId = mapAttemptId;
    this.conf = conf;
    this.file = file;
    this.compressedSize = size;
    this.data = null;
    this.inMemory = false;
}