Java source code examples for the class org.apache.hadoop.hdfs.RaidDFSUtil
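The snippets below are collected from several Hadoop RAID forks (hadoop-EAR, RDFS, mapreduce-fork) and exercise a handful of RaidDFSUtil helpers: getCorruptFiles, getBlockLocations, corruptBlocksInFile, reportCorruptBlocks, getCRC, cleanUp, and the codes list. As a quick orientation, here is a minimal sketch that strings those calls together, using only the signatures visible in the snippets; the file path and the way the DistributedFileSystem and file length are obtained are placeholders, not code from any of the projects.

import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

public class RaidDFSUtilSketch {
  // Minimal sketch of the RaidDFSUtil calls used throughout the examples below.
  // "/user/raid/file1" is a placeholder path; obtaining the DistributedFileSystem
  // and the file length is assumed to happen elsewhere.
  static void inspect(DistributedFileSystem dfs, long fileLen) throws Exception {
    // Files the NameNode currently reports as corrupt.
    String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);

    String path = "/user/raid/file1";
    // All block locations of one file, and the subset marked corrupt.
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(dfs, path, 0, fileLen);
    List<LocatedBlock> corrupt =
        RaidDFSUtil.corruptBlocksInFile(dfs, path, 0, fileLen);

    // CRC of the whole file, as used by the block-fixer tests to verify repairs.
    long crc = RaidDFSUtil.getCRC(dfs, new Path(path));

    System.out.println(corruptFiles.length + " corrupt files; " +
        corrupt.size() + " of " + locs.getLocatedBlocks().size() +
        " blocks corrupt in " + path + "; crc=" + crc);
  }
}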

Project: hadoop-EAR    File: TestDirectoryRaidEncoder.java
public void testSmallFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  long blockSize = 8192L;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testSmallFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{1000L, 4000L, 1000L}, blockSize, 4096L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{2000L, 3000L, 2000L, 3000L}, blockSize, 3072L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{3000L, 3000L, 3000L, 3000L}, blockSize, 3072L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{511L, 3584L, 3000L, 1234L, 512L, 1234L, 3000L,
          3234L, 511L}, blockSize, 3584L);
    }
  } finally {
    myTearDown();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidEncoder.java
public void testDifferentBlockSizeFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 3;
  long blockSize = 8192L;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testDifferentBlockSizeFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {1000, blockSize, 2*blockSize, 2*blockSize + 1},
          new long[] {blockSize, blockSize, 2*blockSize, blockSize},
          2*blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize, 2*blockSize, 3*blockSize, 4*blockSize},
          new long[] {blockSize, 2*blockSize, 3*blockSize, blockSize},
          3*blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize+1, 9*blockSize+1, 2*blockSize+1,
          blockSize+1}, new long[]{blockSize, 2*blockSize, 3*blockSize,
          blockSize}, 2*blockSize+512);
    }
  } finally {
    myTearDown();
  }
}
Project: RDFS    File: TestDirectoryRaidEncoder.java
public void testSmallFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  long blockSize = 8192L;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testSmallFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{1000L, 4000L, 1000L}, blockSize, 4096L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{2000L, 3000L, 2000L, 3000L}, blockSize, 3072L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{3000L, 3000L, 3000L, 3000L}, blockSize, 3072L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{511L, 3584L, 3000L, 1234L, 512L, 1234L, 3000L,
          3234L, 511L}, blockSize, 3584L);
    }
  } finally {
    myTearDown();
  }
}
Project: RDFS    File: TestDirectoryRaidEncoder.java
public void testDifferentBlockSizeFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 3;
  long blockSize = 8192L;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testDifferentBlockSizeFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {1000, blockSize, 2*blockSize, 2*blockSize + 1},
          new long[] {blockSize, blockSize, 2*blockSize, blockSize},
          2*blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize, 2*blockSize, 3*blockSize, 4*blockSize},
          new long[] {blockSize, 2*blockSize, 3*blockSize, blockSize},
          3*blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize+1, 9*blockSize+1, 2*blockSize+1,
          blockSize+1}, new long[]{blockSize, 2*blockSize, 3*blockSize,
          blockSize}, 2*blockSize+512);
    }
  } finally {
    myTearDown();
  }
}
Project: mapreduce-fork    File: DistBlockFixer.java
/**
 * gets a list of corrupt files from the name node
 * and filters out files that are currently being fixed or 
 * that were recently fixed
 */
private List<Path> getCorruptFiles() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem) 
    (new Path("/")).getFileSystem(getConf());

  String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  List<Path> corruptFiles = new LinkedList<Path>();

  for (String f: files) {
    Path p = new Path(f);
    // filter out files that are being fixed or that were recently fixed
    if (!fileIndex.containsKey(p.toString())) {
      corruptFiles.add(p);
    }
  }
  RaidUtils.filterTrash(getConf(), corruptFiles);

  return corruptFiles;
}
Project: mapreduce-fork    File: TestRaidShell.java
private void waitForCorruptBlocks(
  int numCorruptBlocks, DistributedFileSystem dfs, Path file)
  throws Exception {
  String path = file.toUri().getPath();
  FileStatus stat = dfs.getFileStatus(file);
  long start = System.currentTimeMillis();
  long actual = 0;
  do {
    actual = RaidDFSUtil.corruptBlocksInFile(
        dfs, path, 0, stat.getLen()).size();
    if (actual == numCorruptBlocks) break;
    if (System.currentTimeMillis() - start > 120000) break;
    LOG.info("Waiting for " + numCorruptBlocks + " corrupt blocks in " +
      path + ", found " + actual);
    Thread.sleep(1000);
  } while (true);
  assertEquals(numCorruptBlocks, actual);
}
Project: mapreduce-fork    File: TestRaidShellFsck.java
/**
 * sleeps for up to 20s until the number of corrupt files 
 * in the file system is equal to the number specified
 */
private void waitUntilCorruptFileCount(DistributedFileSystem dfs,
                                       int corruptFiles)
  throws IOException {
  long waitStart = System.currentTimeMillis();
  while (RaidDFSUtil.getCorruptFiles(dfs).length != corruptFiles) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ignore) {

    }

    if (System.currentTimeMillis() > waitStart + 20000L) {
      break;
    }
  }

  int corruptFilesFound = RaidDFSUtil.getCorruptFiles(dfs).length;
  if (corruptFilesFound != corruptFiles) {
    throw new IOException("expected " + corruptFiles + 
                          " corrupt files but got " +
                          corruptFilesFound);
  }
}
Project: hadoop-EAR    File: TestRaidMissingBlocksQueue.java
/**
 * corrupt a block in a raided file, and make sure it will be shown in the 
 * Raid missing blocks queue.
 */
@Test
public void testCorruptBlocks() throws IOException {
  MiniDFSCluster cluster = null;
  Configuration conf = new Configuration();
  try {
    cluster = new MiniDFSCluster(conf, 3, true, null);

    DistributedFileSystem dfs = DFSUtil.convertToDFS(cluster.getFileSystem());
    String filePath = "/test/file1";
    RaidDFSUtil.constructFakeRaidFile(dfs, filePath, RaidCodec.getCodec("rs"));

    FileStatus stat = dfs.getFileStatus(new Path(filePath));
    LocatedBlocks blocks = dfs.getClient().
        getLocatedBlocks(filePath, 0, stat.getLen());

    Block block = blocks.getLocatedBlocks().get(0).getBlock();
    DFSTestUtil.corruptBlock(block, cluster);

    RaidDFSUtil.reportCorruptBlocksToNN(dfs, 
        new LocatedBlock[] {blocks.getLocatedBlocks().get(0)});

    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    assertEquals(1, namesystem.getRaidMissingBlocksCount()); // one raid missing block
    assertEquals(0, namesystem.getMissingBlocksCount());  // zero non-raid missing blocks

  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidEncoder.java
public void testIdenticalBlockSizeFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  long blockSize = 8192L;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testIdenticalBlockSizeFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {1000L, blockSize, 2*blockSize, 4000L}, blockSize,
          blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize, 2*blockSize, 3*blockSize, 4*blockSize},
          blockSize, blockSize);
      int halfBlock = (int)blockSize/2;
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize + halfBlock, 2*blockSize + halfBlock,
                     3*blockSize + halfBlock, 4*blockSize + halfBlock},
          blockSize, blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize+1, 9*blockSize+1, 2*blockSize+1,
          3*blockSize+1}, blockSize, blockSize);
    }
  } finally {
    myTearDown();
  }
}
Project: hadoop-EAR    File: TestFastFileCheck.java
public void testVerifySourceFile() throws Exception {
  mySetup();

  try {
    Path srcPath = new Path("/user/dikang/raidtest/file0");
    int numBlocks = 8;
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
        1, numBlocks, 8192L);
    assertTrue(fileSys.exists(srcPath));
    Codec codec = Codec.getCodec("rs");
    FileStatus stat = fileSys.getFileStatus(srcPath);

    // verify good file
    assertTrue(FastFileCheck.checkFile(conf, (DistributedFileSystem)fileSys, 
        fileSys, srcPath, null, codec, 
        RaidUtils.NULL_PROGRESSABLE, true));

    // verify bad file
    LocatedBlocks fileLoc =
        RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, 
            srcPath.toUri().getPath(),
            0, stat.getLen());
    // corrupt file1
    Random rand = new Random();
    int idx = rand.nextInt(numBlocks);
    TestRaidDfs.corruptBlock(srcPath, 
        fileLoc.getLocatedBlocks().get(idx).getBlock(), 
        NUM_DATANODES, true, dfs);

    assertFalse(FastFileCheck.checkFile(conf, (DistributedFileSystem)fileSys, 
        fileSys, srcPath, null, codec, 
        RaidUtils.NULL_PROGRESSABLE, true));
  } finally {
    stopCluster();
  }
}
Project: RDFS    File: TestDirectoryRaidEncoder.java
public void testIdenticalBlockSizeFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  long blockSize = 8192L;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testIdenticalBlockSizeFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {1000L, blockSize, 2*blockSize, 4000L}, blockSize,
          blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize, 2*blockSize, 3*blockSize, 4*blockSize},
          blockSize, blockSize);
      int halfBlock = (int)blockSize/2;
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize + halfBlock, 2*blockSize + halfBlock,
                     3*blockSize + halfBlock, 4*blockSize + halfBlock},
          blockSize, blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize+1, 9*blockSize+1, 2*blockSize+1,
          3*blockSize+1}, blockSize, blockSize);
    }
  } finally {
    myTearDown();
  }
}
Project: mapreduce-fork    File: BlockFixer.java
/**
 * Returns the corrupt blocks in a file.
 */
List<LocatedBlock> corruptBlocksInFile(DistributedFileSystem fs,
                                       String uriPath, FileStatus stat)
  throws IOException {
  List<LocatedBlock> corrupt = new LinkedList<LocatedBlock>();
  LocatedBlocks locatedBlocks =
    RaidDFSUtil.getBlockLocations(fs, uriPath, 0, stat.getLen());
  for (LocatedBlock b: locatedBlocks.getLocatedBlocks()) {
    if (b.isCorrupt() ||
        (b.getLocations().length == 0 && b.getBlockSize() > 0)) {
      corrupt.add(b);
    }
  }
  return corrupt;
}
Project: mapreduce-fork    File: LocalBlockFixer.java
/**
 * @return A list of corrupt files as obtained from the namenode
 */
List<Path> getCorruptFiles() throws IOException {
  DistributedFileSystem dfs = helper.getDFS(new Path("/"));

  String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  List<Path> corruptFiles = new LinkedList<Path>();
  for (String f: files) {
    Path p = new Path(f);
    if (!history.containsKey(p.toString())) {
      corruptFiles.add(p);
    }
  }
  RaidUtils.filterTrash(getConf(), corruptFiles);
  return corruptFiles;
}
Project: hadoop-EAR    File: TestRaidMissingBlocksQueue.java
/**
 * Take down a datanode to generate raid missing blocks, then bring it back up
 * to restore the missing blocks.
 */
@Test
public void testRaidMissingBlocksByTakingDownDataNode() throws IOException, InterruptedException {
  MiniDFSCluster cluster = null;
  Configuration conf = new Configuration();
  try {
    cluster = new MiniDFSCluster(conf, 1, true, null);
    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    final DistributedFileSystem dfs = DFSUtil.convertToDFS(cluster.getFileSystem());
    String filePath = "/test/file1";
    RaidCodec rsCodec = RaidCodec.getCodec("rs");
    RaidDFSUtil.constructFakeRaidFile(dfs, filePath, rsCodec);

    DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
        namesystem.heartbeats.toArray(
            new DatanodeDescriptor[1]);
    assertEquals(1, datanodes.length);

    // shutdown the datanode
    DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
    assertEquals(rsCodec.numStripeBlocks, namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

    // bring up the datanode
    cluster.restartDataNode(dnprop);

    // Wait for block report
    LOG.info("wait for its block report to come in");
    NumberReplicas num;
    FileStatus stat = dfs.getFileStatus(new Path(filePath));
    LocatedBlocks blocks = dfs.getClient().
        getLocatedBlocks(filePath, 0, stat.getLen());
    long startTime = System.currentTimeMillis();
    do {
      Thread.sleep(1000);
      int totalCount = 0;
      namesystem.readLock();
      try {
        for (LocatedBlock block : blocks.getLocatedBlocks()) {
          num = namesystem.countNodes(block.getBlock());
          totalCount += num.liveReplicas();
        }
        if (totalCount == rsCodec.numDataBlocks) {
          break;
        } else {
          LOG.info("wait for block report, received total replicas: " + totalCount);
        }
      } finally {
        namesystem.readUnlock();
      }
    } while (System.currentTimeMillis() - startTime < 30000);
    assertEquals(0, namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing block
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }   
}
Project: hadoop-EAR    File: TestDirectoryBlockFixer.java
/**
 * Corrupt a parity file and wait for it to get fixed.
 */
private void implParityBlockFix(String testName, boolean local)
  throws Exception {
  LOG.info("Test " + testName + " started.");
  int stripeLength = 3;
  mySetup(stripeLength); 
  long[] crcs = new long[3];
  int[] seeds = new int[3];
  Path dirPath = new Path("/user/dhruba/raidtest");
  Path[] files = TestRaidDfs.createTestFiles(dirPath,
      fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
  Path destPath = new Path("/destraid/user/dhruba");
  Path parityFile = new Path("/destraid/user/dhruba/raidtest");
  LOG.info("Test " + testName + " created test files");
  Configuration localConf = this.getRaidNodeConfig(conf, local);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, dirPath, destPath);
    cnode.stop(); cnode.join();

    long parityCRC = RaidDFSUtil.getCRC(fileSys, parityFile);

    FileStatus parityStat = fileSys.getFileStatus(parityFile);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
      dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);

    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
                 0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt parity blocks for different stripes.
    int[] corruptBlockIdxs = new int[]{0, 1, 2};
    for (int idx: corruptBlockIdxs)
      corruptBlock(locs.get(idx).getBlock(), dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs,
        2*blockSize);

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted",
                 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
                 corruptFiles[0], parityFile.toUri().getPath());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test " + testName + " waiting for files to be fixed.");
      Thread.sleep(3000);
    }
    TestBlockFixer.verifyMetrics(fileSys, cnode, local, 1L, 
        corruptBlockIdxs.length);

    long checkCRC = RaidDFSUtil.getCRC(fileSys, parityFile);

    assertEquals("file not fixed",
                 parityCRC, checkCRC);

  } catch (Exception e) {
    LOG.info("Test " + testName + " Exception " + e +
             StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test " + testName + " completed.");
}
Project: hadoop-EAR    File: TestBlockFixer.java
public void testMultiplePriorities() throws Exception {
  LOG.info("Test testMultiplePriorities started.");
  Path srcFile = new Path("/home/test/file1");
  int repl = 1;
  int numBlocks = 8;
  long blockSize = 16384;
  int stripeLength = 3;
  Path destPath = new Path("/destraidrs");
  mySetup(stripeLength, -1); // never har
  Codec codec = Codec.getCodec("rs");
  LOG.info("Starting testMultiplePriorities");
  try {
    // Create test file and raid it.
    TestRaidDfs.createTestFilePartialLastBlock(
      fileSys, srcFile, repl, numBlocks, blockSize);
    FileStatus stat = fileSys.getFileStatus(srcFile);
    RaidNode.doRaid(conf, stat,
      destPath, codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

    // Corrupt first block of file.
    int blockIdxToCorrupt = 1;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    LocatedBlocks locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock(),
        dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{1}, blockSize);

    // Create Block Fixer and fix.
    FakeDistBlockIntegrityMonitor distBlockFixer = new FakeDistBlockIntegrityMonitor(conf);
    assertEquals(0, distBlockFixer.submittedJobs.size());

    // waiting for one job to submit
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < 120000 &&
           distBlockFixer.submittedJobs.size() == 0) { 
      distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
      LOG.info("Waiting for jobs to submit");
      Thread.sleep(10000);
    }
    int submittedJob = distBlockFixer.submittedJobs.size();
    LOG.info("Already Submitted " + submittedJob + " jobs");
    assertTrue("Should submit more than 1 jobs", submittedJob >= 1);

    // Corrupt one more block.
    blockIdxToCorrupt = 4;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock(),
        dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{4}, blockSize);

    // A new job should be submitted since two blocks are corrupt.
    startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < 120000 &&
           distBlockFixer.submittedJobs.size() == submittedJob) { 
      distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
      LOG.info("Waiting for more jobs to submit");
      Thread.sleep(10000);
    }
    LOG.info("Already Submitted " + distBlockFixer.submittedJobs.size()  + " jobs");
    assertTrue("Should submit more than 1 jobs",
        distBlockFixer.submittedJobs.size() - submittedJob >= 1);
  } finally {
    myTearDown();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidEncoder.java
private void validateSingleFile(String code, FileSystem fileSys, 
    Path sourceDir, int stripeLength, int blockNum, boolean lastPartial)
        throws Exception {
  LOG.info("Test file with " + blockNum + " blocks and " +
        (lastPartial? "partial": "full") + " last block");
  Codec codec = loadTestCodecs(code, stripeLength, true);
  Path parityDir = new Path(codec.parityDirectory);
  RaidDFSUtil.cleanUp(fileSys, sourceDir);
  RaidDFSUtil.cleanUp(fileSys, parityDir);
  fileSys.mkdirs(sourceDir);

  Path file1 = new Path(sourceDir, "file1");
  if (!lastPartial) {
    TestRaidDfs.createTestFile(fileSys, file1, 2, blockNum, 8192L);
  } else {
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, 2,
        blockNum, 8192L);
  }
  Path parityFile = RaidNode.getOriginalParityFile(parityDir, sourceDir);
  // Do directory level raid
  LOG.info("Create a directory-raid parity file " + parityFile);
  assertTrue("Cannot raid directory " + sourceDir, 
      doRaid(conf, fileSys, sourceDir, codec));
  assertEquals("Modification time should be the same", 
      fileSys.getFileStatus(sourceDir).getModificationTime(),
      fileSys.getFileStatus(parityFile).getModificationTime());
  assertEquals("Replica num of source file should be reduced to 1",
      fileSys.getFileStatus(file1).getReplication(), 1);
  assertEquals("Replica num of parity file should be reduced to 1",
      fileSys.getFileStatus(parityFile).getReplication(), 1);
  long dirCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
  long dirLen = fileSys.getFileStatus(parityFile).getLen();
  // remove the parity dir
  RaidDFSUtil.cleanUp(fileSys, parityDir);
  codec = loadTestCodecs(code, stripeLength, false);
  Path parityFile1 = RaidNode.getOriginalParityFile(parityDir,
      file1);
  LOG.info("Create a file-raid parity file " + parityFile1);
  assertTrue("Cannot raid file " + file1, 
      doRaid(conf, fileSys, file1, codec));
  assertTrue("Parity file doesn't match when the file has " + blockNum + 
      " blocks ", 
      TestRaidDfs.validateFile(fileSys, parityFile1, dirLen, dirCRC));
}
Project: hadoop-EAR    File: TestDirectoryRaidEncoder.java
public void testOneFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testOneFileDirectory: Test code " + code);
      Codec codec = loadTestCodecs(code, stripeLength, true);
      Path sourceDir = new Path("/user/raid", code);
      assertTrue(fileSys.mkdirs(sourceDir));
      Path twoBlockFile = new Path(sourceDir, "twoBlockFile");
      LOG.info("Test one file with 2 blocks");
      TestRaidDfs.createTestFile(fileSys, twoBlockFile, 2, 2, 8192L);
      assertTrue(fileSys.exists(twoBlockFile));
      assertFalse("Not enough blocks in the directory",
          RaidNode.doRaid(conf, fileSys.getFileStatus(sourceDir),
              new Path(codec.parityDirectory), codec,
              new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1));
      fileSys.delete(twoBlockFile, true);

      LOG.info("Test one file with blocks less than one stripe");
      validateSingleFile(code, fileSys, sourceDir, stripeLength, 3,
          false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength, 3,
          true);
      LOG.info("Test one file with one stripe blocks");
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength, true);

      LOG.info("Test one file with more than one stripe blocks");
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength + 2, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength + 2, true);
    }
  } finally {
    myTearDown();
  }
}
Project: hadoop-EAR    File: TestDecoder.java
public void verifyDecoder(String code, int parallelism) throws Exception {
  Codec codec = Codec.getCodec(code);
  conf.setInt("raid.encoder.parallelism", parallelism);
  ConfigBuilder cb = new ConfigBuilder(CONFIG_FILE);
  cb.addPolicy("RaidTest1", "/user/dikang/raidtest/file" + code + parallelism,
      1, 1, code);
  cb.persist();
  Path srcPath = new Path("/user/dikang/raidtest/file" + code + parallelism +
      "/file1");
  long blockSize = 8192 * 1024L;

  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
      1, 7, blockSize);
  doRaid(srcPath, codec);
  FileStatus srcStat = fileSys.getFileStatus(srcPath);
  ParityFilePair pair = ParityFilePair.getParityFile(codec, srcStat, conf);

  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc =
      RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, 
          srcPath.toUri().getPath(),
          0, length);

  // corrupt file

  int[] corruptBlockIdxs = new int[] {5};
  long errorOffset = 5 * blockSize;
  for (int idx: corruptBlockIdxs) {
    TestBlockFixer.corruptBlock(file1Loc.get(idx).getBlock(), dfsCluster);
  }

  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
      corruptBlockIdxs, blockSize);

  Decoder decoder = new Decoder(conf, codec);
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  decoder.codec.simulateBlockFix = true;
  CRC32 oldCRC = decoder.fixErasedBlock(fileSys, srcStat, fileSys, 
      pair.getPath(), true, blockSize, errorOffset, blockSize, false, 
      out, null, null, false);

  decoder.codec.simulateBlockFix = false;
  out = new ByteArrayOutputStream();
  decoder.fixErasedBlock(fileSys, srcStat, fileSys, 
      pair.getPath(), true, blockSize, errorOffset, blockSize, false, 
      out, null, null, false);

  // calculate the new crc
  CRC32 newCRC = new CRC32();
  byte[] constructedBytes = out.toByteArray();
  newCRC.update(constructedBytes);

  assertEquals(oldCRC.getValue(), newCRC.getValue());
}
Project: hadoop-EAR    File: TestSimulationParityBlockFixer.java
public void testParityHarBadBlockFixer() throws Exception {
  LOG.info("Test testParityHarBlockFix started.");
  long blockSize = 8192L;
  int stripeLength = 3;
  mySetup(stripeLength, -1, "org.apache.hadoop.raid.BadXORCode",
      "org.apache.hadoop.raid.BadReedSolomonCode", "rs", true); 
  Path file1 = new Path("/user/dhruba/raidtest/file1");
  // Parity file will have 7 blocks.
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                             1, 20, blockSize);
  LOG.info("Created test files");

  // create an instance of the RaidNode
  Configuration localConf = new Configuration(conf);
  localConf.setInt(RaidNode.RAID_PARITY_HAR_THRESHOLD_DAYS_KEY, 0);
  localConf.set("raid.blockfix.classname",
                "org.apache.hadoop.raid.DistBlockIntegrityMonitor");
  localConf.setLong("raid.blockfix.filespertask", 2L);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    Path harDirectory =
      new Path("/destraidrs/user/dhruba/raidtest/raidtest" +
               RaidNode.HAR_SUFFIX);
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < 1000 * 120) {
      if (fileSys.exists(harDirectory)) {
        break;
      }
      LOG.info("Waiting for har");
      Thread.sleep(1000);
    }
    assertEquals(true, fileSys.exists(harDirectory));
    cnode.stop(); cnode.join();
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    // Corrupt source blocks
    FileStatus stat = fileSys.getFileStatus(file1);
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
        dfs, file1.toUri().getPath(), 0, stat.getLen());
    int[] corruptBlockIdxs = new int[]{0};
    for (int idx: corruptBlockIdxs) {
      TestBlockFixer.corruptBlock(locs.get(idx).getBlock(),
          dfsCluster);
    }
    RaidDFSUtil.reportCorruptBlocks(dfs, file1, corruptBlockIdxs,
        stat.getBlockSize());

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
                 corruptFiles[0], file1.toUri().getPath());

    cnode = RaidNode.createRaidNode(null, localConf);
    start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Waiting for files to be fixed.");
      Thread.sleep(1000);
    }

    long checkCRC = RaidDFSUtil.getCRC(fileSys, file1);
    assertEquals("file not fixed", crc, checkCRC);
    // Verify the counters are right
    long expectedNumFailures = corruptBlockIdxs.length;
    assertEquals(expectedNumFailures,
        cnode.blockIntegrityMonitor.getNumBlockFixSimulationFailures());
    assertEquals(0,
        cnode.blockIntegrityMonitor.getNumBlockFixSimulationSuccess());
  } catch (Exception e) {
    LOG.info("Exception ", e);
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test testParityHarBlockFix completed.");
}
Project: hadoop-EAR    File: TestDirectoryRaidDfs.java
public void testTooManyErrorsDecode() throws Exception {
  LOG.info("testTooManyErrorsDecode start");
  long blockSize = 8192L;
  stripeLength = 3;
  mySetup("xor", 1);
  long[][] fsizes = {{2000L, 3000L, 2000L}, 
                     {blockSize + 1, blockSize + 1},
                     {2*blockSize, blockSize + blockSize/2}};
  long[][] bsizes = {{blockSize, blockSize, blockSize},
                     {blockSize, blockSize},
                     {2*blockSize, blockSize}};
  Integer[][] corrupts = {{0, 1}, {0, 2}, {1, 2}};
  try {
    for (String code: RaidDFSUtil.codes) {
      Codec curCodec = Codec.getCodec(code);
      Path srcDir = new Path("/user/dhruba/" + code);
      for (int i = 0; i < corrupts.length; i++) {
        for (int j = 0; j < fsizes.length; j++) {
          long[] crcs = new long[fsizes[j].length];
          int[] seeds = new int[fsizes[j].length];
          Path parityDir = new Path(codec.parityDirectory);
          RaidDFSUtil.cleanUp(fileSys, srcDir);
          RaidDFSUtil.cleanUp(fileSys, parityDir);
          TestRaidDfs.createTestFiles(srcDir, fsizes[j],
              bsizes[j], crcs, seeds, fileSys, (short)1);
          assertTrue(RaidNode.doRaid(conf,
              fileSys.getFileStatus(srcDir),
              new Path(curCodec.parityDirectory), curCodec,
              new RaidNode.Statistics(),
              RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1));
          boolean expectedExceptionThrown = false;
          try {
            corruptBlocksInDirectory(conf, srcDir,
                crcs, corrupts[i], fileSys, dfs, true, false);
            // Should not reach.
          } catch (IOException e) {
            LOG.info("Expected exception caught" + e);
            expectedExceptionThrown = true;
          }
          assertTrue(expectedExceptionThrown);
        }
      }
    }
    LOG.info("testTooManyErrorsDecode complete");
  } finally {
    myTearDown();
  }
}
Project: hadoop-EAR    File: TestDirectoryRaidDfs.java
public void testTooManyErrorsEncode() throws Exception {
  LOG.info("testTooManyErrorsEncode complete");
  stripeLength = 3;
  mySetup("xor", 1);
  // Encoding should fail when even one block is corrupt.
  Random rand = new Random();
  try {
    for (String code: RaidDFSUtil.codes) {
      Codec curCodec = Codec.getCodec(code);
      Path srcDir = new Path("/user/dhruba/" + code);
      for (int j = 0; j < fileSizes.length; j++) {
        long[] crcs = new long[fileSizes[j].length];
        int[] seeds = new int[fileSizes[j].length];
        Path parityDir = new Path(codec.parityDirectory);
        RaidDFSUtil.cleanUp(fileSys, srcDir);
        RaidDFSUtil.cleanUp(fileSys, parityDir);
        TestRaidDfs.createTestFiles(srcDir, fileSizes[j],
            blockSizes[j], crcs, seeds, fileSys, (short)1);
        corruptBlocksInDirectory(conf, srcDir,
            crcs, new Integer[]{rand.nextInt() % 3},
            fileSys, dfs, false, false);
        boolean expectedExceptionThrown = false;
        try {
          RaidNode.doRaid(conf, fileSys.getFileStatus(srcDir),
              new Path(curCodec.parityDirectory), curCodec,
              new RaidNode.Statistics(),
              RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1);
          // Should not reach.
        } catch (IOException e) {
          LOG.info("Expected exception caught" + e);
          expectedExceptionThrown = true;
        }
        assertTrue(expectedExceptionThrown);
      }
    }
    LOG.info("testTooManyErrorsEncode complete");
  } finally {
    myTearDown();
  }
}
Project: hadoop-EAR    File: TestReadConstruction.java
private boolean doThePartialTest(Codec codec,
                              int blockNum,
                              int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192 * 1024L;
  int bufferSize = 4192 * 1024;

  Path srcPath = new Path("/user/dikang/raidtest/file" + 
                          UUID.randomUUID().toString());

  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
      1, blockNum, blockSize);

  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // generate the parity files.
  doRaid(srcPath, codec);

  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc =
      RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, 
          srcPath.toUri().getPath(),
          0, length);
  // corrupt file1

  for (int idx: corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock(), 
                              dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
                       corruptBlockIdxs, blockSize);

  // verify the partial read
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);

  long numRead = 0;
  CRC32 newcrc = new CRC32();

  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();

  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
             " does not match file size " + length);
    return false;
  }

  LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() + ": " + 
              newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
Project: RDFS    File: TestDirectoryBlockFixer.java
/**
 * Corrupt a parity file and wait for it to get fixed.
 */
private void implParityBlockFix(String testName, boolean local)
  throws Exception {
  LOG.info("Test " + testName + " started.");
  int stripeLength = 3;
  mySetup(stripeLength); 
  long[] crcs = new long[3];
  int[] seeds = new int[3];
  Path dirPath = new Path("/user/dhruba/raidtest");
  Path[] files = TestRaidDfs.createTestFiles(dirPath,
      fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
  Path destPath = new Path("/destraid/user/dhruba");
  Path parityFile = new Path("/destraid/user/dhruba/raidtest");
  LOG.info("Test " + testName + " created test files");
  Configuration localConf = this.getRaidNodeConfig(conf, local);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, dirPath, destPath);
    cnode.stop(); cnode.join();

    long parityCRC = RaidDFSUtil.getCRC(fileSys, parityFile);

    FileStatus parityStat = fileSys.getFileStatus(parityFile);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
      dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);

    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
                 0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt parity blocks for different stripes.
    int[] corruptBlockIdxs = new int[]{0, 1, 2};
    for (int idx: corruptBlockIdxs)
      corruptBlock(locs.get(idx).getBlock().getBlockName(), dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs,
        2*blockSize);

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted",
                 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
                 corruptFiles[0], parityFile.toUri().getPath());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test " + testName + " waiting for files to be fixed.");
      Thread.sleep(3000);
    }
    assertEquals("file not fixed",
                 1, cnode.blockIntegrityMonitor.getNumFilesFixed());

    long checkCRC = RaidDFSUtil.getCRC(fileSys, parityFile);

    assertEquals("file not fixed",
                 parityCRC, checkCRC);

  } catch (Exception e) {
    LOG.info("Test " + testName + " Exception " + e +
             StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test " + testName + " completed.");
}
Project: RDFS    File: TestBlockFixer.java
/**
 * Create a file with three stripes, corrupt a block each in two stripes,
 * and wait for the file to be fixed.
 */
private void implBlockFix(boolean local) throws Exception {
  LOG.info("Test testBlockFix started.");
  long blockSize = 8192L;
  int stripeLength = 3;
  mySetup(stripeLength, -1); // never har
  Path file1 = new Path("/user/dhruba/raidtest/file1");
  Path destPath = new Path("/destraid/user/dhruba/raidtest");
  long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                                        1, 7, blockSize);
  long file1Len = fileSys.getFileStatus(file1).getLen();
  LOG.info("Test testBlockFix created test files");

  // create an instance of the RaidNode
  Configuration localConf = new Configuration(conf);
  localConf.setInt("raid.blockfix.interval", 1000);
  if (local) {
    localConf.set("raid.blockfix.classname",
                  "org.apache.hadoop.raid.LocalBlockIntegrityMonitor");
  } else {
    localConf.set("raid.blockfix.classname",
                  "org.apache.hadoop.raid.DistBlockIntegrityMonitor");
  }
  localConf.setLong("raid.blockfix.filespertask", 2L);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
    cnode.stop(); cnode.join();

    FileStatus srcStat = fileSys.getFileStatus(file1);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
      dfs, file1.toUri().getPath(), 0, srcStat.getLen());

    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
                 0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt blocks in two different stripes. We can fix them.
    int[] corruptBlockIdxs = new int[]{0, 4, 6};
    for (int idx: corruptBlockIdxs)
      corruptBlock(locs.get(idx).getBlock().getBlockName(), dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
                 corruptFiles[0], file1.toUri().getPath());
    assertEquals("wrong number of corrupt blocks", 3,
      RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0,
        srcStat.getLen()).size());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test testBlockFix waiting for files to be fixed.");
      Thread.sleep(1000);
    }
    assertEquals("file not fixed", 1, cnode.blockIntegrityMonitor.getNumFilesFixed());

    dfs = getDFS(conf, dfs);
    assertTrue("file not fixed",
               TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));

  } catch (Exception e) {
    LOG.info("Test testBlockFix Exception " + e +
             StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test testBlockFix completed.");
}
Project: RDFS    File: TestBlockFixer.java
public void testMultiplePriorities() throws Exception {
  Path srcFile = new Path("/home/test/file1");
  int repl = 1;
  int numBlocks = 8;
  long blockSize = 16384;
  int stripeLength = 3;
  Path destPath = new Path("/destraidrs");
  mySetup(stripeLength, -1); // never har
  Codec codec = Codec.getCodec("rs");
  LOG.info("Starting testMultiplePriorities");
  try {
    // Create test file and raid it.
    TestRaidDfs.createTestFilePartialLastBlock(
      fileSys, srcFile, repl, numBlocks, blockSize);
    FileStatus stat = fileSys.getFileStatus(srcFile);
    RaidNode.doRaid(conf, stat,
      destPath, codec, new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
      false, repl, repl);

    // Corrupt first block of file.
    int blockIdxToCorrupt = 1;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    LocatedBlocks locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock().getBlockName(),
        dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{1}, blockSize);

    // Create Block Fixer and fix.
    FakeDistBlockIntegrityMonitor distBlockFixer = new FakeDistBlockIntegrityMonitor(conf);
    assertEquals(0, distBlockFixer.submittedJobs.size());

    // One job should be submitted.
    distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
    assertEquals(1, distBlockFixer.submittedJobs.size());

    // No new job should be submitted since we already have one.
    distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
    assertEquals(1, distBlockFixer.submittedJobs.size());

    // Corrupt one more block.
    blockIdxToCorrupt = 4;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock().getBlockName(),
        dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{4}, blockSize);

    // A new job should be submitted since two blocks are corrupt.
    distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
    assertEquals(2, distBlockFixer.submittedJobs.size());
  } finally {
    myTearDown();
  }
}
Project: RDFS    File: TestDirectoryRaidEncoder.java
private void validateSingleFile(String code, FileSystem fileSys, 
    Path sourceDir, int stripeLength, int blockNum, boolean lastPartial)
        throws Exception {
  LOG.info("Test file with " + blockNum + " blocks and " +
        (lastPartial? "partial": "full") + " last block");
  Codec codec = loadTestCodecs(code, stripeLength, true);
  Path parityDir = new Path(codec.parityDirectory);
  RaidDFSUtil.cleanUp(fileSys, sourceDir);
  RaidDFSUtil.cleanUp(fileSys, parityDir);
  fileSys.mkdirs(sourceDir);

  Path file1 = new Path(sourceDir, "file1");
  if (!lastPartial) {
    TestRaidDfs.createTestFile(fileSys, file1, 2, blockNum, 8192L);
  } else {
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, 2,
        blockNum, 8192L);
  }
  Path parityFile = RaidNode.getOriginalParityFile(parityDir, sourceDir);
  // Do directory level raid
  LOG.info("Create a directory-raid parity file " + parityFile);
  assertTrue("Cannot raid directory " + sourceDir, 
      doRaid(conf, fileSys, sourceDir, codec));
  assertEquals("Modification time should be the same", 
      fileSys.getFileStatus(sourceDir).getModificationTime(),
      fileSys.getFileStatus(parityFile).getModificationTime());
  assertEquals("Replica num of source file should be reduced to 1",
      fileSys.getFileStatus(file1).getReplication(), 1);
  assertEquals("Replica num of parity file should be reduced to 1",
      fileSys.getFileStatus(parityFile).getReplication(), 1);
  long dirCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
  long dirLen = fileSys.getFileStatus(parityFile).getLen();
  // remove the parity dir
  RaidDFSUtil.cleanUp(fileSys, parityDir);
  codec = loadTestCodecs(code, stripeLength, false);
  Path parityFile1 = RaidNode.getOriginalParityFile(parityDir,
      file1);
  LOG.info("Create a file-raid parity file " + parityFile1);
  assertTrue("Cannot raid file " + file1, 
      doRaid(conf, fileSys, file1, codec));
  assertTrue("Parity file doesn't match when the file has " + blockNum + 
      " blocks ", 
      TestRaidDfs.validateFile(fileSys, parityFile1, dirLen, dirCRC));
}
Project: RDFS    File: TestDirectoryRaidEncoder.java
public void testOneFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  try {
    for (String code: RaidDFSUtil.codes) {
      LOG.info("testOneFileDirectory: Test code " + code);
      Codec codec = loadTestCodecs(code, stripeLength, true);
      Path sourceDir = new Path("/user/raid", code);
      assertTrue(fileSys.mkdirs(sourceDir));
      Path twoBlockFile = new Path(sourceDir, "twoBlockFile");
      LOG.info("Test one file with 2 blocks");
      TestRaidDfs.createTestFile(fileSys, twoBlockFile, 2, 2, 8192L);
      assertTrue(fileSys.exists(twoBlockFile));
      assertFalse("Not enough blocks in the directory",
          RaidNode.doRaid(conf, fileSys.getFileStatus(sourceDir),
              new Path(codec.parityDirectory), codec,
              new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1));
      fileSys.delete(twoBlockFile, true);

      LOG.info("Test one file with blocks less than one stripe");
      validateSingleFile(code, fileSys, sourceDir, stripeLength, 3,
          false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength, 3,
          true);
      LOG.info("Test one file with one stripe blocks");
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength, true);

      LOG.info("Test one file with more than one stripe blocks");
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength + 2, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength + 2, true);
    }
  } finally {
    myTearDown();
  }
}
Project: RDFS    File: TestDirectoryRaidDfs.java
public void testTooManyErrorsDecode() throws Exception {
  LOG.info("testTooManyErrorsDecode start");
  long blockSize = 8192L;
  stripeLength = 3;
  mySetup("xor", 1);
  long[][] fsizes = {{2000L, 3000L, 2000L}, 
                     {blockSize + 1, blockSize + 1},
                     {2*blockSize, blockSize + blockSize/2}};
  long[][] bsizes = {{blockSize, blockSize, blockSize},
                     {blockSize, blockSize},
                     {2*blockSize, blockSize}};
  Integer[][] corrupts = {{0, 1}, {0, 2}, {1, 2}};
  try {
    for (String code: RaidDFSUtil.codes) {
      Codec curCodec = Codec.getCodec(code);
      Path srcDir = new Path("/user/dhruba/" + code);
      for (int i = 0; i < corrupts.length; i++) {
        for (int j = 0; j < fsizes.length; j++) {
          long[] crcs = new long[fsizes[j].length];
          int[] seeds = new int[fsizes[j].length];
          Path parityDir = new Path(codec.parityDirectory);
          RaidDFSUtil.cleanUp(fileSys, srcDir);
          RaidDFSUtil.cleanUp(fileSys, parityDir);
          TestRaidDfs.createTestFiles(srcDir, fsizes[j],
              bsizes[j], crcs, seeds, fileSys, (short)1);
          assertTrue(RaidNode.doRaid(conf,
              fileSys.getFileStatus(srcDir),
              new Path(curCodec.parityDirectory), curCodec,
              new RaidNode.Statistics(),
              RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1));
          boolean expectedExceptionThrown = false;
          try {
            corruptBlocksInDirectory(conf, srcDir,
                crcs, corrupts[i], fileSys, dfs, true, false);
            // Should not reach.
          } catch (IOException e) {
            LOG.info("Expected exception caught" + e);
            expectedExceptionThrown = true;
          }
          assertTrue(expectedExceptionThrown);
        }
      }
    }
    LOG.info("testTooManyErrorsDecode complete");
  } finally {
    myTearDown();
  }
}
Project: RDFS    File: TestDirectoryRaidDfs.java
public void testTooManyErrorsEncode() throws Exception {
  LOG.info("testTooManyErrorsEncode complete");
  stripeLength = 3;
  mySetup("xor", 1);
  // Encoding should fail when even one block is corrupt.
  Random rand = new Random();
  try {
    for (String code: RaidDFSUtil.codes) {
      Codec curCodec = Codec.getCodec(code);
      Path srcDir = new Path("/user/dhruba/" + code);
      for (int j = 0; j < fileSizes.length; j++) {
        long[] crcs = new long[fileSizes[j].length];
        int[] seeds = new int[fileSizes[j].length];
        Path parityDir = new Path(codec.parityDirectory);
        RaidDFSUtil.cleanUp(fileSys, srcDir);
        RaidDFSUtil.cleanUp(fileSys, parityDir);
        TestRaidDfs.createTestFiles(srcDir, fileSizes[j],
            blockSizes[j], crcs, seeds, fileSys, (short)1);
        corruptBlocksInDirectory(conf, srcDir,
            crcs, new Integer[]{rand.nextInt() % 3},
            fileSys, dfs, false, false);
        boolean expectedExceptionThrown = false;
        try {
          RaidNode.doRaid(conf, fileSys.getFileStatus(srcDir),
              new Path(curCodec.parityDirectory), curCodec,
              new RaidNode.Statistics(),
              RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1);
          // Should not reach.
        } catch (IOException e) {
          LOG.info("Expected exception caught" + e);
          expectedExceptionThrown = true;
        }
        assertTrue(expectedExceptionThrown);
      }
    }
    LOG.info("testTooManyErrorsEncode complete");
  } finally {
    myTearDown();
  }
}
Project: RDFS    File: TestReadConstruction.java
private boolean doThePartialTest(Codec codec,
                              int blockNum,
                              int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192L;
  int bufferSize = 4192;

  Path srcPath = new Path("/user/dikang/raidtest/file" + 
                          UUID.randomUUID().toString());

  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath, 
      1, blockNum, blockSize);

  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // generate the parity files.
  doRaid(srcPath, codec);

  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc =
      RaidDFSUtil.getBlockLocations((DistributedFileSystem)fileSys, 
          srcPath.toUri().getPath(),
          0, length);
  // corrupt file1

  for (int idx: corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock().getBlockName(), 
                              dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
                       corruptBlockIdxs, blockSize);

  // verify the partial read
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);

  long numRead = 0;
  CRC32 newcrc = new CRC32();

  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();

  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
             " does not match file size " + length);
    return false;
  }

  LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() + ": " + 
              newcrc + " vs. " + crc);
    return false;
  }
  return true;
}
Project: mapreduce-fork    File: RaidShell.java
/**
 * checks the raided file system, prints a list of corrupt files to
 * System.out and returns the number of corrupt files
 */
public int fsck(final String path) throws IOException {

  FileSystem fs = (new Path(path)).getFileSystem(conf);

  // if we got a raid fs, get the underlying fs
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem) fs).getFileSystem();
  }

  // check that we have a distributed fs
  if (!(fs instanceof DistributedFileSystem)) {
    throw new IOException("expected DistributedFileSystem but got " +
              fs.getClass().getName());
  }
  final DistributedFileSystem dfs = (DistributedFileSystem) fs;

  // get conf settings
  String xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
  String rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
  if (!xorPrefix.endsWith("/")) {
    xorPrefix = xorPrefix + "/";
  }
  if (!rsPrefix.endsWith("/")) {
    rsPrefix = rsPrefix + "/";
  }
  LOG.debug("prefixes: " + xorPrefix + ", " + rsPrefix);

  // get a list of corrupted files (not considering parity blocks just yet)
  // from the name node
  // these are the only files we need to consider:
  // if a file has no corrupted data blocks, it is OK even if some
  // of its parity blocks are corrupted, so no further checking is
  // necessary
  final String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  final List<Path> corruptFileCandidates = new LinkedList<Path>();
  for (final String f: files) {
    final Path p = new Path(f);
    // if this file is a parity file
    // or if it does not start with the specified path,
    // ignore it
    if (!p.toString().startsWith(xorPrefix) &&
        !p.toString().startsWith(rsPrefix) &&
        p.toString().startsWith(path)) {
      corruptFileCandidates.add(p);
    }
  }
  // filter files marked for deletion
  RaidUtils.filterTrash(conf, corruptFileCandidates);

  int numberOfCorruptFiles = 0;

  for (final Path corruptFileCandidate: corruptFileCandidates) {
    if (isFileCorrupt(dfs, corruptFileCandidate)) {
      System.out.println(corruptFileCandidate.toString());
      numberOfCorruptFiles++;
    }
  }

  return numberOfCorruptFiles;
}
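For context, a small driver for the fsck method above might look like the following. This is a sketch, not code from the project: it assumes RaidShell exposes a Configuration-based constructor, and "/user/dhruba" is a placeholder path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.raid.RaidShell;

public class RaidFsckDriver {
  public static void main(String[] args) throws Exception {
    // Assumption: RaidShell(Configuration) is available in this fork.
    Configuration conf = new Configuration();
    RaidShell shell = new RaidShell(conf);
    // fsck() prints each corrupt raided file to System.out and returns the count.
    int corruptCount = shell.fsck("/user/dhruba");
    System.out.println(corruptCount + " corrupt file(s) under /user/dhruba");
  }
}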
Project: mapreduce-fork    File: BlockFixer.java
/**
 * Reads through a corrupt source file fixing corrupt blocks on the way.
 * @param srcPath Path identifying the corrupt file.
 * @throws IOException
 * @return true if file has been fixed, false if no fixing 
 * was necessary or possible.
 */
boolean processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair,
                           Decoder decoder, Progressable progress)
  throws IOException {
  LOG.info("Processing corrupt file " + srcPath);

  DistributedFileSystem srcFs = getDFS(srcPath);
  FileStatus srcStat = srcFs.getFileStatus(srcPath);
  long blockSize = srcStat.getBlockSize();
  long srcFileSize = srcStat.getLen();
  String uriPath = srcPath.toUri().getPath();

  int numBlocksFixed = 0;
  List<LocatedBlock> corrupt =
    RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize);
  if (corrupt.size() == 0) {
    return false;
  }
  for (LocatedBlock lb: corrupt) {
    ExtendedBlock corruptBlock = lb.getBlock();
    long corruptOffset = lb.getStartOffset();

    LOG.info("Found corrupt block " + corruptBlock +
             ", offset " + corruptOffset);

    final long blockContentsSize =
      Math.min(blockSize, srcFileSize - corruptOffset);
    File localBlockFile =
      File.createTempFile(corruptBlock.getBlockName(), ".tmp");
    localBlockFile.deleteOnExit();

    try {
      decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
                                 parityPair.getPath(), blockSize,
                                 corruptOffset, localBlockFile,
                                 blockContentsSize);

      // We have the contents of the block, send them.
      DatanodeInfo datanode = chooseDatanode(lb.getLocations());
      computeMetadataAndSendFixedBlock(datanode, localBlockFile,
                                      lb, blockContentsSize);
      numBlocksFixed++;
    } finally {
      localBlockFile.delete();
    }
    progress.progress();
  }
  LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath);
  return true;
}
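The heart of the loop above is RaidDFSUtil.corruptBlocksInFile. Below is a stripped-down, hedged sketch of that call in isolation, with the class wrapper and imports omitted to match the surrounding snippets; `fs` and `src` are assumed to refer to an existing DistributedFileSystem and file, and the helper name is illustrative.

// Hedged sketch: list the corrupt blocks of a single file without fixing anything.
static void listCorruptBlocks(DistributedFileSystem fs, Path src) throws IOException {
  FileStatus stat = fs.getFileStatus(src);
  List<LocatedBlock> corrupt =
    RaidDFSUtil.corruptBlocksInFile(fs, src.toUri().getPath(), 0, stat.getLen());
  for (LocatedBlock lb : corrupt) {
    System.out.println("Corrupt block " + lb.getBlock() +
                       " at offset " + lb.getStartOffset());
  }
}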
Project: mapreduce-fork    File: BlockFixer.java
/**
 * Fixes corrupt blocks in a parity file.
 * This function uses the corresponding source file to regenerate parity
 * file blocks.
 * @return true if file has been fixed, false if no fixing 
 * was necessary or possible.
 */
boolean processCorruptParityFile(Path parityPath, Encoder encoder,
                                 Progressable progress)
  throws IOException {
  LOG.info("Processing corrupt file " + parityPath);
  Path srcPath = sourcePathFromParityPath(parityPath);
  if (srcPath == null) {
    LOG.warn("Unusable parity file " + parityPath);
    return false;
  }

  DistributedFileSystem parityFs = getDFS(parityPath);
  FileStatus parityStat = parityFs.getFileStatus(parityPath);
  long blockSize = parityStat.getBlockSize();
  long parityFileSize = parityStat.getLen();
  FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath);
  long srcFileSize = srcStat.getLen();

  // Check timestamp.
  if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
    LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath +
             ", moving on...");
    return false;
  }

  String uriPath = parityPath.toUri().getPath();
  int numBlocksFixed = 0;
  List<LocatedBlock> corrupt =
    RaidDFSUtil.corruptBlocksInFile(parityFs, uriPath, 0, parityFileSize);
  if (corrupt.size() == 0) {
    return false;
  }
  for (LocatedBlock lb: corrupt) {
    ExtendedBlock corruptBlock = lb.getBlock();
    long corruptOffset = lb.getStartOffset();

    LOG.info("Found corrupt block " + corruptBlock +
             ", offset " + corruptOffset);

    File localBlockFile =
      File.createTempFile(corruptBlock.getBlockName(), ".tmp");
    localBlockFile.deleteOnExit();

    try {
      encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize,
                                       blockSize, parityPath,
                                       corruptOffset, localBlockFile);
      // We have the contents of the block, send them.
      DatanodeInfo datanode = chooseDatanode(lb.getLocations());
      computeMetadataAndSendFixedBlock(datanode, localBlockFile, lb,
                                      blockSize);

      numBlocksFixed++;
    } finally {
      localBlockFile.delete();
    }
    progress.progress();
  }
  LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath);
  return true;
}
Project: mapreduce-fork    File: BlockFixer.java
/**
 * Reads through a parity HAR part file, fixing corrupt blocks on the way.
 * A HAR block can contain many file blocks, as long as the HAR part file
 * block size is a multiple of the file block size.
 * @return true if file has been fixed, false if no fixing 
 * was necessary or possible.
 */
boolean processCorruptParityHarPartFile(Path partFile,
                                        Progressable progress)
  throws IOException {
  LOG.info("Processing corrupt file " + partFile);
  // Get some basic information.
  DistributedFileSystem dfs = getDFS(partFile);
  FileStatus partFileStat = dfs.getFileStatus(partFile);
  long partFileSize = partFileStat.getLen();
  long partFileBlockSize = partFileStat.getBlockSize();
  LOG.info(partFile + " has block size " + partFileBlockSize);

  // Find the path to the index file.
  // Parity file HARs are only one level deep, so the index file is at the
  // same level as the part file.
  String harDirectory = partFile.toUri().getPath(); // Full part-file path for now; trimmed to its parent directory below.
  harDirectory =
    harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
  Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
  FileStatus indexStat = dfs.getFileStatus(indexFile);
  // Parse the HAR index file.
  HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen());

  String uriPath = partFile.toUri().getPath();
  int numBlocksFixed = 0;
  List<LocatedBlock> corrupt =
    RaidDFSUtil.corruptBlocksInFile(dfs, uriPath, 0, partFileSize);
  if (corrupt.size() == 0) {
    return false;
  }
  for (LocatedBlock lb: corrupt) {
    ExtendedBlock corruptBlock = lb.getBlock();
    long corruptOffset = lb.getStartOffset();

    File localBlockFile =
      File.createTempFile(corruptBlock.getBlockName(), ".tmp");
    localBlockFile.deleteOnExit();
    processCorruptParityHarPartBlock(dfs, partFile, corruptBlock,
                                     corruptOffset, partFileStat, harIndex,
                                     localBlockFile, progress);
    // Now that the part file block has been recovered locally, send it.
    try {
      DatanodeInfo datanode = chooseDatanode(lb.getLocations());
      computeMetadataAndSendFixedBlock(datanode, localBlockFile,
                                      lb, localBlockFile.length());
      numBlocksFixed++;
    } finally {
      localBlockFile.delete();
    }
    progress.progress();
  }
  LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile);
  return true;
}
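The index lookup above can be pulled out into a small helper. This is a hedged sketch under the same assumption the snippet states (parity HARs are only one level deep, so the index file sits next to the part file); the helper name is illustrative and the class wrapper and imports are omitted to match the surrounding snippets.

// Hedged sketch: open the HAR index that sits alongside a parity HAR part file.
static HarIndex openHarIndex(DistributedFileSystem dfs, Path partFile)
    throws IOException {
  String harDirectory = partFile.toUri().getPath();
  harDirectory =
    harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
  Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
  FileStatus indexStat = dfs.getFileStatus(indexFile);
  return new HarIndex(dfs.open(indexFile), indexStat.getLen());
}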
Project: mapreduce-fork    File: TestBlockFixer.java
/**
 * Create a file with three stripes, corrupt one block in each stripe,
 * and wait for the file to be fixed.
 */
protected void implBlockFix(boolean local) throws Exception {
  LOG.info("Test testBlockFix started.");
  long blockSize = 8192L;
  int stripeLength = 3;
  mySetup(stripeLength, -1); // never har
  Path file1 = new Path("/user/dhruba/raidtest/file1");
  Path destPath = new Path("/destraid/user/dhruba/raidtest");
  long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                                        1, 7, blockSize);
  long file1Len = fileSys.getFileStatus(file1).getLen();
  LOG.info("Test testBlockFix created test files");

  // create an instance of the RaidNode
  Configuration localConf = new Configuration(conf);
  localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
  localConf.setInt("raid.blockfix.interval", 1000);
  if (local) {
    localConf.set("raid.blockfix.classname",
                  "org.apache.hadoop.raid.LocalBlockFixer");
  } else {
    localConf.set("raid.blockfix.classname",
                  "org.apache.hadoop.raid.DistBlockFixer");
  }
  localConf.setLong("raid.blockfix.filespertask", 2L);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
    cnode.stop(); cnode.join();

    FileStatus srcStat = fileSys.getFileStatus(file1);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
      dfs, file1.toUri().getPath(), 0, srcStat.getLen());

    String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
                 0, cnode.blockFixer.filesFixed());

    // Corrupt one block in each of the three stripes. All of them can be fixed.
    int[] corruptBlockIdxs = new int[]{0, 4, 6};
    for (int idx: corruptBlockIdxs)
      corruptBlock(locs.get(idx).getBlock());
    reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);

    corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
                 corruptFiles[0], file1.toUri().getPath());
    assertEquals("wrong number of corrupt blocks", 3,
      RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0,
        srcStat.getLen()).size());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockFixer.filesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test testBlockFix waiting for files to be fixed.");
      Thread.sleep(1000);
    }
    assertEquals("file not fixed", 1, cnode.blockFixer.filesFixed());

    dfs = getDFS(conf, dfs);
    assertTrue("file not fixed",
               TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));

  } catch (Exception e) {
    LOG.info("Test testBlockFix Exception " + e +
             StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test testBlockFix completed.");
}
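The test's wait-and-poll pattern around RaidDFSUtil.getCorruptFiles can be factored into a small helper. This is a hedged sketch only; the method name and timeout handling are illustrative and not part of the original test.

// Hedged sketch: poll the NameNode until it reports no corrupt files or the
// timeout expires; returns true if the file system came back clean in time.
static boolean waitForNoCorruptFiles(DistributedFileSystem dfs, long timeoutMs)
    throws IOException, InterruptedException {
  long start = System.currentTimeMillis();
  while (System.currentTimeMillis() - start < timeoutMs) {
    if (RaidDFSUtil.getCorruptFiles(dfs).length == 0) {
      return true;
    }
    Thread.sleep(1000);
  }
  return false;
}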
Project: mapreduce-fork    File: TestReedSolomonDecoder.java
public void testDecoder() throws Exception {
  mySetup();
  int stripeSize = 10;
  int paritySize = 4;
  long blockSize = 8192;
  Path file1 = new Path("/user/raidtest/file1");
  Path recoveredFile1 = new Path("/user/raidtest/file1.recovered");
  Path parityFile1 = new Path("/rsraid/user/raidtest/file1");
  long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                                        1, 25, blockSize);
  FileStatus file1Stat = fileSys.getFileStatus(file1);

  conf.setInt("raid.rsdecoder.bufsize", 512);
  conf.setInt("raid.rsencoder.bufsize", 512);

  try {
    // First encode the file.
    ReedSolomonEncoder encoder = new ReedSolomonEncoder(
      conf, stripeSize, paritySize);
    short parityRepl = 1;
    encoder.encodeFile(fileSys, file1, fileSys, parityFile1, parityRepl,
      Reporter.NULL);

    // Ensure there are no corrupt files yet.
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals(0, corruptFiles.length);

    // Now corrupt the file.
    long corruptOffset = blockSize * 5;
    FileStatus srcStat = fileSys.getFileStatus(file1);
    LocatedBlocks locations = RaidDFSUtil.getBlockLocations(dfs,
        file1.toUri().getPath(), 0, srcStat.getLen());
    corruptBlock(locations.get(5).getBlock());
    corruptBlock(locations.get(6).getBlock());
    TestBlockFixer.reportCorruptBlocks(dfs, file1, new int[]{5, 6},
        srcStat.getBlockSize());

    // Ensure file is corrupted.
    corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals(1, corruptFiles.length);
    assertEquals(file1.toString(), corruptFiles[0]);

    // Fix the file.
    ReedSolomonDecoder decoder = new ReedSolomonDecoder(
      conf, stripeSize, paritySize);
    decoder.decodeFile(fileSys, file1, fileSys, parityFile1,
              corruptOffset, recoveredFile1);
    assertTrue(TestRaidDfs.validateFile(
                  fileSys, recoveredFile1, file1Stat.getLen(), crc1));
  } finally {
    myTearDown();
  }
}