Java class org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume example source code

Project: hadoop-EAR    File: TestDirectoryScannerInlineFiles.java
/** Create a block file and corresponding metafile in a random volume */
private long createInlineBlockFile(int checksumType) throws IOException {
  FSVolume[] volumes = data.volumes.getVolumes();
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  int checksumSize = DataChecksum.getChecksumSizeByType(checksumType);
  String inlineFileName = getInlineBlockFileName(id, checksumType, checksumSize);
  File file = new File(finalizedDir, inlineFileName);
  assertTrue(file.createNewFile());
  PrintWriter pw = new PrintWriter(file);
  int desiredLength = (int)BlockInlineChecksumReader.getFileLengthFromBlockSize(1, 1, checksumSize);
  for(int i = 0; i < desiredLength; i++) {
    pw.write(Character.getNumericValue('0'));
  }
  pw.close();
  LOG.info("Created block file " + file.getName());
  return id;
}
Project: hadoop-EAR    File: DirectoryScanner.java
static DiskScanInfo getSeparateFilesLayoutScanInfo(long blockId,
    FileInfo blockFileInfo, FileInfo metaFileInfo, FSVolume vol) {
  File metaFile = null;
  long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
  if (metaFileInfo != null) {
    metaFile = metaFileInfo.file;
    genStamp = metaFileInfo.getStamp;
  }
  File blockFile = null;
  long fileLength = 0;
  if (blockFileInfo != null) {
    blockFile = blockFileInfo.file;
    fileLength = blockFile.length();
  }
  return new DiskScanInfo(SEPARATE_FILES_LAYOUT, blockId, blockFile,
      metaFile, vol, fileLength, genStamp);
}
Project: hadoop-EAR    File: DirectoryScanner.java
static DiskScanInfo getInlineFilesLayoutScanInfo(long blockId,
    FileInfo singleFileInfo, FSVolume vol) {
  String[] groundSeparated = StringUtils
      .split(singleFileInfo.fileName, '_');
  if (groundSeparated.length != 6) {
    throw new IllegalStateException("FileName \"" + singleFileInfo.fileName
        + "\" doesn't " + "reflect new layout format!");
  }
  int checksumType = Integer.parseInt(groundSeparated[4]);
  int bytesPerChecksum = Integer.parseInt(groundSeparated[5]);
  long fileLength = BlockInlineChecksumReader.getBlockSizeFromFileLength(
      singleFileInfo.file.length(), checksumType, bytesPerChecksum);
  return new DiskScanInfo(INLINE_CHECKSUM_LAYOUT, blockId,
      singleFileInfo.file, singleFileInfo.file, vol, fileLength,
      singleFileInfo.getStamp);
}
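The check above expects an inline-checksum block filename to have six underscore-separated fields and reads only the checksum type (index 4) and the bytes-per-checksum (index 5). A minimal standalone sketch of that parsing step, using a purely hypothetical filename:

public class InlineChecksumNameParse {
  public static void main(String[] args) {
    // Hypothetical inline-checksum block filename; only fields 4 and 5 are read here.
    String fileName = "blk_42_1001_1_1_512";
    String[] fields = fileName.split("_");
    if (fields.length != 6) {
      throw new IllegalStateException("FileName \"" + fileName
          + "\" doesn't reflect new layout format!");
    }
    int checksumType = Integer.parseInt(fields[4]);     // checksum algorithm id
    int bytesPerChecksum = Integer.parseInt(fields[5]); // data bytes covered by each checksum
    System.out.println("checksumType=" + checksumType
        + ", bytesPerChecksum=" + bytesPerChecksum);
  }
}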
Project: hadoop-EAR    File: DataNode.java
/**
 * Returned information is a JSON representation of a map with the
 * volume name as the key; the value is a map from volume attribute
 * keys to their values.
 */
@Override // DataNodeMXBean
public String getVolumeInfo() {
  final Map<String, Object> info = new HashMap<String, Object>();
  try {
    FSVolume[] volumes = ((FSDataset)this.data).volumes.getVolumes();
    for (FSVolume v : volumes) {
      final Map<String, Object> innerInfo = new HashMap<String, Object>();
      innerInfo.put("usedSpace", v.getDfsUsed());
      innerInfo.put("freeSpace", v.getAvailable());
      innerInfo.put("reservedSpace", v.getReserved());
      info.put(v.getDir().toString(), innerInfo);
    }
    return JSON.toString(info);
  } catch (IOException e) {
    LOG.info("Cannot get volume info.", e);
    return "ERROR";
  }
}
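The method above serializes a two-level map: the outer key is the volume directory, and the inner map holds the usedSpace, freeSpace and reservedSpace attributes. A small standalone sketch of that shape (the directory path and byte counts below are made up):

import java.util.HashMap;
import java.util.Map;

public class VolumeInfoShape {
  public static void main(String[] args) {
    Map<String, Object> info = new HashMap<String, Object>();

    Map<String, Object> innerInfo = new HashMap<String, Object>();
    innerInfo.put("usedSpace", 1024L * 1024 * 512);      // bytes used by DFS on this volume
    innerInfo.put("freeSpace", 1024L * 1024 * 4096);     // bytes still available
    innerInfo.put("reservedSpace", 1024L * 1024 * 128);  // bytes reserved for non-DFS use
    info.put("/data/1/dfs/data/current", innerInfo);     // hypothetical volume directory

    // getVolumeInfo() feeds a map like this to JSON.toString(), producing e.g.
    // {"/data/1/dfs/data/current":{"usedSpace":536870912,"freeSpace":4294967296,"reservedSpace":134217728}}
    System.out.println(info);
  }
}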
Project: hadoop-EAR    File: NamespaceMap.java
synchronized int removeUnhealthyVolumes(Collection<FSVolume> failed_vols,
    FSDatasetDeltaInterface datasetDelta) {
  int removed_blocks = 0;

  Iterator<Entry<Block, DatanodeBlockInfo>> dbi = blockInfoMap.entrySet()
      .iterator();
  while (dbi.hasNext()) {
    Entry<Block, DatanodeBlockInfo> entry = dbi.next();
    for (FSVolume v : failed_vols) {
      if (entry.getValue().getBlockDataFile().getVolume() == v) {
        DataNode.LOG.warn("removing block " + entry.getKey().getBlockId()
            + " from vol " + v.toString() + ", from namespace: "
            + namespaceId);
        dbi.remove();
        if (datasetDelta != null) {
          datasetDelta.removeBlock(namespaceId, entry.getKey());
        }
        removed_blocks++;
        break;
      }
    }
  }
  return removed_blocks;
}
Project: hadoop-EAR    File: NamespaceMap.java
/**
 * Get a list of block info with CRC information per FS volume.
 * 
 * @param volumes
 *          the volumes whose block lists are requested
 * @return a map from FSVolume to a list of buckets for that volume. The
 *         list is indexed by bucket ID, and each bucket is a map from
 *         Block to DatanodeBlockInfo for blocks that have CRC
 *         information.
 */
Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> getBlockCrcPerVolume(
    List<FSVolume> volumes) {
  Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> retMap =
      new HashMap<FSVolume, List<Map<Block, DatanodeBlockInfo>>>();
  for (FSVolume volume : volumes) {
    List<Map<Block, DatanodeBlockInfo>> newSubMap = new ArrayList<Map<Block, DatanodeBlockInfo>>(
        numBucket);
    for (int i = 0; i < numBucket; i++) {
      newSubMap.add(new HashMap<Block, DatanodeBlockInfo>());
    }
    retMap.put(volume, newSubMap);
  }
  for (BlockBucket bb : blockBuckets) {
    bb.getBlockCrcPerVolume(retMap);
  }
  return retMap;
}
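A caller of getBlockCrcPerVolume() receives, per volume, a list indexed by bucket ID whose entries map each Block to its DatanodeBlockInfo. A hedged sketch of walking that structure; it assumes the code sits in the same org.apache.hadoop.hdfs.server.datanode package (so the package-private types are visible) and that the caller already holds a NamespaceMap instance and a volume list:

Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> crcPerVolume =
    namespaceMap.getBlockCrcPerVolume(volumes);
for (Map.Entry<FSVolume, List<Map<Block, DatanodeBlockInfo>>> volEntry
    : crcPerVolume.entrySet()) {
  List<Map<Block, DatanodeBlockInfo>> buckets = volEntry.getValue();
  for (int bucketId = 0; bucketId < buckets.size(); bucketId++) { // list index == bucket ID
    for (Map.Entry<Block, DatanodeBlockInfo> blockEntry
        : buckets.get(bucketId).entrySet()) {
      // blockEntry.getKey() is the Block; blockEntry.getValue() carries its CRC metadata
    }
  }
}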
Project: cumulus    File: TestWriteToReplica.java
private void setup(FSDataset dataSet) throws IOException {
  // setup replicas map
  ReplicasMap replicasMap = dataSet.volumeMap;
  FSVolume vol = dataSet.volumes.getNextVolume(0);
  ReplicaInfo replicaInfo = new FinalizedReplica(
      blocks[FINALIZED], vol, vol.getDir());
  replicasMap.add(replicaInfo);
  replicaInfo.getBlockFile().createNewFile();
  replicaInfo.getMetaFile().createNewFile();

  replicasMap.add(new ReplicaInPipeline(
      blocks[TEMPORARY].getBlockId(),
      blocks[TEMPORARY].getGenerationStamp(), vol, 
      vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));

  replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol, 
      vol.createRbwFile(blocks[RBW]).getParentFile(), null);
  replicasMap.add(replicaInfo);
  replicaInfo.getBlockFile().createNewFile();
  replicaInfo.getMetaFile().createNewFile();

  replicasMap.add(new ReplicaWaitingToBeRecovered(blocks[RWR], vol, 
      vol.createRbwFile(blocks[RWR]).getParentFile()));
  replicasMap.add(new ReplicaUnderRecovery(
      new FinalizedReplica(blocks[RUR], vol, vol.getDir()), 2007));    
}
Project: RDFS    File: DataNode.java
/**
 * Returned information is a JSON representation of a map with the
 * volume name as the key; the value is a map from volume attribute
 * keys to their values.
 */
@Override // DataNodeMXBean
public String getVolumeInfo() {
  final Map<String, Object> info = new HashMap<String, Object>();
  try {
    FSVolume[] volumes = ((FSDataset)this.data).volumes.getVolumes();
    for (FSVolume v : volumes) {
      final Map<String, Object> innerInfo = new HashMap<String, Object>();
      innerInfo.put("usedSpace", v.getDfsUsed());
      innerInfo.put("freeSpace", v.getAvailable());
      innerInfo.put("reservedSpace", v.getReserved());
      info.put(v.getDir().toString(), innerInfo);
    }
    return JSON.toString(info);
  } catch (IOException e) {
    LOG.info("Cannot get volume info.", e);
    return "ERROR";
  }
}
Project: RDFS    File: VolumeMap.java
synchronized int removeUnhealthyVolumes(Collection<FSVolume> failed_vols) {
  int removed_blocks = 0;

  for (Integer namespaceId : namespaceMap.keySet()) {
    Map<Block, DatanodeBlockInfo> m = namespaceMap.get(namespaceId);
    Iterator<Entry<Block, DatanodeBlockInfo>> dbi = m.entrySet().iterator();
    while (dbi.hasNext()) {
      Entry<Block, DatanodeBlockInfo> entry = dbi.next();
      for (FSVolume v : failed_vols) {
        if (entry.getValue().getVolume() == v) {
          DataNode.LOG.warn("removing block " + entry.getKey().getBlockId()
              + " from vol " + v.toString() + ", from namespace: "
              + namespaceId);
          dbi.remove();
          removed_blocks++;
          break;
        }
      }
    }
  }
  return removed_blocks;
}
Project: hadoop-EAR    File: TestDirectoryScannerInlineFiles.java
@BeforeClass
public static void setUpCluster() {
  LOG.info("setting up!");
  Configuration CONF = new Configuration();
  CONF.setLong("dfs.block.size", 100);
  CONF.setInt("io.bytes.per.checksum", 1);
  CONF.setLong("dfs.heartbeat.interval", 1L);
  CONF.setInt("dfs.datanode.directoryscan.interval", 1000);

  try{
    cluster = new MiniDFSCluster(CONF, 1, true, null);
    cluster.waitActive();

    dn = cluster.getDataNodes().get(0);
    nsid = dn.getAllNamespaces()[0];
    scanner = dn.directoryScanner;
    data = (FSDataset)dn.data;
    Field f = DirectoryScanner.class.getDeclaredField("delta");
    f.setAccessible(true);
    delta = (FSDatasetDelta)f.get(scanner);

    fs = cluster.getFileSystem();

    List<File> volumes = new ArrayList<File>();
    for(FSVolume vol : data.volumes.getVolumes()) {
      volumes.add(vol.getDir());
    }
    data.asyncDiskService = new TestDirectoryScanner.FSDatasetAsyncDiscServiceMock(
        volumes.toArray(new File[volumes.size()]), CONF);

  } catch (Exception e) {
    e.printStackTrace();
    fail("setup failed");
  }
}
Project: hadoop-EAR    File: TestDirectoryScanner.java
/** Create a block file in a random volume*/
private long createBlockFile() throws IOException {
  FSVolume[] volumes = fds.volumes.getVolumes();
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
Project: hadoop-EAR    File: TestDirectoryScanner.java
/** Create a metafile in a random volume*/
private long createMetaFile() throws IOException {
  FSVolume[] volumes = fds.volumes.getVolumes();
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  File file = new File(finalizedDir, getMetaFile(id));
  if (file.createNewFile()) {
    LOG.info("Created metafile " + file.getName());
  }
  return id;
}
Project: hadoop-EAR    File: TestDirectoryScanner.java
/** Create a block file and corresponding metafile in a random volume */
private long createBlockMetaFile() throws IOException {
  FSVolume[] volumes = fds.volumes.getVolumes();
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());

    // Create files with same prefix as block file but extension names
    // such that during sorting, these files appear around meta file
    // to test how DirectoryScanner handles extraneous files
    String name1 = file.getAbsolutePath() + ".l";
    String name2 = file.getAbsolutePath() + ".n";
    file = new File(name1);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name1);
    }

    file = new File(name2);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name2);
    }

    file = new File(finalizedDir, getMetaFile(id));
    if (file.createNewFile()) {
      LOG.info("Created metafile " + file.getName());
    }
  }
  return id;
}
Project: hadoop-EAR    File: DatanodeBlockInfo.java
DatanodeBlockInfo(FSVolume vol, File file, long finalizedSize,
    boolean visible, boolean inlineChecksum, int checksumType,
    int bytesPerChecksum, boolean blockCrcValid, int blockCrc) {
  this.finalizedSize = finalizedSize;
  detached = false;
  this.visible = visible;
  this.inlineChecksum = inlineChecksum;
  this.checksumType = checksumType;
  this.bytesPerChecksum = bytesPerChecksum;
  this.blockCrcValid = blockCrcValid;
  this.blockCrc = blockCrc;
  this.block = null;
  this.blockDataFile = new BlockDataFile(file, vol);
}
Project: hadoop-EAR    File: DirectoryScanner.java
DiskScanInfo(int layout, long blockId, File blockFile, File metaFile,
    FSVolume vol, long fileLength, long genStamp) {
  this.blockId = blockId;
  this.metaFile = metaFile;
  this.blockFile = blockFile;
  this.volume = vol;
  this.fileLength = fileLength;
  this.genStamp = genStamp;
  this.layout = layout;
}
Project: hadoop-EAR    File: VolumeMap.java
synchronized int removeUnhealthyVolumes(Collection<FSVolume> failed_vols) {
  int removed_blocks = 0;

  for (Integer namespaceId : nsMap.keySet()) {
    removed_blocks += nsMap.get(namespaceId).removeUnhealthyVolumes(
        failed_vols, datasetDelta);
  }
  return removed_blocks;
}
Project: hadoop-EAR    File: NamespaceMap.java
synchronized void getBlockCrcPerVolume(
    Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> fsVolumeMap) {
  for (Map.Entry<Block, DatanodeBlockInfo> entry: blockInfoMap.entrySet()) {
    Block block = entry.getKey();
    DatanodeBlockInfo binfo = entry.getValue();
    if (fsVolumeMap.containsKey(binfo.getBlockDataFile().getVolume())
        && binfo.hasBlockCrcInfo()) {
      fsVolumeMap.get(binfo.getBlockDataFile().getVolume()).get(bucketId)
          .put(block, binfo);
    }
  }
}
Project: hadoop-EAR    File: NamespaceMap.java
int removeUnhealthyVolumes(Collection<FSVolume> failed_vols,
    FSDatasetDeltaInterface datasetDelta) {
  int removed_blocks = 0;

  for (BlockBucket blockBucket : blockBuckets) {
    removed_blocks += blockBucket.removeUnhealthyVolumes(failed_vols,
        datasetDelta);
  }
  return removed_blocks;
}
Project: cumulus    File: TestDirectoryScanner.java
/** Create a block file in a random volume*/
private long createBlockFile() throws IOException {
  FSVolume[] volumes = fds.volumes.volumes;
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
Project: cumulus    File: TestDirectoryScanner.java
/** Create a metafile in a random volume*/
private long createMetaFile() throws IOException {
  FSVolume[] volumes = fds.volumes.volumes;
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
  if (file.createNewFile()) {
    LOG.info("Created metafile " + file.getName());
  }
  return id;
}
Project: cumulus    File: TestDirectoryScanner.java
/** Create a block file and corresponding metafile in a random volume */
private long createBlockMetaFile() throws IOException {
  FSVolume[] volumes = fds.volumes.volumes;
  int index = rand.nextInt(volumes.length - 1);
  long id = getFreeBlockId();
  File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());

    // Create files with same prefix as block file but extension names
    // such that during sorting, these files appear around meta file
    // to test how DirectoryScanner handles extraneous files
    String name1 = file.getAbsolutePath() + ".l";
    String name2 = file.getAbsolutePath() + ".n";
    file = new File(name1);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name1);
    }

    file = new File(name2);
    if (file.createNewFile()) {
      LOG.info("Created extraneous file " + name2);
    }

    file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
    if (file.createNewFile()) {
      LOG.info("Created metafile " + file.getName());
    }
  }
  return id;
}
Project: hadoop-EAR    File: TestDatanodeRestart.java
private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
    throws IOException {
  FSDataOutputStream out = null;
  try {
    FileSystem fs = cluster.getFileSystem();
    NamespaceInfo nsInfo = cluster.getNameNode().versionRequest();
    final int fileLen = 515;
    // create some rbw replicas on disk
    byte[] writeBuf = new byte[fileLen];
    new Random().nextBytes(writeBuf);
    final Path src = new Path("/test.txt");
    out = fs.create(src);
    out.write(writeBuf);
    out.sync();
    DataNode dn = cluster.getDataNodes().get(0);
    // corrupt rbw replicas
    for (FSVolume volume : ((FSDataset) dn.data).volumes.getVolumes()) {
      File rbwDir = volume.getRbwDir(nsInfo.getNamespaceID());
      for (File file : rbwDir.listFiles()) {
        if (isCorrupt) {
          if (Block.isSeparateChecksumBlockFilename(file.getName())) {
            new RandomAccessFile(file, "rw").setLength(fileLen - 1); // corrupt
          } else if (Block.isInlineChecksumBlockFilename(file.getName())) {
            new RandomAccessFile(file, "rw").setLength(file.length() - 1); // corrupt
          }
        }
      }
    }
    cluster.restartDataNodes();
    cluster.waitActive();
    dn = cluster.getDataNodes().get(0);

    // check volumeMap: one rbw replica
    NamespaceMap volumeMap = ((FSDataset) (dn.data)).volumeMap
        .getNamespaceMap(nsInfo.getNamespaceID());
    assertEquals(1, volumeMap.size());
    Block replica = null;
    for (int i = 0; i < volumeMap.getNumBucket(); i++) {
      Set<Block> blockSet = volumeMap.getBucket(i).blockInfoMap.keySet();
      if (blockSet.isEmpty()) {
        continue;
      }
      Block r = blockSet.iterator().next();
      if (r != null) {
        replica = r;
        break;
      }
    }
    if (isCorrupt) {
      assertEquals((fileLen - 1), replica.getNumBytes());
    } else {
      assertEquals(fileLen, replica.getNumBytes());
    }
    dn.data.invalidate(nsInfo.getNamespaceID(), new Block[] { replica });
    fs.delete(src, false);
  } finally {
    IOUtils.closeStream(out);
  }
}
Project: hadoop-EAR    File: TestDirectoryScanner.java
@Override
void deleteAsyncFile(FSVolume volume, File file) {
  DataNode.LOG.info("Scheduling file " + file.toString() + " for deletion");
  new FileDeleteTask(volume, file).run();
}
Project: hadoop-EAR    File: DirectoryScanner.java
FSVolume getVolume() {
  return volume;
}
Project: hadoop-EAR    File: DirectoryScanner.java
FSVolume getVolume() {
  return diskScanInfo.getVolume();
}
Project: hadoop-EAR    File: DirectoryScanner.java
public ReportCompiler(FSVolume volume, DataNode datanode) {
  this.volume = volume;
  this.datanode = datanode;
}
Project: hadoop-EAR    File: DataBlockScanner.java
void init() throws IOException {
  // get the list of blocks and arrange them in random order
  Block arr[] = dataset.getBlockReport(namespaceId);
  Collections.shuffle(Arrays.asList(arr));

  blockInfoSet = new LightWeightLinkedSet<BlockScanInfo>();
  blockMap = new HashMap<Block, BlockScanInfo>();

  long scanTime = -1;
  for (Block block : arr) {
    BlockScanInfo info = new BlockScanInfo( block );
    info.lastScanTime = scanTime--; 
    //still keep 'info.lastScanType' as NONE.
    addBlockInfo(info);
  }

  /* Pick the first directory that has any existing scanner log.
   * Otherwise, pick the first directory.
   */
  File dir = null;
  FSDataset.FSVolume[] volumes = dataset.volumes.getVolumes();
  for(FSDataset.FSVolume vol : volumes) { 
    File nsDir = vol.getNamespaceSlice(namespaceId).getDirectory();
    if (LogFileHandler.isFilePresent(nsDir, verificationLogFile)) {
      dir = nsDir;
      break;
    }
  }
  if (dir == null) {
    dir = volumes[0].getNamespaceSlice(namespaceId).getDirectory();
  }

  try {
    // max lines will be updated later during initialization.
    verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
  } catch (IOException e) {
    LOG.warn("Could not open verification log. " +
             "Verification times are not stored.");
  }

  synchronized (this) {
    throttler = new DataTransferThrottler(MAX_SCAN_RATE);
  }
}
Project: hadoop-EAR    File: DataBlockScanner.java
static File getCurrentFile(FSVolume vol, int namespaceId) throws IOException {
  return LogFileHandler.getCurrentFile(vol.getNamespaceSlice(namespaceId).getDirectory(),
      DataBlockScanner.verificationLogFile);
}
Project: hadoop-EAR    File: BlockDataFile.java
public BlockDataFile (File file, FSVolume volume) {
  this.file = file;
  this.volume = volume;
}
Project: hadoop-EAR    File: BlockDataFile.java
public FSVolume getVolume() {
  return volume;
}
Project: hadoop-on-lustre    File: DatanodeBlockInfo.java
DatanodeBlockInfo(FSVolume vol, File file) {
  this.volume = vol;
  this.file = file;
  detached = false;
}
Project: hadoop-on-lustre    File: DatanodeBlockInfo.java
DatanodeBlockInfo(FSVolume vol) {
  this.volume = vol;
  this.file = null;
  detached = false;
}
Project: hadoop-on-lustre    File: DatanodeBlockInfo.java
FSVolume getVolume() {
  return volume;
}
Project: cumulus    File: DirectoryScanner.java
ScanInfo(long blockId, File blockFile, File metaFile, FSVolume vol) {
  this.blockId = blockId;
  this.metaFile = metaFile;
  this.blockFile = blockFile;
  this.volume = vol;
}
Project: cumulus    File: DirectoryScanner.java
FSVolume getVolume() {
  return volume;
}
Project: cumulus    File: DirectoryScanner.java
public ReportCompiler(FSVolume volume, File dir) {
  this.dir = dir;
  this.volume = volume;
}
Project: cumulus    File: ReplicaInfo.java
/**
 * Set the volume where this replica is located on disk
 */
void setVolume(FSVolume vol) {
  this.volume = vol;
}
Project: cumulus    File: ReplicaUnderRecovery.java
@Override //ReplicaInfo
void setVolume(FSVolume vol) {
  super.setVolume(vol);
  original.setVolume(vol);
}
Project: cumulus    File: TestDatanodeRestart.java
private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) 
throws IOException {
  FSDataOutputStream out = null;
  FileSystem fs = cluster.getFileSystem();
  final Path src = new Path("/test.txt");
  try {
    final int fileLen = 515;
    // create some rbw replicas on disk
    byte[] writeBuf = new byte[fileLen];
    new Random().nextBytes(writeBuf);
    out = fs.create(src);
    out.write(writeBuf);
    out.hflush();
    DataNode dn = cluster.getDataNodes().get(0);
    for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
      File currentDir = volume.getDir().getParentFile();
      File rbwDir = new File(currentDir, "rbw");
      for (File file : rbwDir.listFiles()) {
        if (isCorrupt && Block.isBlockFilename(file)) {
          new RandomAccessFile(file, "rw").setLength(fileLen-1); // corrupt
        }
      }
    }
    cluster.restartDataNodes();
    cluster.waitActive();
    dn = cluster.getDataNodes().get(0);

    // check volumeMap: one rwr replica
    ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
    Assert.assertEquals(1, replicas.size());
    ReplicaInfo replica = replicas.replicas().iterator().next();
    Assert.assertEquals(ReplicaState.RWR, replica.getState());
    if (isCorrupt) {
      Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
    } else {
      Assert.assertEquals(fileLen, replica.getNumBytes());
    }
    dn.data.invalidate(new Block[]{replica});
  } finally {
    IOUtils.closeStream(out);
    if (fs.exists(src)) {
      fs.delete(src, false);
    }
    fs.close();
  }      
}
Project: RDFS    File: TestDatanodeRestart.java
private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
    throws IOException {
  FSDataOutputStream out = null;
  try {
    FileSystem fs = cluster.getFileSystem();
    NamespaceInfo nsInfo = cluster.getNameNode().versionRequest();
    final int fileLen = 515;
    // create some rbw replicas on disk
    byte[] writeBuf = new byte[fileLen];
    new Random().nextBytes(writeBuf);
    final Path src = new Path("/test.txt");
    out = fs.create(src);
    out.write(writeBuf);
    out.sync();
    DataNode dn = cluster.getDataNodes().get(0);
    // corrupt rbw replicas
    for (FSVolume volume : ((FSDataset) dn.data).volumes.getVolumes()) {
      File rbwDir = volume.getRbwDir(nsInfo.getNamespaceID());
      for (File file : rbwDir.listFiles()) {
        if (isCorrupt && Block.isBlockFilename(file.getName())) {
          new RandomAccessFile(file, "rw").setLength(fileLen - 1); // corrupt
        }
      }
    }
    cluster.restartDataNodes();
    cluster.waitActive();
    dn = cluster.getDataNodes().get(0);

    // check volumeMap: one rbw replica
    Map<Block, DatanodeBlockInfo> volumeMap =
      ((FSDataset) (dn.data)).volumeMap.getNamespaceMap(nsInfo.getNamespaceID());
    assertEquals(1, volumeMap.size());
    Block replica = volumeMap.keySet().iterator().next();
    if (isCorrupt) {
      assertEquals((fileLen - 1), replica.getNumBytes());
    } else {
      assertEquals(fileLen, replica.getNumBytes());
    }
    dn.data.invalidate(nsInfo.getNamespaceID(), new Block[] { replica });
    fs.delete(src, false);
  } finally {
    IOUtils.closeStream(out);
  }
}