Java Class org.apache.hadoop.mapred.Merger.Segment Example Source Code
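
Before the per-project listings, here is a minimal sketch of how Merger.Segment is typically used: each on-disk IFile is wrapped in a disk-backed Segment via the (conf, fs, path, codec, preserve) constructor and the list is handed to Merger.merge with the same argument list that the TestMerger snippets below use. The class name SegmentMergeSketch, the varargs spill paths, and the "merge-tmp" directory are illustrative placeholders, not taken from any of the projects listed here.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch only: it mirrors the Segment constructors and the
// Merger.merge call shown in the project snippets below.
public class SegmentMergeSketch {

  @SuppressWarnings({ "deprecation", "unchecked" })
  public static RawKeyValueIterator mergeSpills(JobConf job, Path... spillFiles)
      throws IOException {
    FileSystem fs = FileSystem.getLocal(job);

    // One disk-backed Segment per spill file; the codec is null (uncompressed)
    // and "preserve" is true so the merger does not delete the inputs.
    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
    for (Path spill : spillFiles) {
      segments.add(new Segment<Text, Text>(job, fs, spill, null, true));
    }

    RawComparator<Text> comparator =
        (RawComparator<Text>) job.getOutputKeyComparator();

    // Merge with a factor equal to the number of segments, reporting progress
    // through Reporter.NULL; "merge-tmp" is a placeholder temp directory.
    return Merger.merge(job, fs, Text.class, Text.class,
        segments, segments.size(), new Path("merge-tmp"),
        comparator, Reporter.NULL, new Counter(), new Counter());
  }
}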

Project: hadoop-2.6.0-cdh5.4.3    File: TestMerger.java
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Configuration conf = new Configuration();
  JobConf jobConf = new JobConf();
  FileSystem fs = FileSystem.getLocal(conf);
  Path tmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, tmpDir, comparator, getReporter(),
      readsCounter, writesCounter);
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
Project: hadoop-EAR    File: BlockMapOutputBuffer.java
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // if there is already at least one spill, we can try to hold this last
    // spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(),
              true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();      
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  long mergeStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  long mergeEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
  mapSpillSortCounter.incJVMCPUMerge(mergeStart, mergeEnd);
}
Project: RDFS    File: BlockMapOutputBuffer.java
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // if there is already at least one spill, we can try to hold this last
    // spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(),
              true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();      
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
}
Project: hanoi-hadoop-2.0.0-cdh    File: TestMerger.java
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Configuration conf = new Configuration();
  JobConf jobConf = new JobConf();
  FileSystem fs = FileSystem.getLocal(conf);
  Path tmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, tmpDir, comparator, getReporter(),
      readsCounter, writesCounter);
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
Project: mammoth    File: ReduceTask.java
private int createInMemSegments(
    List<Segment<K, V>> inMemorySegments, int num)
    throws IOException {
  int totalSize = 0;
  synchronized (mapOutputsFilesInMemory) {
    //LOG.info("create in mem 111");
    while(num > 0) {
      MapOutput mo = mapOutputsFilesInMemory.remove(0);
      totalSize += mo.data.getLen();            

      Reader<K, V> reader =
          new Reader<K, V>(conf, ramManager, mo.data, mo.data.getLen());
      Segment<K, V> segment = new Segment<K, V>(reader, true);
      inMemorySegments.add(segment);
      num--;
    }
    //LOG.info("create in mem 222");
  }
  return totalSize;
}
Project: lustre-connector-for-hadoop    File: LustreFsShuffle.java
@Override
public int compare(Segment<K, V> seg1, Segment<K, V> seg2) {
    if(seg1.getLength() < seg2.getLength()) {
        return -1;
    }
    else if(seg1.getLength() > seg2.getLength()) {
        return 1;
    }
    return SharedFsPlugins.getSegmentPath(seg1)
        .compareTo(SharedFsPlugins.getSegmentPath(seg2));
}
Project: lustre-connector-for-hadoop    File: LustreFsShuffle.java
@SuppressWarnings("unchecked")
public RawKeyValueIterator finish() throws Throwable {
    // merge config params
    Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
    Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass();
    final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator();

    // Wait for on-going merges to complete
    merger.close();

    LOG.info("finalMerge called with " + segmentsToBeMerged.size() + " on-disk map-outputs");

    List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>();
    long onDiskBytes = 0;

    for (Segment<K, V> segment : segmentsToBeMerged) {
        long fileLength = segment.getLength();
        onDiskBytes += fileLength;
        LOG.debug("Disk file: " + segment + " Length is " + fileLength);
        segments.add(segment);
    }
    segmentsToBeMerged.clear();

    LOG.info("Merging " + segments.size() + " files, " + onDiskBytes + " bytes from disk");
    Collections.sort(segments, new Comparator<Segment<K, V>>() {

        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
            if (o1.getLength() == o2.getLength()) {
                return 0;
            }
            return o1.getLength() < o2.getLength() ? -1 : 1;
        }
    });
    return Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, segments.size(), mergeTempDir,
                        comparator, reporter, spilledRecordsCounter, null, null);
}
Project: hadoop    File: BackupStore.java
public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirstSegmentOffset to " + currentKVOffset);
  }
Project: hadoop    File: BackupStore.java
public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode

    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;

    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }

    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }
Project: hadoop    File: BackupStore.java
private void clearSegmentList() throws IOException {
  for (Segment<K,V> segment: segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
      memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
Project: hadoop    File: BackupStore.java
/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment () throws IOException {

  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader = 
    new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
        (org.apache.hadoop.mapred.TaskAttemptID) tid, 
        dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " + 
      segmentList.size());
}
Project: hadoop    File: BackupStore.java
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
}
Project: hadoop    File: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);

  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
Project: hadoop    File: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
Project: hadoop    File: TestMerger.java
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
Project: aliyun-oss-hadoop-fs    File: BackupStore.java
public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirstSegmentOffset to " + currentKVOffset);
  }
Project: aliyun-oss-hadoop-fs    File: BackupStore.java
public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode

    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;

    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }

    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }
Project: aliyun-oss-hadoop-fs    File: BackupStore.java
private void clearSegmentList() throws IOException {
  for (Segment<K,V> segment: segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
      memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
Project: aliyun-oss-hadoop-fs    File: BackupStore.java
/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment () throws IOException {

  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader = 
    new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
        (org.apache.hadoop.mapred.TaskAttemptID) tid, 
        dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " + 
      segmentList.size());
}
Project: aliyun-oss-hadoop-fs    File: BackupStore.java
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
}
Project: aliyun-oss-hadoop-fs    File: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);

  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
Project: aliyun-oss-hadoop-fs    File: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
Project: aliyun-oss-hadoop-fs    File: TestMerger.java
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
Project: big-c    File: BackupStore.java
public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirstSegmentOffset to " + currentKVOffset);
  }
Project: big-c    File: BackupStore.java
public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode

    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;

    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }

    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }
Project: big-c    File: BackupStore.java
private void clearSegmentList() throws IOException {
  for (Segment<K,V> segment: segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
      memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
Project: big-c    File: BackupStore.java
/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment () throws IOException {

  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader = 
    new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
        (org.apache.hadoop.mapred.TaskAttemptID) tid, 
        dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " + 
      segmentList.size());
}
Project: big-c    File: BackupStore.java
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
}
Project: big-c    File: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);

  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
Project: big-c    File: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
Project: big-c    File: TestMerger.java
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
Project: hadoop-2.6.0-cdh5.4.3    File: BackupStore.java
public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirstSegmentOffset to " + currentKVOffset);
  }
Project: hadoop-2.6.0-cdh5.4.3    File: BackupStore.java
public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode

    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;

    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }

    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }
Project: hadoop-2.6.0-cdh5.4.3    File: BackupStore.java
private void clearSegmentList() throws IOException {
  for (Segment<K,V> segment: segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
      memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
Project: hadoop-2.6.0-cdh5.4.3    File: BackupStore.java
/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment () throws IOException {

  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader = 
    new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
        (org.apache.hadoop.mapred.TaskAttemptID) tid, 
        dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " + 
      segmentList.size());
}
Project: hadoop-2.6.0-cdh5.4.3    File: BackupStore.java
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
}
Project: hadoop-2.6.0-cdh5.4.3    File: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);

  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestMerger.java
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 1; i < 10; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}