public void testSmallFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  long blockSize = 8192L;
  try {
    for (String code : RaidDFSUtil.codes) {
      LOG.info("testSmallFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{1000L, 4000L, 1000L}, blockSize, 4096L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{2000L, 3000L, 2000L, 3000L}, blockSize, 3072L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{3000L, 3000L, 3000L, 3000L}, blockSize, 3072L);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[]{511L, 3584L, 3000L, 1234L, 512L, 1234L, 3000L,
                     3234L, 511L}, blockSize, 3584L);
    }
  } finally {
    myTearDown();
  }
}
public void testDifferentBlockSizeFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 3;
  long blockSize = 8192L;
  try {
    for (String code : RaidDFSUtil.codes) {
      LOG.info("testDifferentBlockSizeFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {1000, blockSize, 2*blockSize, 2*blockSize + 1},
          new long[] {blockSize, blockSize, 2*blockSize, blockSize},
          2*blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize, 2*blockSize, 3*blockSize, 4*blockSize},
          new long[] {blockSize, 2*blockSize, 3*blockSize, blockSize},
          3*blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize+1, 9*blockSize+1, 2*blockSize+1, blockSize+1},
          new long[] {blockSize, 2*blockSize, 3*blockSize, blockSize},
          2*blockSize + 512);
    }
  } finally {
    myTearDown();
  }
}
/**
 * Gets a list of corrupt files from the namenode and filters out files
 * that are currently being fixed or that were recently fixed.
 */
private List<Path> getCorruptFiles() throws IOException {
  DistributedFileSystem dfs = (DistributedFileSystem)
      (new Path("/")).getFileSystem(getConf());
  String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  List<Path> corruptFiles = new LinkedList<Path>();
  for (String f : files) {
    Path p = new Path(f);
    // Filter out files that are being fixed or that were recently fixed.
    if (!fileIndex.containsKey(p.toString())) {
      corruptFiles.add(p);
    }
  }
  RaidUtils.filterTrash(getConf(), corruptFiles);
  return corruptFiles;
}
private void waitForCorruptBlocks(int numCorruptBlocks,
    DistributedFileSystem dfs, Path file) throws Exception {
  String path = file.toUri().getPath();
  FileStatus stat = dfs.getFileStatus(file);
  long start = System.currentTimeMillis();
  long actual = 0;
  do {
    actual = RaidDFSUtil.corruptBlocksInFile(
        dfs, path, 0, stat.getLen()).size();
    if (actual == numCorruptBlocks) break;
    if (System.currentTimeMillis() - start > 120000) break;
    LOG.info("Waiting for " + numCorruptBlocks + " corrupt blocks in " +
        path + ", found " + actual);
    Thread.sleep(1000);
  } while (true);
  assertEquals(numCorruptBlocks, actual);
}
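// waitForCorruptBlocks above and waitUntilCorruptFileCount below follow the
// same poll-until-deadline pattern. A minimal sketch of that pattern as a
// reusable helper (illustrative only; no such helper exists in these tests):
private static boolean waitFor(java.util.concurrent.Callable<Boolean> condition,
    long timeoutMillis, long pollMillis) throws Exception {
  long deadline = System.currentTimeMillis() + timeoutMillis;
  while (System.currentTimeMillis() < deadline) {
    if (condition.call()) {
      return true; // condition met before the deadline
    }
    Thread.sleep(pollMillis);
  }
  return condition.call(); // one final check at the deadline
}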
/**
 * Sleeps for up to 20s until the number of corrupt files
 * in the file system is equal to the number specified.
 */
private void waitUntilCorruptFileCount(DistributedFileSystem dfs,
    int corruptFiles) throws IOException {
  long waitStart = System.currentTimeMillis();
  while (RaidDFSUtil.getCorruptFiles(dfs).length != corruptFiles) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ignore) {
    }
    if (System.currentTimeMillis() > waitStart + 20000L) {
      break;
    }
  }
  int corruptFilesFound = RaidDFSUtil.getCorruptFiles(dfs).length;
  if (corruptFilesFound != corruptFiles) {
    throw new IOException("expected " + corruptFiles +
        " corrupt files but got " + corruptFilesFound);
  }
}
/**
 * Corrupt a block in a raided file, and make sure it shows up in the
 * Raid missing blocks queue.
 */
@Test
public void testCorruptBlocks() throws IOException {
  MiniDFSCluster cluster = null;
  Configuration conf = new Configuration();
  try {
    cluster = new MiniDFSCluster(conf, 3, true, null);
    DistributedFileSystem dfs = DFSUtil.convertToDFS(cluster.getFileSystem());
    String filePath = "/test/file1";
    RaidDFSUtil.constructFakeRaidFile(dfs, filePath, RaidCodec.getCodec("rs"));
    FileStatus stat = dfs.getFileStatus(new Path(filePath));
    LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(
        filePath, 0, stat.getLen());
    Block block = blocks.getLocatedBlocks().get(0).getBlock();
    DFSTestUtil.corruptBlock(block, cluster);
    RaidDFSUtil.reportCorruptBlocksToNN(dfs,
        new LocatedBlock[] {blocks.getLocatedBlocks().get(0)});
    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    // One raid missing block, zero non-raid missing blocks.
    assertEquals(1, namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
public void testIdenticalBlockSizeFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  long blockSize = 8192L;
  try {
    for (String code : RaidDFSUtil.codes) {
      LOG.info("testIdenticalBlockSizeFileDirectory: Test code " + code);
      Path sourceDir = new Path("/user/raid");
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {1000L, blockSize, 2*blockSize, 4000L},
          blockSize, blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize, 2*blockSize, 3*blockSize, 4*blockSize},
          blockSize, blockSize);
      int halfBlock = (int)blockSize / 2;
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize + halfBlock, 2*blockSize + halfBlock,
                      3*blockSize + halfBlock, 4*blockSize + halfBlock},
          blockSize, blockSize);
      validateMultipleFiles(code, fileSys, sourceDir, stripeLength,
          new long[] {blockSize+1, 9*blockSize+1, 2*blockSize+1,
                      3*blockSize+1},
          blockSize, blockSize);
    }
  } finally {
    myTearDown();
  }
}
public void testVerifySourceFile() throws Exception {
  mySetup();
  try {
    Path srcPath = new Path("/user/dikang/raidtest/file0");
    int numBlocks = 8;
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
        1, numBlocks, 8192L);
    assertTrue(fileSys.exists(srcPath));
    Codec codec = Codec.getCodec("rs");
    FileStatus stat = fileSys.getFileStatus(srcPath);

    // Verify a good file.
    assertTrue(FastFileCheck.checkFile(conf,
        (DistributedFileSystem)fileSys, fileSys, srcPath,
        null, codec, RaidUtils.NULL_PROGRESSABLE, true));

    // Verify a bad file: corrupt a random block first.
    LocatedBlocks fileLoc = RaidDFSUtil.getBlockLocations(
        (DistributedFileSystem)fileSys,
        srcPath.toUri().getPath(), 0, stat.getLen());
    Random rand = new Random();
    int idx = rand.nextInt(numBlocks);
    TestRaidDfs.corruptBlock(srcPath,
        fileLoc.getLocatedBlocks().get(idx).getBlock(),
        NUM_DATANODES, true, dfs);
    assertFalse(FastFileCheck.checkFile(conf,
        (DistributedFileSystem)fileSys, fileSys, srcPath,
        null, codec, RaidUtils.NULL_PROGRESSABLE, true));
  } finally {
    stopCluster();
  }
}
/**
 * Returns the corrupt blocks in a file.
 */
List<LocatedBlock> corruptBlocksInFile(DistributedFileSystem fs,
    String uriPath, FileStatus stat) throws IOException {
  List<LocatedBlock> corrupt = new LinkedList<LocatedBlock>();
  LocatedBlocks locatedBlocks = RaidDFSUtil.getBlockLocations(
      fs, uriPath, 0, stat.getLen());
  for (LocatedBlock b : locatedBlocks.getLocatedBlocks()) {
    // A block counts as corrupt if the namenode marks it corrupt, or if it
    // holds data but no replica locations remain.
    if (b.isCorrupt() ||
        (b.getLocations().length == 0 && b.getBlockSize() > 0)) {
      corrupt.add(b);
    }
  }
  return corrupt;
}
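// Hypothetical call site for corruptBlocksInFile(), shown only to illustrate
// how the helper is used; logCorruptBlocks does not exist in the original
// code.
void logCorruptBlocks(DistributedFileSystem fs, Path file) throws IOException {
  FileStatus stat = fs.getFileStatus(file);
  List<LocatedBlock> corrupt =
      corruptBlocksInFile(fs, file.toUri().getPath(), stat);
  for (LocatedBlock lb : corrupt) {
    LOG.info("Corrupt block " + lb.getBlock() +
        " at offset " + lb.getStartOffset() + " in " + file);
  }
}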
/**
 * @return a list of corrupt files as obtained from the namenode
 */
List<Path> getCorruptFiles() throws IOException {
  DistributedFileSystem dfs = helper.getDFS(new Path("/"));
  String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  List<Path> corruptFiles = new LinkedList<Path>();
  for (String f : files) {
    Path p = new Path(f);
    // Skip files whose fix is already recorded in the history map.
    if (!history.containsKey(p.toString())) {
      corruptFiles.add(p);
    }
  }
  RaidUtils.filterTrash(getConf(), corruptFiles);
  return corruptFiles;
}
/**
 * Take down a datanode to generate raid missing blocks; bringing it back
 * up should restore the missing blocks.
 */
@Test
public void testRaidMissingBlocksByTakingDownDataNode()
    throws IOException, InterruptedException {
  MiniDFSCluster cluster = null;
  Configuration conf = new Configuration();
  try {
    cluster = new MiniDFSCluster(conf, 1, true, null);
    final FSNamesystem namesystem = cluster.getNameNode().namesystem;
    final DistributedFileSystem dfs =
        DFSUtil.convertToDFS(cluster.getFileSystem());
    String filePath = "/test/file1";
    RaidCodec rsCodec = RaidCodec.getCodec("rs");
    RaidDFSUtil.constructFakeRaidFile(dfs, filePath, rsCodec);

    DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
        namesystem.heartbeats.toArray(new DatanodeDescriptor[1]);
    assertEquals(1, datanodes.length);

    // Shut down the only datanode; all stripe blocks go missing.
    DataNodeProperties dnprop = shutdownDataNode(cluster, datanodes[0]);
    assertEquals(rsCodec.numStripeBlocks,
        namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing blocks
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());

    // Bring up the datanode and wait for its block report.
    cluster.restartDataNode(dnprop);
    LOG.info("wait for its block report to come in");
    NumberReplicas num;
    FileStatus stat = dfs.getFileStatus(new Path(filePath));
    LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(
        filePath, 0, stat.getLen());
    long startTime = System.currentTimeMillis();
    do {
      Thread.sleep(1000);
      int totalCount = 0;
      namesystem.readLock();
      try {
        for (LocatedBlock block : blocks.getLocatedBlocks()) {
          num = namesystem.countNodes(block.getBlock());
          totalCount += num.liveReplicas();
        }
        if (totalCount == rsCodec.numDataBlocks) {
          break;
        } else {
          LOG.info("wait for block report, received total replicas: " +
              totalCount);
        }
      } finally {
        namesystem.readUnlock();
      }
    } while (System.currentTimeMillis() - startTime < 30000);
    assertEquals(0, namesystem.getRaidMissingBlocksCount());
    assertEquals(0, namesystem.getMissingBlocksCount()); // zero non-raid missing blocks
    assertEquals(0, namesystem.getNonCorruptUnderReplicatedBlocks());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Corrupt a parity file and wait for it to get fixed.
 */
private void implParityBlockFix(String testName, boolean local)
    throws Exception {
  LOG.info("Test " + testName + " started.");
  int stripeLength = 3;
  mySetup(stripeLength);
  long[] crcs = new long[3];
  int[] seeds = new int[3];
  Path dirPath = new Path("/user/dhruba/raidtest");
  Path[] files = TestRaidDfs.createTestFiles(dirPath,
      fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
  Path destPath = new Path("/destraid/user/dhruba");
  Path parityFile = new Path("/destraid/user/dhruba/raidtest");
  LOG.info("Test " + testName + " created test files");
  Configuration localConf = this.getRaidNodeConfig(conf, local);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, dirPath, destPath);
    cnode.stop();
    cnode.join();

    long parityCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
    FileStatus parityStat = fileSys.getFileStatus(parityFile);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
        dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
        0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt parity blocks for different stripes.
    int[] corruptBlockIdxs = new int[]{0, 1, 2};
    for (int idx : corruptBlockIdxs) {
      corruptBlock(locs.get(idx).getBlock(), dfsCluster);
    }
    RaidDFSUtil.reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs,
        2 * blockSize);

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
        corruptFiles[0], parityFile.toUri().getPath());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test " + testName + " waiting for files to be fixed.");
      Thread.sleep(3000);
    }
    TestBlockFixer.verifyMetrics(fileSys, cnode, local, 1L,
        corruptBlockIdxs.length);

    long checkCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
    assertEquals("file not fixed", parityCRC, checkCRC);
  } catch (Exception e) {
    LOG.info("Test " + testName + " Exception " + e +
        StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test " + testName + " completed.");
}
public void testMultiplePriorities() throws Exception {
  LOG.info("Test testMultiplePriorities started.");
  Path srcFile = new Path("/home/test/file1");
  int repl = 1;
  int numBlocks = 8;
  long blockSize = 16384;
  int stripeLength = 3;
  Path destPath = new Path("/destraidrs");
  mySetup(stripeLength, -1); // never har
  Codec codec = Codec.getCodec("rs");
  LOG.info("Starting testMultiplePriorities");
  try {
    // Create a test file and raid it.
    TestRaidDfs.createTestFilePartialLastBlock(
        fileSys, srcFile, repl, numBlocks, blockSize);
    FileStatus stat = fileSys.getFileStatus(srcFile);
    RaidNode.doRaid(conf, stat, destPath, codec, new RaidNode.Statistics(),
        RaidUtils.NULL_PROGRESSABLE, false, repl, repl);

    // Corrupt one block of the file.
    int blockIdxToCorrupt = 1;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    LocatedBlocks locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock(), dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{1}, blockSize);

    // Create the block fixer and fix.
    FakeDistBlockIntegrityMonitor distBlockFixer =
        new FakeDistBlockIntegrityMonitor(conf);
    assertEquals(0, distBlockFixer.submittedJobs.size());

    // Wait for a job to be submitted.
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < 120000 &&
           distBlockFixer.submittedJobs.size() == 0) {
      distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
      LOG.info("Waiting for jobs to submit");
      Thread.sleep(10000);
    }
    int submittedJob = distBlockFixer.submittedJobs.size();
    LOG.info("Already submitted " + submittedJob + " jobs");
    assertTrue("Should submit at least one job", submittedJob >= 1);

    // Corrupt one more block.
    blockIdxToCorrupt = 4;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock(), dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{4}, blockSize);

    // A new job should be submitted since two blocks are corrupt.
    startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < 120000 &&
           distBlockFixer.submittedJobs.size() == submittedJob) {
      distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
      LOG.info("Waiting for more jobs to submit");
      Thread.sleep(10000);
    }
    LOG.info("Already submitted " + distBlockFixer.submittedJobs.size() +
        " jobs");
    assertTrue("Should submit at least one more job",
        distBlockFixer.submittedJobs.size() - submittedJob >= 1);
  } finally {
    myTearDown();
  }
}
private void validateSingleFile(String code, FileSystem fileSys,
    Path sourceDir, int stripeLength, int blockNum, boolean lastPartial)
    throws Exception {
  LOG.info("Test file with " + blockNum + " blocks and " +
      (lastPartial ? "partial" : "full") + " last block");
  Codec codec = loadTestCodecs(code, stripeLength, true);
  Path parityDir = new Path(codec.parityDirectory);
  RaidDFSUtil.cleanUp(fileSys, sourceDir);
  RaidDFSUtil.cleanUp(fileSys, parityDir);
  fileSys.mkdirs(sourceDir);

  Path file1 = new Path(sourceDir, "file1");
  if (!lastPartial) {
    TestRaidDfs.createTestFile(fileSys, file1, 2, blockNum, 8192L);
  } else {
    TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, 2,
        blockNum, 8192L);
  }
  Path parityFile = RaidNode.getOriginalParityFile(parityDir, sourceDir);

  // Do directory-level raid.
  LOG.info("Create a directory-raid parity file " + parityFile);
  assertTrue("Cannot raid directory " + sourceDir,
      doRaid(conf, fileSys, sourceDir, codec));
  assertEquals("Modification time should be the same",
      fileSys.getFileStatus(sourceDir).getModificationTime(),
      fileSys.getFileStatus(parityFile).getModificationTime());
  assertEquals("Replication of source file should be reduced to 1",
      1, fileSys.getFileStatus(file1).getReplication());
  assertEquals("Replication of parity file should be reduced to 1",
      1, fileSys.getFileStatus(parityFile).getReplication());
  long dirCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
  long dirLen = fileSys.getFileStatus(parityFile).getLen();

  // Remove the parity dir and raid the single file instead.
  RaidDFSUtil.cleanUp(fileSys, parityDir);
  codec = loadTestCodecs(code, stripeLength, false);
  Path parityFile1 = RaidNode.getOriginalParityFile(parityDir, file1);
  LOG.info("Create a file-raid parity file " + parityFile1);
  assertTrue("Cannot raid file " + file1,
      doRaid(conf, fileSys, file1, codec));
  assertTrue("Parity file doesn't match when the file has " + blockNum +
      " blocks",
      TestRaidDfs.validateFile(fileSys, parityFile1, dirLen, dirCRC));
}
public void testOneFileDirectory() throws Exception {
  mySetup();
  int stripeLength = 4;
  try {
    for (String code : RaidDFSUtil.codes) {
      LOG.info("testOneFileDirectory: Test code " + code);
      Codec codec = loadTestCodecs(code, stripeLength, true);
      Path sourceDir = new Path("/user/raid", code);
      assertTrue(fileSys.mkdirs(sourceDir));
      Path twoBlockFile = new Path(sourceDir, "twoBlockFile");

      LOG.info("Test one file with 2 blocks");
      TestRaidDfs.createTestFile(fileSys, twoBlockFile, 2, 2, 8192L);
      assertTrue(fileSys.exists(twoBlockFile));
      assertFalse("Not enough blocks in the directory",
          RaidNode.doRaid(conf, fileSys.getFileStatus(sourceDir),
              new Path(codec.parityDirectory), codec,
              new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1));
      fileSys.delete(twoBlockFile, true);

      LOG.info("Test one file with blocks less than one stripe");
      validateSingleFile(code, fileSys, sourceDir, stripeLength, 3, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength, 3, true);
      LOG.info("Test one file with one stripe blocks");
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength, true);
      LOG.info("Test one file with more than one stripe blocks");
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength + 2, false);
      validateSingleFile(code, fileSys, sourceDir, stripeLength,
          stripeLength + 2, true);
    }
  } finally {
    myTearDown();
  }
}
public void verifyDecoder(String code, int parallelism) throws Exception {
  Codec codec = Codec.getCodec(code);
  conf.setInt("raid.encoder.parallelism", parallelism);
  ConfigBuilder cb = new ConfigBuilder(CONFIG_FILE);
  cb.addPolicy("RaidTest1",
      "/user/dikang/raidtest/file" + code + parallelism, 1, 1, code);
  cb.persist();
  Path srcPath = new Path("/user/dikang/raidtest/file" + code +
      parallelism + "/file1");
  long blockSize = 8192 * 1024L;
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
      1, 7, blockSize);
  doRaid(srcPath, codec);
  FileStatus srcStat = fileSys.getFileStatus(srcPath);
  ParityFilePair pair = ParityFilePair.getParityFile(codec, srcStat, conf);
  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc = RaidDFSUtil.getBlockLocations(
      (DistributedFileSystem)fileSys, srcPath.toUri().getPath(), 0, length);

  // Corrupt the file.
  int[] corruptBlockIdxs = new int[] {5};
  long errorOffset = 5 * blockSize;
  for (int idx : corruptBlockIdxs) {
    TestBlockFixer.corruptBlock(file1Loc.get(idx).getBlock(), dfsCluster);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
      corruptBlockIdxs, blockSize);

  // Fix the block once in simulation mode and once for real, then compare
  // the CRCs of the two reconstructions.
  Decoder decoder = new Decoder(conf, codec);
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  decoder.codec.simulateBlockFix = true;
  CRC32 oldCRC = decoder.fixErasedBlock(fileSys, srcStat, fileSys,
      pair.getPath(), true, blockSize, errorOffset, blockSize, false,
      out, null, null, false);
  decoder.codec.simulateBlockFix = false;
  out = new ByteArrayOutputStream();
  decoder.fixErasedBlock(fileSys, srcStat, fileSys, pair.getPath(), true,
      blockSize, errorOffset, blockSize, false, out, null, null, false);

  // Calculate the new CRC.
  CRC32 newCRC = new CRC32();
  byte[] constructedBytes = out.toByteArray();
  newCRC.update(constructedBytes);
  assertEquals(oldCRC.getValue(), newCRC.getValue());
}
public void testParityHarBadBlockFixer() throws Exception {
  LOG.info("Test testParityHarBlockFix started.");
  long blockSize = 8192L;
  int stripeLength = 3;
  mySetup(stripeLength, -1, "org.apache.hadoop.raid.BadXORCode",
      "org.apache.hadoop.raid.BadReedSolomonCode", "rs", true);
  Path file1 = new Path("/user/dhruba/raidtest/file1");
  // Parity file will have 7 blocks.
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
      1, 20, blockSize);
  LOG.info("Created test files");

  // Create an instance of the RaidNode.
  Configuration localConf = new Configuration(conf);
  localConf.setInt(RaidNode.RAID_PARITY_HAR_THRESHOLD_DAYS_KEY, 0);
  localConf.set("raid.blockfix.classname",
      "org.apache.hadoop.raid.DistBlockIntegrityMonitor");
  localConf.setLong("raid.blockfix.filespertask", 2L);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    Path harDirectory = new Path(
        "/destraidrs/user/dhruba/raidtest/raidtest" + RaidNode.HAR_SUFFIX);
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < 1000 * 120) {
      if (fileSys.exists(harDirectory)) {
        break;
      }
      LOG.info("Waiting for har");
      Thread.sleep(1000);
    }
    assertTrue(fileSys.exists(harDirectory));
    cnode.stop();
    cnode.join();

    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);

    // Corrupt source blocks.
    FileStatus stat = fileSys.getFileStatus(file1);
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
        dfs, file1.toUri().getPath(), 0, stat.getLen());
    int[] corruptBlockIdxs = new int[]{0};
    for (int idx : corruptBlockIdxs) {
      TestBlockFixer.corruptBlock(locs.get(idx).getBlock(), dfsCluster);
    }
    RaidDFSUtil.reportCorruptBlocks(dfs, file1, corruptBlockIdxs,
        stat.getBlockSize());
    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
        corruptFiles[0], file1.toUri().getPath());

    cnode = RaidNode.createRaidNode(null, localConf);
    start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Waiting for files to be fixed.");
      Thread.sleep(1000);
    }

    long checkCRC = RaidDFSUtil.getCRC(fileSys, file1);
    assertEquals("file not fixed", crc, checkCRC);
    // Verify the counters are right.
    long expectedNumFailures = corruptBlockIdxs.length;
    assertEquals(expectedNumFailures,
        cnode.blockIntegrityMonitor.getNumBlockFixSimulationFailures());
    assertEquals(0,
        cnode.blockIntegrityMonitor.getNumBlockFixSimulationSuccess());
  } catch (Exception e) {
    LOG.info("Exception ", e);
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test testParityHarBlockFix completed.");
}
public void testTooManyErrorsDecode() throws Exception {
  LOG.info("testTooManyErrorsDecode start");
  long blockSize = 8192L;
  stripeLength = 3;
  mySetup("xor", 1);
  long[][] fsizes = {{2000L, 3000L, 2000L},
                     {blockSize + 1, blockSize + 1},
                     {2*blockSize, blockSize + blockSize/2}};
  long[][] bsizes = {{blockSize, blockSize, blockSize},
                     {blockSize, blockSize},
                     {2*blockSize, blockSize}};
  // Each pattern corrupts more blocks than the code can recover.
  Integer[][] corrupts = {{0, 1}, {0, 2}, {1, 2}};
  try {
    for (String code : RaidDFSUtil.codes) {
      Codec curCodec = Codec.getCodec(code);
      Path srcDir = new Path("/user/dhruba/" + code);
      for (int i = 0; i < corrupts.length; i++) {
        for (int j = 0; j < fsizes.length; j++) {
          long[] crcs = new long[fsizes[j].length];
          int[] seeds = new int[fsizes[j].length];
          Path parityDir = new Path(curCodec.parityDirectory);
          RaidDFSUtil.cleanUp(fileSys, srcDir);
          RaidDFSUtil.cleanUp(fileSys, parityDir);
          TestRaidDfs.createTestFiles(srcDir, fsizes[j], bsizes[j],
              crcs, seeds, fileSys, (short)1);
          assertTrue(RaidNode.doRaid(conf, fileSys.getFileStatus(srcDir),
              new Path(curCodec.parityDirectory), curCodec,
              new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1));
          boolean expectedExceptionThrown = false;
          try {
            corruptBlocksInDirectory(conf, srcDir, crcs, corrupts[i],
                fileSys, dfs, true, false);
            // Should not reach here.
          } catch (IOException e) {
            LOG.info("Expected exception caught: " + e);
            expectedExceptionThrown = true;
          }
          assertTrue(expectedExceptionThrown);
        }
      }
    }
    LOG.info("testTooManyErrorsDecode complete");
  } finally {
    myTearDown();
  }
}
public void testTooManyErrorsEncode() throws Exception {
  LOG.info("testTooManyErrorsEncode start");
  stripeLength = 3;
  mySetup("xor", 1);
  // Encoding should fail when even one block is corrupt.
  Random rand = new Random();
  try {
    for (String code : RaidDFSUtil.codes) {
      Codec curCodec = Codec.getCodec(code);
      Path srcDir = new Path("/user/dhruba/" + code);
      for (int j = 0; j < fileSizes.length; j++) {
        long[] crcs = new long[fileSizes[j].length];
        int[] seeds = new int[fileSizes[j].length];
        Path parityDir = new Path(curCodec.parityDirectory);
        RaidDFSUtil.cleanUp(fileSys, srcDir);
        RaidDFSUtil.cleanUp(fileSys, parityDir);
        TestRaidDfs.createTestFiles(srcDir, fileSizes[j], blockSizes[j],
            crcs, seeds, fileSys, (short)1);
        corruptBlocksInDirectory(conf, srcDir, crcs,
            new Integer[]{rand.nextInt(3)}, fileSys, dfs, false, false);
        boolean expectedExceptionThrown = false;
        try {
          RaidNode.doRaid(conf, fileSys.getFileStatus(srcDir),
              new Path(curCodec.parityDirectory), curCodec,
              new RaidNode.Statistics(), RaidUtils.NULL_PROGRESSABLE,
              false, 1, 1);
          // Should not reach here.
        } catch (IOException e) {
          LOG.info("Expected exception caught: " + e);
          expectedExceptionThrown = true;
        }
        assertTrue(expectedExceptionThrown);
      }
    }
    LOG.info("testTooManyErrorsEncode complete");
  } finally {
    myTearDown();
  }
}
private boolean doThePartialTest(Codec codec, int blockNum,
    int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192 * 1024L;
  int bufferSize = 4192 * 1024;
  Path srcPath = new Path("/user/dikang/raidtest/file" +
      UUID.randomUUID().toString());
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
      1, blockNum, blockSize);
  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // Generate the parity files.
  doRaid(srcPath, codec);
  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc = RaidDFSUtil.getBlockLocations(
      (DistributedFileSystem)fileSys, srcPath.toUri().getPath(), 0, length);

  // Corrupt the source file.
  for (int idx : corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock(), dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
      corruptBlockIdxs, blockSize);

  // Verify the partial read.
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);
  long numRead = 0;
  CRC32 newcrc = new CRC32();
  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();
  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
        " does not match file size " + length);
    return false;
  }
  LOG.info("Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() + ": " +
        newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
/**
 * Corrupt a parity file and wait for it to get fixed.
 */
private void implParityBlockFix(String testName, boolean local)
    throws Exception {
  LOG.info("Test " + testName + " started.");
  int stripeLength = 3;
  mySetup(stripeLength);
  long[] crcs = new long[3];
  int[] seeds = new int[3];
  Path dirPath = new Path("/user/dhruba/raidtest");
  Path[] files = TestRaidDfs.createTestFiles(dirPath,
      fileSizes, blockSizes, crcs, seeds, fileSys, (short)1);
  Path destPath = new Path("/destraid/user/dhruba");
  Path parityFile = new Path("/destraid/user/dhruba/raidtest");
  LOG.info("Test " + testName + " created test files");
  Configuration localConf = this.getRaidNodeConfig(conf, local);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForDirRaided(LOG, fileSys, dirPath, destPath);
    cnode.stop();
    cnode.join();

    long parityCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
    FileStatus parityStat = fileSys.getFileStatus(parityFile);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
        dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
        0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt parity blocks for different stripes.
    int[] corruptBlockIdxs = new int[]{0, 1, 2};
    for (int idx : corruptBlockIdxs) {
      corruptBlock(locs.get(idx).getBlock().getBlockName(), dfsCluster);
    }
    RaidDFSUtil.reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs,
        2 * blockSize);

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
        corruptFiles[0], parityFile.toUri().getPath());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test " + testName + " waiting for files to be fixed.");
      Thread.sleep(3000);
    }
    assertEquals("file not fixed", 1,
        cnode.blockIntegrityMonitor.getNumFilesFixed());

    long checkCRC = RaidDFSUtil.getCRC(fileSys, parityFile);
    assertEquals("file not fixed", parityCRC, checkCRC);
  } catch (Exception e) {
    LOG.info("Test " + testName + " Exception " + e +
        StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test " + testName + " completed.");
}
/**
 * Create a file with three stripes, corrupt a block each in two stripes,
 * and wait for the file to be fixed.
 */
private void implBlockFix(boolean local) throws Exception {
  LOG.info("Test testBlockFix started.");
  long blockSize = 8192L;
  int stripeLength = 3;
  mySetup(stripeLength, -1); // never har
  Path file1 = new Path("/user/dhruba/raidtest/file1");
  Path destPath = new Path("/destraid/user/dhruba/raidtest");
  long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
      1, 7, blockSize);
  long file1Len = fileSys.getFileStatus(file1).getLen();
  LOG.info("Test testBlockFix created test files");

  // Create an instance of the RaidNode.
  Configuration localConf = new Configuration(conf);
  localConf.setInt("raid.blockfix.interval", 1000);
  if (local) {
    localConf.set("raid.blockfix.classname",
        "org.apache.hadoop.raid.LocalBlockIntegrityMonitor");
  } else {
    localConf.set("raid.blockfix.classname",
        "org.apache.hadoop.raid.DistBlockIntegrityMonitor");
  }
  localConf.setLong("raid.blockfix.filespertask", 2L);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
    cnode.stop();
    cnode.join();

    FileStatus srcStat = fileSys.getFileStatus(file1);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
        dfs, file1.toUri().getPath(), 0, srcStat.getLen());
    String[] corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
        0, cnode.blockIntegrityMonitor.getNumFilesFixed());

    // Corrupt blocks in two different stripes. We can fix them.
    int[] corruptBlockIdxs = new int[]{0, 4, 6};
    for (int idx : corruptBlockIdxs) {
      corruptBlock(locs.get(idx).getBlock().getBlockName(), dfsCluster);
    }
    RaidDFSUtil.reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);

    corruptFiles = DFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
        corruptFiles[0], file1.toUri().getPath());
    assertEquals("wrong number of corrupt blocks", 3,
        RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0,
            srcStat.getLen()).size());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockIntegrityMonitor.getNumFilesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test testBlockFix waiting for files to be fixed.");
      Thread.sleep(1000);
    }
    assertEquals("file not fixed", 1,
        cnode.blockIntegrityMonitor.getNumFilesFixed());

    dfs = getDFS(conf, dfs);
    assertTrue("file not fixed",
        TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
  } catch (Exception e) {
    LOG.info("Test testBlockFix Exception " + e +
        StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test testBlockFix completed.");
}
public void testMultiplePriorities() throws Exception {
  Path srcFile = new Path("/home/test/file1");
  int repl = 1;
  int numBlocks = 8;
  long blockSize = 16384;
  int stripeLength = 3;
  Path destPath = new Path("/destraidrs");
  mySetup(stripeLength, -1); // never har
  Codec codec = Codec.getCodec("rs");
  LOG.info("Starting testMultiplePriorities");
  try {
    // Create a test file and raid it.
    TestRaidDfs.createTestFilePartialLastBlock(
        fileSys, srcFile, repl, numBlocks, blockSize);
    FileStatus stat = fileSys.getFileStatus(srcFile);
    RaidNode.doRaid(conf, stat, destPath, codec, new RaidNode.Statistics(),
        RaidUtils.NULL_PROGRESSABLE, false, repl, repl);

    // Corrupt one block of the file.
    int blockIdxToCorrupt = 1;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    LocatedBlocks locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock().getBlockName(),
        dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{1}, blockSize);

    // Create the block fixer and fix.
    FakeDistBlockIntegrityMonitor distBlockFixer =
        new FakeDistBlockIntegrityMonitor(conf);
    assertEquals(0, distBlockFixer.submittedJobs.size());

    // One job should be submitted.
    distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
    assertEquals(1, distBlockFixer.submittedJobs.size());

    // No new job should be submitted since we already have one.
    distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
    assertEquals(1, distBlockFixer.submittedJobs.size());

    // Corrupt one more block.
    blockIdxToCorrupt = 4;
    LOG.info("Corrupt block " + blockIdxToCorrupt + " of file " + srcFile);
    locations = getBlockLocations(srcFile, stat.getLen());
    corruptBlock(locations.get(blockIdxToCorrupt).getBlock().getBlockName(),
        dfsCluster);
    RaidDFSUtil.reportCorruptBlocks(fileSys, srcFile, new int[]{4}, blockSize);

    // A new job should be submitted since two blocks are corrupt.
    distBlockFixer.getCorruptionMonitor().checkAndReconstructBlocks();
    assertEquals(2, distBlockFixer.submittedJobs.size());
  } finally {
    myTearDown();
  }
}
private boolean doThePartialTest(Codec codec, int blockNum,
    int[] corruptBlockIdxs) throws Exception {
  long blockSize = 8192L;
  int bufferSize = 4192;
  Path srcPath = new Path("/user/dikang/raidtest/file" +
      UUID.randomUUID().toString());
  long crc = TestRaidDfs.createTestFilePartialLastBlock(fileSys, srcPath,
      1, blockNum, blockSize);
  DistributedRaidFileSystem raidFs = getRaidFS();
  assertTrue(raidFs.exists(srcPath));

  // Generate the parity files.
  doRaid(srcPath, codec);
  FileStatus file1Stat = fileSys.getFileStatus(srcPath);
  long length = file1Stat.getLen();
  LocatedBlocks file1Loc = RaidDFSUtil.getBlockLocations(
      (DistributedFileSystem)fileSys, srcPath.toUri().getPath(), 0, length);

  // Corrupt the source file.
  for (int idx : corruptBlockIdxs) {
    corruptBlock(file1Loc.get(idx).getBlock().getBlockName(), dfs);
  }
  RaidDFSUtil.reportCorruptBlocks((DistributedFileSystem)fileSys, srcPath,
      corruptBlockIdxs, blockSize);

  // Verify the partial read.
  byte[] buffer = new byte[bufferSize];
  FSDataInputStream in = raidFs.open(srcPath);
  long numRead = 0;
  CRC32 newcrc = new CRC32();
  int num = 0;
  while (num >= 0) {
    num = in.read(numRead, buffer, 0, bufferSize);
    if (num < 0) {
      break;
    }
    numRead += num;
    newcrc.update(buffer, 0, num);
  }
  in.close();
  if (numRead != length) {
    LOG.info("Number of bytes read " + numRead +
        " does not match file size " + length);
    return false;
  }
  LOG.info("Newcrc " + newcrc.getValue() + " old crc " + crc);
  if (newcrc.getValue() != crc) {
    LOG.info("CRC mismatch of file " + srcPath.toUri().getPath() + ": " +
        newcrc.getValue() + " vs. " + crc);
    return false;
  }
  return true;
}
/**
 * Checks the raided file system, prints a list of corrupt files to
 * System.out and returns the number of corrupt files.
 */
public int fsck(final String path) throws IOException {
  FileSystem fs = (new Path(path)).getFileSystem(conf);

  // If we got a raid fs, get the underlying fs.
  if (fs instanceof DistributedRaidFileSystem) {
    fs = ((DistributedRaidFileSystem) fs).getFileSystem();
  }
  // Check that we have a distributed fs.
  if (!(fs instanceof DistributedFileSystem)) {
    throw new IOException("expected DistributedFileSystem but got " +
        fs.getClass().getName());
  }
  final DistributedFileSystem dfs = (DistributedFileSystem) fs;

  // Get conf settings.
  String xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
  String rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
  if (!xorPrefix.endsWith("/")) {
    xorPrefix = xorPrefix + "/";
  }
  if (!rsPrefix.endsWith("/")) {
    rsPrefix = rsPrefix + "/";
  }
  LOG.debug("prefixes: " + xorPrefix + ", " + rsPrefix);

  // Get a list of corrupted files (not considering parity blocks just yet)
  // from the namenode. These are the only files we need to consider:
  // if a file has no corrupted data blocks, it is OK even if some of its
  // parity blocks are corrupted, so no further checking is necessary.
  final String[] files = RaidDFSUtil.getCorruptFiles(dfs);
  final List<Path> corruptFileCandidates = new LinkedList<Path>();
  for (final String f : files) {
    final Path p = new Path(f);
    // Ignore the file if it is a parity file or if it does not start
    // with the specified path.
    if (!p.toString().startsWith(xorPrefix) &&
        !p.toString().startsWith(rsPrefix) &&
        p.toString().startsWith(path)) {
      corruptFileCandidates.add(p);
    }
  }
  // Filter files marked for deletion.
  RaidUtils.filterTrash(conf, corruptFileCandidates);

  int numberOfCorruptFiles = 0;
  for (final Path corruptFileCandidate : corruptFileCandidates) {
    if (isFileCorrupt(dfs, corruptFileCandidate)) {
      System.out.println(corruptFileCandidate.toString());
      numberOfCorruptFiles++;
    }
  }
  return numberOfCorruptFiles;
}
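// Hypothetical usage of fsck(); this wrapper is illustrative only and is not
// part of the original RaidShell code.
void reportCorruption(String path) throws IOException {
  int corrupt = fsck(path);
  System.out.println(path + ": " + corrupt + " corrupt file(s)");
}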
/**
 * Reads through a corrupt source file fixing corrupt blocks on the way.
 * @param srcPath Path identifying the corrupt file.
 * @throws IOException
 * @return true if file has been fixed, false if no fixing
 * was necessary or possible.
 */
boolean processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair,
    Decoder decoder, Progressable progress) throws IOException {
  LOG.info("Processing corrupt file " + srcPath);

  DistributedFileSystem srcFs = getDFS(srcPath);
  FileStatus srcStat = srcFs.getFileStatus(srcPath);
  long blockSize = srcStat.getBlockSize();
  long srcFileSize = srcStat.getLen();
  String uriPath = srcPath.toUri().getPath();

  int numBlocksFixed = 0;
  List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
      srcFs, uriPath, 0, srcFileSize);
  if (corrupt.size() == 0) {
    return false;
  }
  for (LocatedBlock lb : corrupt) {
    ExtendedBlock corruptBlock = lb.getBlock();
    long corruptOffset = lb.getStartOffset();

    LOG.info("Found corrupt block " + corruptBlock +
        ", offset " + corruptOffset);

    final long blockContentsSize =
        Math.min(blockSize, srcFileSize - corruptOffset);
    File localBlockFile =
        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
    localBlockFile.deleteOnExit();

    try {
      decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
          parityPair.getPath(), blockSize, corruptOffset, localBlockFile,
          blockContentsSize);

      // We have the contents of the block; send them to a datanode.
      DatanodeInfo datanode = chooseDatanode(lb.getLocations());
      computeMetadataAndSendFixedBlock(datanode, localBlockFile, lb,
          blockContentsSize);
      numBlocksFixed++;
    } finally {
      localBlockFile.delete();
    }
    progress.progress();
  }
  LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath);
  return true;
}
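// Note on the two recovery directions (summarizing the methods around this
// comment): processCorruptFile above regenerates a *source* block from parity
// data via the Decoder, while processCorruptParityFile below regenerates a
// *parity* block from source data via the Encoder. Both then ship the rebuilt
// block to a datanode with computeMetadataAndSendFixedBlock().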
/**
 * Fixes corrupt blocks in a parity file.
 * This function uses the corresponding source file to regenerate parity
 * file blocks.
 * @return true if file has been fixed, false if no fixing
 * was necessary or possible.
 */
boolean processCorruptParityFile(Path parityPath, Encoder encoder,
    Progressable progress) throws IOException {
  LOG.info("Processing corrupt file " + parityPath);
  Path srcPath = sourcePathFromParityPath(parityPath);
  if (srcPath == null) {
    LOG.warn("Unusable parity file " + parityPath);
    return false;
  }

  DistributedFileSystem parityFs = getDFS(parityPath);
  FileStatus parityStat = parityFs.getFileStatus(parityPath);
  long blockSize = parityStat.getBlockSize();
  long parityFileSize = parityStat.getLen();
  FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath);
  long srcFileSize = srcStat.getLen();

  // Check the timestamps: a parity file is only valid for the source
  // version it was computed from.
  if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
    LOG.info("Mismatching timestamp for " + srcPath + " and " +
        parityPath + ", moving on...");
    return false;
  }

  String uriPath = parityPath.toUri().getPath();
  int numBlocksFixed = 0;
  List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
      parityFs, uriPath, 0, parityFileSize);
  if (corrupt.size() == 0) {
    return false;
  }
  for (LocatedBlock lb : corrupt) {
    ExtendedBlock corruptBlock = lb.getBlock();
    long corruptOffset = lb.getStartOffset();

    LOG.info("Found corrupt block " + corruptBlock +
        ", offset " + corruptOffset);

    File localBlockFile =
        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
    localBlockFile.deleteOnExit();

    try {
      encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize,
          blockSize, parityPath, corruptOffset, localBlockFile);

      // We have the contents of the block; send them to a datanode.
      DatanodeInfo datanode = chooseDatanode(lb.getLocations());
      computeMetadataAndSendFixedBlock(datanode, localBlockFile, lb,
          blockSize);
      numBlocksFixed++;
    } finally {
      localBlockFile.delete();
    }
    progress.progress();
  }
  LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath);
  return true;
}
/**
 * Reads through a parity HAR part file, fixing corrupt blocks on the way.
 * A HAR block can contain many file blocks, as long as the HAR part file
 * block size is a multiple of the file block size.
 * @return true if file has been fixed, false if no fixing
 * was necessary or possible.
 */
boolean processCorruptParityHarPartFile(Path partFile,
    Progressable progress) throws IOException {
  LOG.info("Processing corrupt file " + partFile);
  // Get some basic information.
  DistributedFileSystem dfs = getDFS(partFile);
  FileStatus partFileStat = dfs.getFileStatus(partFile);
  long partFileSize = partFileStat.getLen();
  long partFileBlockSize = partFileStat.getBlockSize();
  LOG.info(partFile + " has block size " + partFileBlockSize);

  // Find the path to the index file.
  // Parity file HARs are only one level deep, so the index file is at the
  // same level as the part file.
  // harDirectory initially holds the full part-file path; it is trimmed
  // to the parent directory below.
  String harDirectory = partFile.toUri().getPath();
  harDirectory =
      harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
  Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
  FileStatus indexStat = dfs.getFileStatus(indexFile);
  // Parse through the HAR index file.
  HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen());

  String uriPath = partFile.toUri().getPath();
  int numBlocksFixed = 0;
  List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
      dfs, uriPath, 0, partFileSize);
  if (corrupt.size() == 0) {
    return false;
  }
  for (LocatedBlock lb : corrupt) {
    ExtendedBlock corruptBlock = lb.getBlock();
    long corruptOffset = lb.getStartOffset();

    File localBlockFile =
        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
    localBlockFile.deleteOnExit();
    processCorruptParityHarPartBlock(dfs, partFile, corruptBlock,
        corruptOffset, partFileStat, harIndex, localBlockFile, progress);
    // Now we have recovered the part file block locally; send it.
    try {
      DatanodeInfo datanode = chooseDatanode(lb.getLocations());
      computeMetadataAndSendFixedBlock(datanode, localBlockFile, lb,
          localBlockFile.length());
      numBlocksFixed++;
    } finally {
      localBlockFile.delete();
    }
    progress.progress();
  }
  LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile);
  return true;
}
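// Worked example of the size relationship in the javadoc above (numbers are
// illustrative, not from the original code): with a 256 MB HAR part-file
// block size and 64 MB parity-file blocks, 256 % 64 == 0, so each part-file
// block spans exactly 4 parity blocks, and a single corrupt HAR block may
// cover up to 4 logical parity blocks.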
/**
 * Create a file with three stripes, corrupt a block each in two stripes,
 * and wait for the file to be fixed.
 */
protected void implBlockFix(boolean local) throws Exception {
  LOG.info("Test testBlockFix started.");
  long blockSize = 8192L;
  int stripeLength = 3;
  mySetup(stripeLength, -1); // never har
  Path file1 = new Path("/user/dhruba/raidtest/file1");
  Path destPath = new Path("/destraid/user/dhruba/raidtest");
  long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
      1, 7, blockSize);
  long file1Len = fileSys.getFileStatus(file1).getLen();
  LOG.info("Test testBlockFix created test files");

  // Create an instance of the RaidNode.
  Configuration localConf = new Configuration(conf);
  localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
  localConf.setInt("raid.blockfix.interval", 1000);
  if (local) {
    localConf.set("raid.blockfix.classname",
        "org.apache.hadoop.raid.LocalBlockFixer");
  } else {
    localConf.set("raid.blockfix.classname",
        "org.apache.hadoop.raid.DistBlockFixer");
  }
  localConf.setLong("raid.blockfix.filespertask", 2L);

  try {
    cnode = RaidNode.createRaidNode(null, localConf);
    TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
    cnode.stop();
    cnode.join();

    FileStatus srcStat = fileSys.getFileStatus(file1);
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
        dfs, file1.toUri().getPath(), 0, srcStat.getLen());
    String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals("no corrupt files expected", 0, corruptFiles.length);
    assertEquals("filesFixed() should return 0 before fixing files",
        0, cnode.blockFixer.filesFixed());

    // Corrupt blocks in two different stripes. We can fix them.
    int[] corruptBlockIdxs = new int[]{0, 4, 6};
    for (int idx : corruptBlockIdxs) {
      corruptBlock(locs.get(idx).getBlock());
    }
    reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);

    corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals("file not corrupted", 1, corruptFiles.length);
    assertEquals("wrong file corrupted",
        corruptFiles[0], file1.toUri().getPath());
    assertEquals("wrong number of corrupt blocks", 3,
        RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0,
            srcStat.getLen()).size());

    cnode = RaidNode.createRaidNode(null, localConf);
    long start = System.currentTimeMillis();
    while (cnode.blockFixer.filesFixed() < 1 &&
           System.currentTimeMillis() - start < 120000) {
      LOG.info("Test testBlockFix waiting for files to be fixed.");
      Thread.sleep(1000);
    }
    assertEquals("file not fixed", 1, cnode.blockFixer.filesFixed());

    dfs = getDFS(conf, dfs);
    assertTrue("file not fixed",
        TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
  } catch (Exception e) {
    LOG.info("Test testBlockFix Exception " + e +
        StringUtils.stringifyException(e));
    throw e;
  } finally {
    myTearDown();
  }
  LOG.info("Test testBlockFix completed.");
}
public void testDecoder() throws Exception {
  mySetup();
  int stripeSize = 10;
  int paritySize = 4;
  long blockSize = 8192;
  Path file1 = new Path("/user/raidtest/file1");
  Path recoveredFile1 = new Path("/user/raidtest/file1.recovered");
  Path parityFile1 = new Path("/rsraid/user/raidtest/file1");
  long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
      1, 25, blockSize);
  FileStatus file1Stat = fileSys.getFileStatus(file1);

  conf.setInt("raid.rsdecoder.bufsize", 512);
  conf.setInt("raid.rsencoder.bufsize", 512);

  try {
    // First encode the file.
    ReedSolomonEncoder encoder = new ReedSolomonEncoder(
        conf, stripeSize, paritySize);
    short parityRepl = 1;
    encoder.encodeFile(fileSys, file1, fileSys, parityFile1, parityRepl,
        Reporter.NULL);

    // Ensure there are no corrupt files yet.
    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
    String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals(0, corruptFiles.length);

    // Now corrupt the file.
    long corruptOffset = blockSize * 5;
    FileStatus srcStat = fileSys.getFileStatus(file1);
    LocatedBlocks locations = RaidDFSUtil.getBlockLocations(dfs,
        file1.toUri().getPath(), 0, srcStat.getLen());
    corruptBlock(locations.get(5).getBlock());
    corruptBlock(locations.get(6).getBlock());
    TestBlockFixer.reportCorruptBlocks(dfs, file1, new int[]{5, 6},
        srcStat.getBlockSize());

    // Ensure the file is corrupted.
    corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
    assertEquals(1, corruptFiles.length);
    assertEquals(file1.toString(), corruptFiles[0]);

    // Fix the file.
    ReedSolomonDecoder decoder = new ReedSolomonDecoder(
        conf, stripeSize, paritySize);
    decoder.decodeFile(fileSys, file1, fileSys, parityFile1,
        corruptOffset, recoveredFile1);
    assertTrue(TestRaidDfs.validateFile(
        fileSys, recoveredFile1, file1Stat.getLen(), crc1));
  } finally {
    myTearDown();
  }
}