@Override public void reportNextRecordRange(TaskAttemptID taskAttemptID, Range range) throws IOException { // This is used when the feature of skipping records is enabled. // This call exists as a hadoop mapreduce legacy wherein all changes in // counters/progress/phase/output-size are reported through statusUpdate() // call but not the next record range information. throw new IOException("Not yet implemented."); }
public void testAdd() { SortedRanges sr = new SortedRanges(); sr.add(new Range(2,9)); assertEquals(9, sr.getIndicesCount()); sr.add(new SortedRanges.Range(3,5)); assertEquals(9, sr.getIndicesCount()); sr.add(new SortedRanges.Range(7,1)); assertEquals(9, sr.getIndicesCount()); sr.add(new Range(1,12)); assertEquals(12, sr.getIndicesCount()); sr.add(new Range(7,9)); assertEquals(15, sr.getIndicesCount()); sr.add(new Range(31,10)); sr.add(new Range(51,10)); sr.add(new Range(66,10)); assertEquals(45, sr.getIndicesCount()); sr.add(new Range(21,50)); assertEquals(70, sr.getIndicesCount()); LOG.debug(sr); Iterator<Long> it = sr.skipRangeIterator(); int i = 0; assertEquals(i, it.next().longValue()); for(i=16;i<21;i++) { assertEquals(i, it.next().longValue()); } assertEquals(76, it.next().longValue()); assertEquals(77, it.next().longValue()); }
public void testRemove() { SortedRanges sr = new SortedRanges(); sr.add(new Range(2,19)); assertEquals(19, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(15,8)); assertEquals(13, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(6,5)); assertEquals(8, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(8,4)); assertEquals(7, sr.getIndicesCount()); sr.add(new Range(18,5)); assertEquals(12, sr.getIndicesCount()); sr.add(new Range(25,1)); assertEquals(13, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(7,24)); assertEquals(4, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(5,1)); assertEquals(3, sr.getIndicesCount()); LOG.debug(sr); }
synchronized void add(Range failedRange) { LOG.warn("FailedRange:"+ failedRange); if(divide!=null) { LOG.warn("FailedRange:"+ failedRange +" test:"+divide.test + " pass:"+divide.testPassed); if(divide.testPassed) { //test range passed //other range would be bad. test it failedRange = divide.other; } else { //test range failed //other range would be good. failedRange = divide.test; } //reset divide = null; } if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) { skipRanges.add(failedRange); } else { //start dividing the range to narrow down the skipped //records until maxSkipRecords are met OR all attempts //get exhausted divide = new Divide(failedRange); } }
Divide(Range range){ long half = range.getLength()/2; test = new Range(range.getStartIndex(), half); other = new Range(test.getEndIndex(), range.getLength()-half); //construct the skip range from the skipRanges skipRange = new SortedRanges(); for(Range r : skipRanges.getRanges()) { skipRange.add(r); } skipRange.add(new Range(0,test.getStartIndex())); skipRange.add(new Range(test.getEndIndex(), (Long.MAX_VALUE-test.getEndIndex()))); }
@Test public void testAdd() { SortedRanges sr = new SortedRanges(); sr.add(new Range(2,9)); assertEquals(9, sr.getIndicesCount()); sr.add(new SortedRanges.Range(3,5)); assertEquals(9, sr.getIndicesCount()); sr.add(new SortedRanges.Range(7,1)); assertEquals(9, sr.getIndicesCount()); sr.add(new Range(1,12)); assertEquals(12, sr.getIndicesCount()); sr.add(new Range(7,9)); assertEquals(15, sr.getIndicesCount()); sr.add(new Range(31,10)); sr.add(new Range(51,10)); sr.add(new Range(66,10)); assertEquals(45, sr.getIndicesCount()); sr.add(new Range(21,50)); assertEquals(70, sr.getIndicesCount()); LOG.debug(sr); Iterator<Long> it = sr.skipRangeIterator(); int i = 0; assertEquals(i, it.next().longValue()); for(i=16;i<21;i++) { assertEquals(i, it.next().longValue()); } assertEquals(76, it.next().longValue()); assertEquals(77, it.next().longValue()); }
@Test public void testRemove() { SortedRanges sr = new SortedRanges(); sr.add(new Range(2,19)); assertEquals(19, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(15,8)); assertEquals(13, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(6,5)); assertEquals(8, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(8,4)); assertEquals(7, sr.getIndicesCount()); sr.add(new Range(18,5)); assertEquals(12, sr.getIndicesCount()); sr.add(new Range(25,1)); assertEquals(13, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(7,24)); assertEquals(4, sr.getIndicesCount()); sr.remove(new SortedRanges.Range(5,1)); assertEquals(3, sr.getIndicesCount()); LOG.debug(sr); }