@Before
@SuppressWarnings("unchecked") // mocked generics
public void setup() {
  LOG.info(">>>> " + name.getMethodName());
  job = new JobConf();
  job.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, false);
  jobWithRetry = new JobConf();
  jobWithRetry.setBoolean(MRJobConfig.SHUFFLE_FETCH_RETRY_ENABLED, true);
  id = TaskAttemptID.forName("attempt_0_1_r_1_1");
  ss = mock(ShuffleSchedulerImpl.class);
  mm = mock(MergeManagerImpl.class);
  r = mock(Reporter.class);
  metrics = mock(ShuffleClientMetrics.class);
  except = mock(ExceptionReporter.class);
  key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
  connection = mock(HttpURLConnection.class);

  allErrs = mock(Counters.Counter.class);
  when(r.getCounter(anyString(), anyString())).thenReturn(allErrs);

  ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
  maps.add(map1ID);
  maps.add(map2ID);
  when(ss.getMapsForHost(host)).thenReturn(maps);
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  // Make one reducer slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_reduce =
      conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

  // IF TESTING REDUCE SPECULATIVE EXECUTION:
  //   Make the "*_r_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_r_000000_1", so sleepTime will still remain 100ms.
  if ((taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
      && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try {
    Thread.sleep(sleepTime);
  } catch (InterruptedException ie) {
    // Ignore
  }
  context.write(key, new IntWritable(0));
}
public void setDatum(Object odatum) {
  this.datum = (TaskAttemptUnsuccessfulCompletion) odatum;
  this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.finishTime = datum.finishTime;
  this.hostname = datum.hostname.toString();
  this.rackName = datum.rackname.toString();
  this.port = datum.port;
  this.status = datum.status.toString();
  this.error = datum.error.toString();
  this.counters = EventReader.fromAvro(datum.counters);
  this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
  this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
  this.gpuUsages = AvroArrayUtils.fromAvro(datum.gpuUsages);
  this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
  this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
public void setDatum(Object odatum) {
  this.datum = (TaskFailed) odatum;
  this.id = TaskID.forName(datum.taskid.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.finishTime = datum.finishTime;
  this.error = datum.error.toString();
  this.failedDueToAttempt =
      datum.failedDueToAttempt == null
          ? null
          : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
  this.status = datum.status.toString();
  this.counters = EventReader.fromAvro(datum.counters);
}
public void setDatum(Object oDatum) {
  this.datum = (MapAttemptFinished) oDatum;
  this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.taskStatus = datum.taskStatus.toString();
  this.mapFinishTime = datum.mapFinishTime;
  this.finishTime = datum.finishTime;
  this.hostname = datum.hostname.toString();
  this.rackName = datum.rackname.toString();
  this.port = datum.port;
  this.state = datum.state.toString();
  this.counters = EventReader.fromAvro(datum.counters);
  this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
  this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
  this.gpuUsages = AvroArrayUtils.fromAvro(datum.gpuUsages);
  this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
  this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }
  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");
  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");
    String shuffleFinish = line.get("SHUFFLE_FINISHED");
    String sortFinish = line.get("SORT_FINISHED");
    // Status was already verified as "success" above, so only the shuffle
    // and sort timestamps gate the emission here.
    if (shuffleFinish != null && sortFinish != null) {
      ReduceAttempt20LineHistoryEventEmitter that =
          (ReduceAttempt20LineHistoryEventEmitter) thatg;
      return new ReduceAttemptFinishedEvent(taskAttemptID,
          that.originalTaskType, status, Long.parseLong(shuffleFinish),
          Long.parseLong(sortFinish), Long.parseLong(finishTime), hostName,
          -1, null, state, maybeParseCounters(counters), null);
    }
  }
  return null;
}
/**
 * Mask the job ID part in a {@link TaskAttemptID}.
 *
 * @param attemptId
 *          raw {@link TaskAttemptID} read from trace
 * @return masked {@link TaskAttemptID} with empty {@link JobID}.
 */
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
  JobID jobId = new JobID();
  TaskType taskType = attemptId.getTaskType();
  TaskID taskId = attemptId.getTaskID();
  return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
      taskId.getId(), attemptId.getId());
}
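// Illustration (not part of the original source): a hedged sketch of what
// masking does to an attempt ID. Assuming the standard attempt string form
// "attempt_<jtId>_<jobId>_<m|r>_<taskId>_<attemptId>", the job identifier
// and job number collapse to the defaults of an empty JobID while the task
// and attempt numbers are preserved.
TaskAttemptID raw =
    TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");
TaskAttemptID masked = maskAttemptID(raw);
// masked.getJobID() now equals new JobID() (empty identifier, id 0);
// masked.getTaskID().getId() is still 5 and masked.getId() is still 0.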
public static String getLocalLogDir(TaskAttemptID attemptId) {
  int tid = attemptId.getTaskID().getId();
  int aid = attemptId.getId();
  String jid = attemptId.getJobID().toString();
  StringBuilder sb = new StringBuilder(jid).append('-');
  sb.append(tid).append('-').append(aid);
  String localLogDir = sb.toString();
  return localLogDir;
}
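// Illustration (not part of the original source): the directory name this
// produces for a sample attempt, assuming JobID.toString() renders as
// "job_<jtId>_<4-digit jobId>".
String dir = getLocalLogDir(
    TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0"));
// dir is expected to be "job_200707121733_0003-5-0"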
public String[] getTaskDiagnostics(
    org.apache.hadoop.mapreduce.TaskAttemptID arg0)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
      TypeConverter.toYarn(arg0);
  GetDiagnosticsRequest request =
      recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
  request.setTaskAttemptId(attemptID);
  List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
      GetDiagnosticsRequest.class, request)).getDiagnosticsList();
  String[] result = new String[list.size()];
  int i = 0;
  for (String c : list) {
    result[i++] = c;
  }
  return result;
}
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }
  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
  String startTime = line.get("START_TIME");
  String taskType = line.get("TASK_TYPE");
  String trackerName = line.get("TRACKER_NAME");
  String httpPort = line.get("HTTP_PORT");
  String locality = line.get("LOCALITY");
  if (locality == null) {
    locality = "";
  }
  String avataar = line.get("AVATAAR");
  if (avataar == null) {
    avataar = "";
  }
  if (startTime != null && taskType != null) {
    TaskAttempt20LineEventEmitter that = (TaskAttempt20LineEventEmitter) thatg;
    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);
    // Guard against a missing HTTP_PORT field as well as an empty one;
    // checking only for the empty string would NPE when the field is absent.
    int port = (httpPort == null || httpPort.isEmpty())
        ? DEFAULT_HTTP_PORT : Integer.parseInt(httpPort);
    return new TaskAttemptStartedEvent(taskAttemptID,
        that.originalTaskType, that.originalStartTime, trackerName, port, -1,
        locality, avataar);
  }
  return null;
}
@Test
public void testReinit() throws Exception {
  // Test that a split containing multiple files works correctly,
  // with the child RecordReader getting its initialize() method
  // called a second time.
  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf = new Configuration();
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1"), new Path("file2") };
  long[] lengths = { 1, 1 };

  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // First initialize() call comes from MapTask. We'll do it here.
  rr.initialize(split, context);

  // First value is the first filename.
  assertTrue(rr.nextKeyValue());
  assertEquals("file1", rr.getCurrentValue().toString());

  // The inner RR will return false, because it only emits one (k, v) pair.
  // But there's another sub-split to process. This returns true to us.
  assertTrue(rr.nextKeyValue());

  // And the 2nd rr will have its initialize method called correctly.
  assertEquals("file2", rr.getCurrentValue().toString());

  // But after both child RRs have returned their singleton (k, v), this
  // should also return false.
  assertFalse(rr.nextKeyValue());
}
/**
 * Create the map-output URL. This will contain all the map ids
 * separated by commas.
 *
 * @param host the host serving the map outputs
 * @param maps the map attempt ids whose output should be fetched
 * @return the URL to fetch the listed map outputs from
 * @throws MalformedURLException if the assembled URL is invalid
 */
private URL getMapOutputURL(MapHost host,
    Collection<TaskAttemptID> maps) throws MalformedURLException {
  // Get the base url
  StringBuffer url = new StringBuffer(host.getBaseUrl());

  boolean first = true;
  for (TaskAttemptID mapId : maps) {
    if (!first) {
      url.append(",");
    }
    url.append(mapId);
    first = false;
  }

  LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
  return new URL(url.toString());
}
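// Illustration (not part of the original source): a minimal, self-contained
// sketch of the same concatenation, using hypothetical attempt ids and a
// hypothetical shuffle address, to show the URL shape that getMapOutputURL
// produces when the host's base URL ends in an empty "map=" parameter.
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;

class MapOutputUrlSketch {
  public static void main(String[] args) throws MalformedURLException {
    String baseUrl =
        "http://host:13562/mapOutput?job=job_1_0001&reduce=0&map=";
    List<String> maps = Arrays.asList(
        "attempt_1_0001_m_000001_0", "attempt_1_0001_m_000002_0");
    // Join the map attempt ids with commas, exactly as the fetcher does.
    URL url = new URL(baseUrl + String.join(",", maps));
    System.out.println(url);
    // -> http://host:13562/mapOutput?job=job_1_0001&reduce=0
    //      &map=attempt_1_0001_m_000001_0,attempt_1_0001_m_000002_0
  }
}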
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
    JobConf job, String name, Progressable progress) throws IOException {
  org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
      new TaskAttemptContextImpl(job,
          TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID))));
  org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer =
      (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
  try {
    return new DBRecordWriter(writer.getConnection(), writer.getStatement());
  } catch (SQLException se) {
    throw new IOException(se);
  }
}
public FakeFetcher(JobConf job, TaskAttemptID reduceId,
    ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
    Reporter reporter, ShuffleClientMetrics metrics,
    ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
    HttpURLConnection connection) {
  super(job, reduceId, scheduler, merger, reporter, metrics,
      exceptionReporter, jobTokenSecret);
  this.connection = connection;
}
public synchronized void addKnownMapOutput(String hostName,
    String hostUrl, TaskAttemptID mapId) {
  MapHost host = mapLocations.get(hostName);
  if (host == null) {
    host = new MapHost(hostName, hostUrl);
    mapLocations.put(hostName, host);
  }
  host.addKnownMap(mapId);

  // If adding this map output moved the host into the PENDING state,
  // queue it and wake up any fetchers waiting for work.
  if (host.getState() == State.PENDING) {
    pendingHosts.add(host);
    notifyAll();
  }
}
/**
 * Create an event to record completion of a reduce attempt.
 * @param id Attempt Id
 * @param taskType Type of task
 * @param taskStatus Status of the task
 * @param shuffleFinishTime Finish time of the shuffle phase
 * @param sortFinishTime Finish time of the sort phase
 * @param finishTime Finish time of the attempt
 * @param hostname Name of the host where the attempt executed
 * @param port RPC port for the tracker host
 * @param rackName Name of the rack where the attempt executed
 * @param state State of the attempt
 * @param counters Counters for the attempt
 * @param allSplits the "splits", or a pixelated graph of various
 *        measurable worker node state variables against progress.
 *        Currently there are five: wallclock time, CPU time, GPU time,
 *        virtual memory and physical memory.
 */
public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
    String taskStatus, long shuffleFinishTime, long sortFinishTime,
    long finishTime, String hostname, int port, String rackName,
    String state, Counters counters, int[][] allSplits) {
  this.attemptId = id;
  this.taskType = taskType;
  this.taskStatus = taskStatus;
  this.shuffleFinishTime = shuffleFinishTime;
  this.sortFinishTime = sortFinishTime;
  this.finishTime = finishTime;
  this.hostname = hostname;
  this.rackName = rackName;
  this.port = port;
  this.state = state;
  this.counters = counters;
  this.allSplits = allSplits;
  this.clockSplits = ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
  this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
  this.gpuUsages = ProgressSplitsBlock.arrayGetGPUTime(allSplits);
  this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
  this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
}
private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
    final FileSystem fs, final LocatedFileStatus keyFileStatus)
    throws IOException, InterruptedException {
  SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  // Return entries that are flagged Counts.UNDEFINED in the value. Return
  // the row. This is what is missing.
  TaskAttemptContext context =
      new TaskAttemptContextImpl(conf, new TaskAttemptID());
  try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
      new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
    InputSplit is = new FileSplit(keyFileStatus.getPath(), 0,
        keyFileStatus.getLen(), new String[] {});
    rr.initialize(is, context);
    while (rr.nextKeyValue()) {
      rr.getCurrentKey();
      BytesWritable bw = rr.getCurrentValue();
      if (Verify.VerifyReducer.whichType(bw.getBytes())
          == Verify.Counts.UNDEFINED) {
        byte[] key = new byte[rr.getCurrentKey().getLength()];
        System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
            rr.getCurrentKey().getLength());
        result.add(key);
      }
    }
  }
  return result;
}
private void setupConnectionsWithRetry(MapHost host,
    Set<TaskAttemptID> remaining, URL url) throws IOException {
  openConnectionWithRetry(host, remaining, url);
  if (stopped) {
    return;
  }

  // Generate hash of the url
  String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
  String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
      shuffleSecretKey);

  setupShuffleConnection(encHash);
  connect(connection, connectionTimeout);
  // Verify that the thread wasn't stopped during calls to connect
  if (stopped) {
    return;
  }

  verifyConnection(url, msgToEncode, encHash);
}
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }
  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");
  if (finishTime != null && status != null
      && !status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String error = line.get("ERROR");
    TaskAttempt20LineEventEmitter that = (TaskAttempt20LineEventEmitter) thatg;
    ParsedHost pHost = ParsedHost.parse(hostName);
    String rackName = null;

    // Earlier versions of MR logged only hostnames (without racknames) for
    // unsuccessful attempts
    if (pHost != null) {
      rackName = pHost.getRackName();
      hostName = pHost.getNodeName();
    }
    return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
        that.originalTaskType, status, Long.parseLong(finishTime), hostName,
        -1, rackName, error, null);
  }
  return null;
}
public LocalFetcher(JobConf job, TaskAttemptID reduceId,
    ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K,V> merger,
    Reporter reporter, ShuffleClientMetrics metrics,
    ExceptionReporter exceptionReporter, SecretKey shuffleKey,
    Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  super(job, reduceId, scheduler, merger, reporter, metrics,
      exceptionReporter, shuffleKey);
  this.job = job;
  this.localMapFiles = localMapFiles;

  setName("localfetcher#" + id);
  setDaemon(true);
}
/**
 * The crux of the matter: fetch the output of every map in the worklist,
 * removing each map from the worklist as it is copied successfully.
 */
private void doCopy(Set<TaskAttemptID> maps) throws IOException {
  Iterator<TaskAttemptID> iter = maps.iterator();
  while (iter.hasNext()) {
    TaskAttemptID map = iter.next();
    LOG.debug("LocalFetcher " + id + " going to fetch: " + map);
    if (copyMapOutput(map)) {
      // Successful copy. Remove this from our worklist.
      iter.remove();
    } else {
      // We got back a WAIT command; go back to the outer loop
      // and block for InMemoryMerge.
      break;
    }
  }
}
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
    MergeManagerImpl<K, V> merger, int size, CompressionCodec codec,
    boolean primaryMapOutput) {
  super(mapId, (long) size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
  } else {
    decompressor = null;
  }
}
/**
 * Test some methods of CompletedTaskAttempt.
 */
@Test(timeout = 5000)
public void testCompletedTaskAttempt() {
  TaskAttemptInfo attemptInfo = mock(TaskAttemptInfo.class);
  when(attemptInfo.getRackname()).thenReturn("Rackname");
  when(attemptInfo.getShuffleFinishTime()).thenReturn(11L);
  when(attemptInfo.getSortFinishTime()).thenReturn(12L);
  when(attemptInfo.getShufflePort()).thenReturn(10);

  JobID jobId = new JobID("12345", 0);
  TaskID taskId = new TaskID(jobId, TaskType.REDUCE, 0);
  TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
  when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId);

  CompletedTaskAttempt taskAttempt =
      new CompletedTaskAttempt(null, attemptInfo);
  assertEquals("Rackname", taskAttempt.getNodeRackName());
  assertEquals(Phase.CLEANUP, taskAttempt.getPhase());
  assertTrue(taskAttempt.isFinished());
  assertEquals(11L, taskAttempt.getShuffleFinishTime());
  assertEquals(12L, taskAttempt.getSortFinishTime());
  assertEquals(10, taskAttempt.getShufflePort());
}
protected void mergeMapOutput(TaskAttemptID mapId) throws IOException {
  int mapid = mapId.getTaskID().getId();
  for (int i = 0; true; i++) {
    Path file = new Path(mapOutputDir,
        String.format(SharedFsPlugins.MAP_OUTPUT,
            reduceId.getTaskID().getId(), mapid, i));
    if (!lustrefs.exists(file)) {
      // if (i == 0) {
      //   throw new IOException(
      //       "No map outputs found. At least one is expected!");
      // }
      return;
    }
    addMapOutputSegments(file);
  }
}
@Test
public void testReduceOutOfDiskSpace() throws Throwable {
  LOG.info("testReduceOutOfDiskSpace");

  Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
      r, metrics, except, key, connection);

  String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  header.write(new DataOutputStream(bout));
  ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());

  when(connection.getResponseCode()).thenReturn(200);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
      .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
      .thenReturn(replyHash);
  when(connection.getInputStream()).thenReturn(in);
  when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
      .thenThrow(new DiskErrorException("No disk space available"));

  underTest.copyFromHost(host);
  verify(ss).reportLocalError(any(IOException.class));
}
/**
 * Do some basic verification on the input received -- being defensive.
 * @param compressedLength compressed length reported in the shuffle header
 * @param decompressedLength decompressed length reported in the header
 * @param forReduce the reduce partition this output claims to target
 * @param remaining the map outputs still expected from this host
 * @param mapId the map attempt that produced the output
 * @return true if the verification succeeded, false otherwise
 */
private boolean verifySanity(long compressedLength, long decompressedLength,
    int forReduce, Set<TaskAttemptID> remaining, TaskAttemptID mapId) {
  if (compressedLength < 0 || decompressedLength < 0) {
    wrongLengthErrs.increment(1);
    LOG.warn(getName() + " invalid lengths in map output header: id: "
        + mapId + " len: " + compressedLength + ", decomp len: "
        + decompressedLength);
    return false;
  }

  if (forReduce != reduce) {
    wrongReduceErrs.increment(1);
    LOG.warn(getName() + " data for the wrong reduce map: " + mapId
        + " len: " + compressedLength + " decomp len: "
        + decompressedLength + " for reduce " + forReduce);
    return false;
  }

  // Sanity check
  if (!remaining.contains(mapId)) {
    wrongMapErrs.increment(1);
    LOG.warn("Invalid map-output! Received output for " + mapId);
    return false;
  }

  return true;
}
@SuppressWarnings("unchecked") @Test(timeout=10000) public void testCopyFromHostWithRetryUnreserve() throws Exception { InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class); Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry, id, ss, mm, r, metrics, except, key, connection); String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); when(connection.getResponseCode()).thenReturn(200); when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) .thenReturn(replyHash); ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); ByteArrayOutputStream bout = new ByteArrayOutputStream(); header.write(new DataOutputStream(bout)); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); when(connection.getInputStream()).thenReturn(in); when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); // Verify that unreserve occurs if an exception happens after shuffle // buffer is reserved. when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) .thenReturn(immo); doThrow(new IOException("forced error")).when(immo).shuffle( any(MapHost.class), any(InputStream.class), anyLong(), anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); underTest.copyFromHost(host); verify(immo).abort(); }
void setAttemptID(String attemptID) {
  this.attemptID = TaskAttemptID.forName(attemptID);
}
public TaskAttemptContextImpl(Configuration conf,
    TaskAttemptID taskId, StatusReporter reporter) {
  super(conf, taskId.getJobID());
  this.taskId = taskId;
  this.reporter = reporter;
}
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
  // Test that we properly initialize the child recordreader when
  // CombineFileInputFormat and CombineFileRecordReader are used.

  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf1 = new Configuration();
  conf1.set(DUMMY_KEY, "STATE1");
  TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1") };
  long[] lengths = { 1 };

  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context1);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // Verify that the initial configuration is the one being used.
  // Right after construction the dummy key should have value "STATE1".
  assertEquals("Invalid initial dummy key value", "STATE1",
      rr.getCurrentKey().toString());

  // Switch the active context for the RecordReader...
  Configuration conf2 = new Configuration();
  conf2.set(DUMMY_KEY, "STATE2");
  TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
  rr.initialize(split, context2);

  // And verify that the new context is updated into the child record reader.
  assertEquals("Invalid secondary dummy key value", "STATE2",
      rr.getCurrentKey().toString());
}
public synchronized void copySucceeded(TaskAttemptID mapId, MapHost host,
    long bytes, long startMillis, long endMillis, MapOutput<K,V> output)
    throws IOException {
  failureCounts.remove(mapId);
  hostFailures.remove(host.getHostName());
  int mapIndex = mapId.getTaskID().getId();

  if (!finishedMaps[mapIndex]) {
    output.commit();
    finishedMaps[mapIndex] = true;
    shuffledMapsCounter.increment(1);
    if (--remainingMaps == 0) {
      notifyAll();
    }

    // Update single copy task status
    long copyMillis = (endMillis - startMillis);
    if (copyMillis == 0) {
      copyMillis = 1;
    }
    float bytesPerMillis = (float) bytes / copyMillis;
    float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
    String individualProgress = "copy task(" + mapId + " succeeded"
        + " at " + mbpsFormat.format(transferRate) + " MB/s)";
    // Update the aggregated status
    copyTimeTracker.add(startMillis, endMillis);

    totalBytesShuffledTillNow += bytes;
    updateStatus(individualProgress);
    reduceShuffleBytes.increment(bytes);
    lastProgressTime = Time.monotonicNow();
    LOG.debug("map " + mapId + " done " + status.getStateString());
  }
}
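// Illustration (not part of the original source): a worked check of the
// transfer-rate arithmetic above, under the assumption that
// BYTES_PER_MILLIS_TO_MBS converts bytes-per-millisecond to MB-per-second,
// i.e. is defined as 1000f / (1024 * 1024).
long bytes = 1048576;       // 1 MiB copied
long copyMillis = 500;      // in half a second
float bytesPerMillis = (float) bytes / copyMillis;             // 2097.152
float transferRate = bytesPerMillis * (1000f / (1024 * 1024)); // 2.0 MB/s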
public FakeFetcher(JobConf job, TaskAttemptID reduceId,
    ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
    Reporter reporter, ShuffleClientMetrics metrics,
    ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
    HttpURLConnection connection, int id) {
  super(job, reduceId, scheduler, merger, reporter, metrics,
      exceptionReporter, jobTokenSecret, id);
  this.connection = connection;
}
static URI getBaseURI(TaskAttemptID reduceId, String url) {
  StringBuffer baseUrl = new StringBuffer(url);
  if (!url.endsWith("/")) {
    baseUrl.append("/");
  }
  baseUrl.append("mapOutput?job=");
  baseUrl.append(reduceId.getJobID());
  baseUrl.append("&reduce=");
  baseUrl.append(reduceId.getTaskID().getId());
  baseUrl.append("&map=");
  URI u = URI.create(baseUrl.toString());
  return u;
}
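// Illustration (not part of the original source): for a hypothetical reduce
// attempt and shuffle address, the base URI ends in an empty "map="
// parameter that getMapOutputURL (above) later fills with attempt ids.
TaskAttemptID reduceId =
    TaskAttemptID.forName("attempt_200707121733_0003_r_000000_0");
URI base = getBaseURI(reduceId, "http://host:13562");
// base is expected to be:
// http://host:13562/mapOutput?job=job_200707121733_0003&reduce=0&map=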
/**
 * Unconditional reserve, used by the memory-to-memory merge thread:
 * grows usedMemory without checking the configured memory limit first.
 * @return an in-memory map output sized to the requested size
 */
private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
    TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
  usedMemory += requestedSize;
  return new InMemoryMapOutput<K,V>(jobConf, mapId, this,
      (int) requestedSize, codec, primaryMapOutput);
}
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
    throws IOException, InterruptedException {
  List<InputSplit> splits = inf.getSplits(job);
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.size());
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<K,V> reader = inf.createRecordReader(
        splits.get(i), samplingContext);
    reader.initialize(splits.get(i), samplingContext);
    while (reader.nextKeyValue()) {
      samples.add(ReflectionUtils.copy(job.getConfiguration(),
          reader.getCurrentKey(), null));
      ++records;
      if ((i + 1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
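// Illustration (not part of the original source): a hedged sketch of how a
// sampler like this is typically driven, assuming this is the SplitSampler
// from org.apache.hadoop.mapreduce.lib.partition.InputSampler paired with
// TotalOrderPartitioner (the usual imports from that package are assumed).
Job job = Job.getInstance(new Configuration(), "sampled sort");
job.setInputFormatClass(SequenceFileInputFormat.class);
// Take up to 1000 samples from at most 10 splits.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.SplitSampler<Text, Text>(1000, 10);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
    new Path("/tmp/partitions"));
InputSampler.writePartitionFile(job, sampler);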
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
    throws IOException, InterruptedException {
  List<InputSplit> splits = inf.getSplits(job);
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.size());
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<K,V> reader = inf.createRecordReader(
        splits.get(i), samplingContext);
    reader.initialize(splits.get(i), samplingContext);
    while (reader.nextKeyValue()) {
      ++records;
      if ((double) kept / records < freq) {
        samples.add(ReflectionUtils.copy(job.getConfiguration(),
            reader.getCurrentKey(), null));
        ++kept;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
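// Illustration (not part of the original source): a minimal standalone trace
// of the kept/records ratio rule above with freq = 0.1, showing that the
// first record is always kept and roughly every 10th record thereafter.
public class FreqSampleTrace {
  public static void main(String[] args) {
    double freq = 0.1;
    long records = 0, kept = 0;
    StringBuilder keptIndexes = new StringBuilder();
    for (int r = 1; r <= 30; r++) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        keptIndexes.append(r).append(' ');
      }
    }
    // Prints: 1 11 21  (record 1 is kept because 0/1 < 0.1, then each
    // tenth record keeps the running ratio just under 0.1)
    System.out.println(keptIndexes.toString().trim());
  }
}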
@Private
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
    throws IOException {
  TaskAttemptID attemptId = context.getTaskAttemptID();
  if (hasOutputPath()) {
    context.progress();
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    FileStatus taskAttemptDirStatus;
    try {
      taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
    } catch (FileNotFoundException e) {
      taskAttemptDirStatus = null;
    }

    if (taskAttemptDirStatus != null) {
      if (algorithmVersion == 1) {
        Path committedTaskPath = getCommittedTaskPath(context);
        if (fs.exists(committedTaskPath)) {
          if (!fs.delete(committedTaskPath, true)) {
            throw new IOException("Could not delete " + committedTaskPath);
          }
        }
        if (!fs.rename(taskAttemptPath, committedTaskPath)) {
          throw new IOException("Could not rename " + taskAttemptPath
              + " to " + committedTaskPath);
        }
        LOG.info("Saved output of task '" + attemptId + "' to "
            + committedTaskPath);
      } else {
        // directly merge everything from taskAttemptPath to output directory
        mergePaths(fs, taskAttemptDirStatus, outputPath);
        LOG.info("Saved output of task '" + attemptId + "' to " + outputPath);
      }
    } else {
      LOG.warn("No Output found for " + attemptId);
    }
  } else {
    LOG.warn("Output Path is null in commitTask()");
  }
}
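// Illustration (not part of the original source): the algorithmVersion
// branch above is driven by a job configuration knob; a minimal sketch of
// selecting the v2 (direct-merge) commit path, assuming the standard
// FileOutputCommitter property name.
Configuration conf = new Configuration();
// 1 = rename into a committed task path (finalized at job commit),
// 2 = merge task output straight into the final output directory.
conf.setInt("mapreduce.fileoutputcommitter.algorithm.version", 2);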
private void printTaskAttempts(TaskReport report) {
  if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
    System.out.println(report.getSuccessfulTaskAttemptId());
  } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
    for (TaskAttemptID t : report.getRunningTaskAttemptIds()) {
      System.out.println(t);
    }
  }
}
@SuppressWarnings({"unchecked", "rawtypes"}) @Test (timeout=30000) public void testSleepMapper() throws Exception { SleepJob.SleepMapper test = new SleepJob.SleepMapper(); Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskId = new TaskAttemptID(); FakeRecordLLReader reader = new FakeRecordLLReader(); LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter(); SleepSplit split = getSleepSplit(); MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>( conf, taskId, reader, writer, committer, reporter, split); Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>() .getMapContext(mapcontext); long start = System.currentTimeMillis(); LOG.info("start:" + start); LongWritable key = new LongWritable(start + 2000); LongWritable value = new LongWritable(start + 2000); // should slip 2 sec test.map(key, value, context); LOG.info("finish:" + System.currentTimeMillis()); assertTrue(System.currentTimeMillis() >= (start + 2000)); test.cleanup(context); assertEquals(1, writer.getData().size()); }
public void setDatum(Object oDatum) {
  this.datum = (TaskAttemptFinished) oDatum;
  this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.taskStatus = datum.taskStatus.toString();
  this.finishTime = datum.finishTime;
  this.rackName = datum.rackname.toString();
  this.hostname = datum.hostname.toString();
  this.state = datum.state.toString();
  this.counters = EventReader.fromAvro(datum.counters);
}